超人不哭 發表於 2022-5-17 18:20:00

【Nodejs】Node.js中的"多线程"

<p>worker_threads 的出现让 Node.js 拥有<strong>多工作线程,但</strong>这个概念不同于Java等其它后端语言中的多线程。</p>
<p>Node.js 通过提供 cluster、child_process API 创建子进程的方式来赋予Node.js “多线程”能力。但是这种创建进程的方式会牺牲共享内存,并且数据通信必须通过json进行传输。(有一定的局限性和性能问题)</p>
<p>基于此 Node.js V10.5.0 提供了 worker_threads,它比 child_process 或 cluster更轻量级。 与child_process 或 cluster 不同,worker_threads 可以<strong>共享内存</strong>,通过传输 ArrayBuffer 实例或共享 SharedArrayBuffer 实例来实现。</p>
<p><strong>这里有一个误区:很多人可能认为在node.js核心模块中添加一个新的模块,来创建线程以及实现线程间同步问题,从而解决CPU密集型操作的问题?</strong></p>
<p>但事实并非如此,Node.js 并没有其它支持多线的程语言(如:java),诸如"synchronized"之类的关键字来实现线程同步的概念。<strong>Node.js的 worker_threads 区别于它们的多线程</strong>。如果添加线程,语言本身的性质将发生变化,所以不能将线程作为一组新的可用类或函数添加。</p>
<p>我们可以将其理解为:<strong>JavaScript和Node.js永远不会有线程</strong>,只有基于Node.js 架构的<strong>多工作线程</strong>。</p>
<p><strong>这张图很好的诠释了多工作线程机制。</strong>(1.理解node.js的event loop机制 2.和其它多线程语言对比性理解)</p>
<p><img src="https://img2022.cnblogs.com/blog/1228394/202205/1228394-20220517164235974-1885245290.png"></p>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p>&nbsp;</p>
<h3 id="前置知识">前置知识</h3>
<p>Node.js 保持了JavaScript在浏览器中单线程的特点。它的优势是没有线程间数据同步的性能消耗也不会出现死锁的情况。所以它是线程安全并且性能高效的。</p>
<p>单线程有它的弱点,无法充分利用多核CPU 资源,CPU 密集型计算可能会导致 I/O 阻塞,以及出现错误可能会导致应用崩溃。</p>
<p>为了解决单线程弱点:</p>
<p>浏览器端: HTML5 制定了 Web Worker 标准(Web Worker 的作用,就是为 JavaScript 创造<strong>多线程环境</strong>,允许主线程创建 Worker 线程,将一些任务分配给后者运行)。</p>
<p>Node端:采用了和 Web Worker相同的思路来解决单线程中大量计算问题 ,官方提供了 child_process 模块和 cluster 模块, cluster 底层是基于child_process实现。</p>
<p>child_process、cluster都是用于创建子进程,然后子进程间通过事件消息来传递结果,这个可以很好地保持应用模型的简单和低依赖。从而解决无法利用多核 CPU 和程序健壮性问题。</p>
<p>Node V10.5.0: 提供了实验性质的 worker_threads模块,才让Node拥有了多工作线程。</p>
<p>Node V12.0.0:worker_threads 已经成为正式标准,可以在生产环境放心使用。</p>
<p>也有很多开发者认为 worker_threads 违背了nodejs设计的初衷,事实上那是它并没有真正理解 worker_threads 的底层原理。其次是每一种语言的出现都有它的历史背景和需要解决的问题,在技术发展的过程中各种语言都是在取长补短,worker_threads 的设计就是技术发展的需要。</p>
<h2 id="一nodejs事件循环模型">一、Nodejs事件循环模型</h2>
<h3 id="nodejs">Node.js</h3>
<p>Node.js是构建在 Chrome’s V8 引擎之上的JavaScript 运行时环境。事件驱动(event-driven)和非阻塞 I/O 模型(non-blocking I/O model)的语言特性使 Node.js 天生高效(efficient)且轻量(lightweight)。它使用 npm 作为包管理器。</p>
<p><strong>Event Loop</strong></p>
<p>事件循环(Event Loop)分发 I/O 任务,最终工作线程(Work Thread)将任务丢到线程池(Thread Pool)里去执行,而事件循环只要等待执行结果就可以了。</p>
<p><img src="https://img2022.cnblogs.com/blog/1228394/202205/1228394-20220517164729826-1571292476.png"></p>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p>&nbsp;将上一张图再细化</p>
<p><img src="https://img2020.cnblogs.com/blog/647943/202110/647943-20211019090108753-658988618.jpg">&nbsp;</p>
<ul>
<li>Node Standard Library:Node.js 标准库</li>
<li>Node Bindings:将 V8 等暴露的 C/C++ 接口转成JavaScript Api</li>
<li>Chrome v8:JavaScript 引擎,采用 C/C++ 编写</li>
<li>libuv:由事件循环(Event Loop)和线程池(Async I/O)组成,负责所有 I/O 任务的分发与执行</li>
</ul>
<p style="margin-left: 60px"><img src="https://img2020.cnblogs.com/blog/647943/202110/647943-20211019090130339-213713226.jpg"></p>
<ol>
<li style="list-style-type: none"><ol>
<li>Client 请求到达 node api,该请求被添加到Event Queue(事件队列)。这是因为Node.js 无法同时处理多个请求。</li>
<li>Event Loop(事件循环) 始终检查 Event Queue 中是否有待处理事件,如果有就从 Event Queue 中从前到后依次取出,然后提供服务。</li>
<li>Event Loop 是单线程非阻塞I/O,它会把请求发送给 C++ Thread Pool(线程池)去处理,底层是基于C++ Libuv 异步I/O模型结构可以支持高并发。</li>
<li>现在 C++ Thread Pool有大量的请求,如数据库请求,文件请求等。</li>
<li>任何线程完成任务时,Callback(回调函数)就会被触发,并将响应发送给 Event Loop。</li>
<li>最终 Event Loop 会将请求返回给 Client。</li>
</ol></li>
</ol>
<h3 id="二一个实验">二、一个实验</h3>
<p>我们以计算10,000,000以内的素数为实验。</p>
<p>这个实验会涉及到父子进程通信。比官方 http server 并发请求更有说服力。</p>
<p><code>block_primes.js</code></p>
<div class="cnblogs_code">
<pre>const min = 2<span style="color: rgba(0, 0, 0, 1)">
const max </span>= 1e7

<span style="color: rgba(0, 0, 255, 1)">function</span><span style="color: rgba(0, 0, 0, 1)"> generatePrimes(start, range) {
let primes </span>=<span style="color: rgba(0, 0, 0, 1)"> []
let isPrime </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">
let end </span>= start +<span style="color: rgba(0, 0, 0, 1)"> range
</span><span style="color: rgba(0, 0, 255, 1)">for</span> (let i = start; i &lt; end; i++<span style="color: rgba(0, 0, 0, 1)">) {
    </span><span style="color: rgba(0, 0, 255, 1)">for</span> (let j = min; j &lt; Math.sqrt(end); j++<span style="color: rgba(0, 0, 0, 1)">) {
      </span><span style="color: rgba(0, 0, 255, 1)">if</span> (i !== j &amp;&amp; i%j === 0<span style="color: rgba(0, 0, 0, 1)">) {
      isPrime </span>= <span style="color: rgba(0, 0, 255, 1)">false</span>
      <span style="color: rgba(0, 0, 255, 1)">break</span><span style="color: rgba(0, 0, 0, 1)">
      }
    }
    </span><span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)"> (isPrime) {
      primes.push(i)
    }
    isPrime </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">
}
</span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> primes
}
const primes </span>=<span style="color: rgba(0, 0, 0, 1)"> generatePrimes(min, max)
console.log(primes.length)</span></pre>
</div>
<p>单线程计算</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 运行</span>
<span style="color: rgba(0, 0, 0, 1)">$ time node block_primes.js

</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 输出</span><span style="color: rgba(0, 128, 0, 1)">
//</span><span style="color: rgba(0, 128, 0, 1)"> 664579</span>

8<span style="color: rgba(0, 0, 0, 1)">.11s user
</span>0<span style="color: rgba(0, 0, 0, 1)">.03s system
</span>99%<span style="color: rgba(0, 0, 0, 1)"> cpu
</span>8.147 total</pre>
</div>
<p>结论:单核利用率 99%,总耗时 8.147s</p>
<h3 id="三cluster-了解">三、cluster [了解]</h3>
<p>如今的机器基本都是多核 cpu。为了能充分利用 cpu 计算能力,node.js V0.8(2012-06-22) 新增了一个内置模块 cluster。它可以通过一个父进程管理一堆子进程的方式来实现集群的功能。</p>
<p>cluster 底层就是 child_process,master 进程做总控,启动 1 个 agent 和 n 个 worker,agent 来做任务调度,获取任务,并分配给某个空闲的 worker 来做。</p>
<p>需要注意的是:每个 worker 进程通过使用 child_process.fork() 函数,基于 IPC(Inter-Process Communication,进程间通信),实现与 master 进程间通信。</p>
<p>fork 出的子进程拥有和父进程一致的数据空间、堆、栈等资源(fork 当时),但是是独立的,也就是说二者不能共享这些存储空间。 那我们直接用 fork 自己实现不就行了。</p>
<p>这样的方式仅仅实现了多进程。多进程运行还涉及父子进程通信,子进程管理,以及负载均衡等问题,这些特性 cluster 帮你实现了。</p>
<p>cluster_primes.js<br>该方法明显是一个 cpu 密集型计算。 我本地电脑配置为 MacBook Pro (15-inch, 2018) ,运行该测试代码,生成的报告显示,需要9.731s 时间:</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 计算 start, 至 start + range 之间的素数</span>
<span style="color: rgba(0, 0, 255, 1)">function</span><span style="color: rgba(0, 0, 0, 1)"> generatePrimes(start, range) {
    let primes </span>=<span style="color: rgba(0, 0, 0, 1)"> []
    let isPrime </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">
    let end </span>= start +<span style="color: rgba(0, 0, 0, 1)"> range
    </span><span style="color: rgba(0, 0, 255, 1)">for</span> (let i = start; i &lt; end; i++<span style="color: rgba(0, 0, 0, 1)">) {
      </span><span style="color: rgba(0, 0, 255, 1)">for</span> (let j = min; j &lt; Math.sqrt(end); j++<span style="color: rgba(0, 0, 0, 1)">) {
            </span><span style="color: rgba(0, 0, 255, 1)">if</span> (i !== j &amp;&amp; i%j === 0<span style="color: rgba(0, 0, 0, 1)">) {
                isPrime </span>= <span style="color: rgba(0, 0, 255, 1)">false</span>
                <span style="color: rgba(0, 0, 255, 1)">break</span><span style="color: rgba(0, 0, 0, 1)">
            }
      }

      </span><span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)"> (isPrime) {
            primes.push(i)
      }

      isPrime </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">
    }
    </span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> primes
}


</span><span style="color: rgba(0, 128, 0, 1)">/*</span><span style="color: rgba(0, 128, 0, 1)">*
* - 加载clustr模块
* - 设定启动进程数为cpu个数
</span><span style="color: rgba(0, 128, 0, 1)">*/</span>
<span style="color: rgba(0, 0, 255, 1)">var</span> cluster = require('cluster'<span style="color: rgba(0, 0, 0, 1)">)
</span><span style="color: rgba(0, 0, 255, 1)">var</span> numCPUs = require('os'<span style="color: rgba(0, 0, 0, 1)">).cpus().length

</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 素数的计算</span>
const min = 2<span style="color: rgba(0, 0, 0, 1)">
const max </span>= 1e7 <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> = 10000000</span>
let primes =<span style="color: rgba(0, 0, 0, 1)"> []


</span><span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)"> (cluster.isMaster) {
    const range </span>= Math.ceil((max - min) /<span style="color: rgba(0, 0, 0, 1)"> numCPUs)
    let start </span>=<span style="color: rgba(0, 0, 0, 1)"> min

    </span><span style="color: rgba(0, 0, 255, 1)">for</span> (<span style="color: rgba(0, 0, 255, 1)">var</span> i = 0; i &lt; numCPUs; i++<span style="color: rgba(0, 0, 0, 1)">) {
      const worker </span>= cluster.fork() <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 启动子进程</span>
      <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">在主进程中,这会发送消息给特定的工作进程</span>
<span style="color: rgba(0, 0, 0, 1)">      worker.send({ start: start, range: range })

      start </span>+=<span style="color: rgba(0, 0, 0, 1)"> range
      
      worker.on(</span>'message', (msg) =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
            primes </span>=<span style="color: rgba(0, 0, 0, 1)"> primes.concat(msg.data)
            worker.kill()
      })
      
    }
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 当任何一个工作进程关闭的时候,cluster 模块都将会触发 'exit' 事件</span>
    cluster.on('exit', <span style="color: rgba(0, 0, 255, 1)">function</span><span style="color: rgba(0, 0, 0, 1)">(worker, code, signal) {
      console.log(</span>'worker ' + worker.process.pid + ' died'<span style="color: rgba(0, 0, 0, 1)">)
    })
} </span><span style="color: rgba(0, 0, 255, 1)">else</span><span style="color: rgba(0, 0, 0, 1)"> {
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 监听子进程发送的信息</span>
    process.on('message', (msg) =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
      console.log(msg)
      const { start, range} </span>=<span style="color: rgba(0, 0, 0, 1)"> msg
      const data </span>=<span style="color: rgba(0, 0, 0, 1)"> generatePrimes(start, range)
      </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 在工作进程中,这会发送消息给主进程</span>
<span style="color: rgba(0, 0, 0, 1)">      process.send({ data: data })
    })
}</span></pre>
</div>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 执行</span>
<span style="color: rgba(0, 0, 0, 1)">$ time node cluster_primes.js

</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 输出 可以看出一共启动了 12个子进程</span>
{ start: 2500004, range: 833334<span style="color: rgba(0, 0, 0, 1)"> }
{ start: </span>833336, range: 833334<span style="color: rgba(0, 0, 0, 1)"> }
{ start: </span>3333338, range: 833334<span style="color: rgba(0, 0, 0, 1)"> }
{ start: </span>2, range: 833334<span style="color: rgba(0, 0, 0, 1)"> }
{ start: </span>1666670, range: 833334<span style="color: rgba(0, 0, 0, 1)"> }
{ start: </span>4166672, range: 833334<span style="color: rgba(0, 0, 0, 1)"> }
{ start: </span>5000006, range: 833334<span style="color: rgba(0, 0, 0, 1)"> }
{ start: </span>5833340, range: 833334<span style="color: rgba(0, 0, 0, 1)"> }
{ start: </span>6666674, range: 833334<span style="color: rgba(0, 0, 0, 1)"> }
{ start: </span>7500008, range: 833334<span style="color: rgba(0, 0, 0, 1)"> }
{ start: </span>8333342, range: 833334<span style="color: rgba(0, 0, 0, 1)"> }
{ start: </span>9166676, range: 833334<span style="color: rgba(0, 0, 0, 1)"> }
worker </span>31008<span style="color: rgba(0, 0, 0, 1)"> died
worker </span>31009<span style="color: rgba(0, 0, 0, 1)"> died
worker </span>31010<span style="color: rgba(0, 0, 0, 1)"> died
worker </span>31011<span style="color: rgba(0, 0, 0, 1)"> died
worker </span>31012<span style="color: rgba(0, 0, 0, 1)"> died
worker </span>31013<span style="color: rgba(0, 0, 0, 1)"> died
worker </span>31014<span style="color: rgba(0, 0, 0, 1)"> died
worker </span>31015<span style="color: rgba(0, 0, 0, 1)"> died
worker </span>31018<span style="color: rgba(0, 0, 0, 1)"> died
worker </span>31019<span style="color: rgba(0, 0, 0, 1)"> died
worker </span>31016<span style="color: rgba(0, 0, 0, 1)"> died
worker </span>31017<span style="color: rgba(0, 0, 0, 1)"> died

</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 性能</span>
6<span style="color: rgba(0, 0, 0, 1)">.68s user
</span>0<span style="color: rgba(0, 0, 0, 1)">.24s system
</span>519%<span style="color: rgba(0, 0, 0, 1)"> cpu
</span>1.332 total</pre>
</div>
<h3 id="四child_process-了解"><strong>四、child_process</strong>&nbsp;[了解]</h3>
<p>在Node.js中,提供了一个 child_process 模块,通过它可以开启多个子进程,在多个子进程之间可以共享内存空间,可以通过子进程的互相通信来实现信息的交换。</p>
<p><code>child_process_main.js</code></p>
<div class="cnblogs_code">
<pre>const { fork } = require('child_process'<span style="color: rgba(0, 0, 0, 1)">)
const worker </span>= fork(__dirname + '/child_process_worker.js'<span style="color: rgba(0, 0, 0, 1)">)
</span><span style="color: rgba(0, 0, 255, 1)">var</span> numCPUs = require('os'<span style="color: rgba(0, 0, 0, 1)">).cpus().length

</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 接收工作进程计算结果</span>
let max = 1e7<span style="color: rgba(0, 0, 0, 1)">
let min </span>= 2<span style="color: rgba(0, 0, 0, 1)">
let start </span>= 2<span style="color: rgba(0, 0, 0, 1)">
let primes </span>=<span style="color: rgba(0, 0, 0, 1)"> []

const range </span>= Math.ceil((max - min) /<span style="color: rgba(0, 0, 0, 1)"> numCPUs)

</span><span style="color: rgba(0, 0, 255, 1)">for</span> (<span style="color: rgba(0, 0, 255, 1)">var</span> i = 0; i &lt; numCPUs; i++<span style="color: rgba(0, 0, 0, 1)">) {
    worker.send({ start: start, range: range })
    start </span>+=<span style="color: rgba(0, 0, 0, 1)"> range
    worker.on(</span>'message', (msg) =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
      primes </span>=<span style="color: rgba(0, 0, 0, 1)"> primes.concat(msg.data)
      worker.kill()
    })
}</span></pre>
</div>
<p>child_<em>process_</em>worker.js</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 素数的计算</span>
<span style="color: rgba(0, 0, 255, 1)">function</span><span style="color: rgba(0, 0, 0, 1)"> generatePrimes(start, range) {
    let primes </span>=<span style="color: rgba(0, 0, 0, 1)"> []
    let isPrime </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">
    let end </span>= start +<span style="color: rgba(0, 0, 0, 1)"> range
    </span><span style="color: rgba(0, 0, 255, 1)">for</span> (let i = start; i &lt; end; i++<span style="color: rgba(0, 0, 0, 1)">) {
      </span><span style="color: rgba(0, 0, 255, 1)">for</span> (let j = 2; j &lt; Math.sqrt(end); j++<span style="color: rgba(0, 0, 0, 1)">) {
            </span><span style="color: rgba(0, 0, 255, 1)">if</span> (i !== j &amp;&amp; i%j === 0<span style="color: rgba(0, 0, 0, 1)">) {
                isPrime </span>= <span style="color: rgba(0, 0, 255, 1)">false</span>
                <span style="color: rgba(0, 0, 255, 1)">break</span><span style="color: rgba(0, 0, 0, 1)">
            }
      }

      </span><span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)"> (isPrime) {
            primes.push(i)
      }

      isPrime </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">
    }
    </span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> primes
}


</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 监听子进程发送的信息</span>
process.on('message', (msg) =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
    const { start, range} </span>=<span style="color: rgba(0, 0, 0, 1)"> msg
    console.log(msg)
    const data </span>=<span style="color: rgba(0, 0, 0, 1)"> generatePrimes(start, range)
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 在工作进程中,这会发送消息给主进程</span>
<span style="color: rgba(0, 0, 0, 1)">    process.send({ data: data })
})

</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 收到kill信息,进程退出</span>
process.on('SIGHUP', <span style="color: rgba(0, 0, 255, 1)">function</span><span style="color: rgba(0, 0, 0, 1)">() {
    process.exit()
})</span></pre>
</div>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 执行 </span>
<span style="color: rgba(0, 0, 0, 1)">$ time node child_process_main.js

</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 输出</span>
{ start: 2, range: 833334<span style="color: rgba(0, 0, 0, 1)"> }
{ start: </span>833336, range: 833334<span style="color: rgba(0, 0, 0, 1)"> }
{ start: </span>1666670, range: 833334<span style="color: rgba(0, 0, 0, 1)"> }
{ start: </span>2500004, range: 833334<span style="color: rgba(0, 0, 0, 1)"> }
{ start: </span>3333338, range: 833334<span style="color: rgba(0, 0, 0, 1)"> }
{ start: </span>4166672, range: 833334<span style="color: rgba(0, 0, 0, 1)"> }
{ start: </span>5000006, range: 833334<span style="color: rgba(0, 0, 0, 1)"> }
{ start: </span>5833340, range: 833334<span style="color: rgba(0, 0, 0, 1)"> }
{ start: </span>6666674, range: 833334<span style="color: rgba(0, 0, 0, 1)"> }
{ start: </span>7500008, range: 833334<span style="color: rgba(0, 0, 0, 1)"> }
{ start: </span>8333342, range: 833334<span style="color: rgba(0, 0, 0, 1)"> }
{ start: </span>9166676, range: 833334<span style="color: rgba(0, 0, 0, 1)"> }

</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 性能</span>
5<span style="color: rgba(0, 0, 0, 1)">.94s user
</span>0<span style="color: rgba(0, 0, 0, 1)">.06s system
</span>100%<span style="color: rgba(0, 0, 0, 1)"> cpu
</span>5.998 total</pre>
</div>
<p>结论:启动了12个子进程,cpu利用率为100%,总耗时5.998s</p>
<h3 id="五worker_threads">五、worker_threads</h3>
<h4 id="1加载-worker_threads-模块"><strong>1、加载 worker_threads 模块</strong></h4>
<p>node.js v10.5.0 引入的实验性质API,开启时需要使用 --experimental-worker 参数。</p>
<p>node.js v12.0.0 里面默认开启,也预示着您可以将该特性用于生产环境中。</p>
<div class="cnblogs_code">
<pre>/ v 10.15.3<span style="color: rgba(0, 0, 0, 1)">
$ node </span>-e "require('worker_threads'); console.log('success');"
<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 输出</span>
internal/modules/cjs/loader.js:584
    <span style="color: rgba(0, 0, 255, 1)">throw</span><span style="color: rgba(0, 0, 0, 1)"> err;
    </span>^<span style="color: rgba(0, 0, 0, 1)">

Error: Cannot find module </span>'worker_threads'<span style="color: rgba(0, 0, 0, 1)">
    at Function.Module._resolveFilename (internal</span>/modules/cjs/loader.js:582:15)
    at Function.Module._load (internal/modules/cjs/loader.js:508:25)
    at Module.require (internal/modules/cjs/loader.js:637:17)
    at require (internal/modules/cjs/helpers.js:22:18)
    at :1:1<span style="color: rgba(0, 0, 0, 1)">
    at Script.runInThisContext (vm.js:</span>119:20<span style="color: rgba(0, 0, 0, 1)">)
    at Object.runInThisContext (vm.js:</span>326:38<span style="color: rgba(0, 0, 0, 1)">)
    at Object.</span>&lt;anonymous&gt; (-wrapper:6:22<span style="color: rgba(0, 0, 0, 1)">)
    at Module._compile (internal</span>/modules/cjs/loader.js:701:30)
    at evalScript (internal/bootstrap/node.js:589:27<span style="color: rgba(0, 0, 0, 1)">)
</span>----------------------------------------------------------

<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> v 10.15.3</span>
$ node --experimental-worker -<span style="color: rgba(0, 0, 0, 1)">e
    </span>"require('worker_threads'); console.log('success');"
<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 输出</span>
<span style="color: rgba(0, 0, 0, 1)">success

</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> v 12.6.0</span>
$ node -e "require('worker_threads'); console.log('success');"
<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 输出</span>
success</pre>
</div>
<h4 id="2官方介绍">2、官方介绍</h4>
<p>Workers (threads) (工作线程)对于执行CPU密集型的JavaScript操作非常有用。它们对I/O密集型工作没有多大帮助。js的内置异步I/O操作比 Workers 效率更高。</p>
<p>worker_threads 比使用 child_process 或 cluster可以获得的并行性更轻量级。 此外,worker_threads 可以有效地共享内存。</p>
<h4 id="3hello-world"><strong>3、Hello world</strong></h4>
<p>例子:threads_example1.js</p>
<div class="cnblogs_code">
<pre>const { Worker, isMainThread, parentPort } = require('worker_threads'<span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)"> (isMainThread) {
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> This code is executed in the main thread and not in the worker.</span>

<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Create the worker.</span>
const worker = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Worker(__filename);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Listen for messages from the worker and print them.</span>
worker.on('message', (msg) =&gt;<span style="color: rgba(0, 0, 0, 1)"> { console.log(msg); });
} </span><span style="color: rgba(0, 0, 255, 1)">else</span><span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> This code is executed in the worker and not in the main thread.</span>

<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Send a message to the main thread.</span>
parentPort.postMessage('Hello world!'<span style="color: rgba(0, 0, 0, 1)">);
}</span></pre>
</div>
<ul>
<li>Worker: 该类用于创建 worker对象。有一个必填参数__filename(文件路径),该文件会被worker执行。同时我们可以在主线程中通过worker.on监听message事件</li>
<li>isMainThread: 该对象用于区分是主线程(true)还是工作线程(false)</li>
<li>parentPort: 该对象的 postMessage 方法用于 worker 线程向主线程发送消息</li>
</ul>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">$ node threads_example1.js
Hello world</span>!</pre>
</div>
<h4 id="4对实验进行改造"><strong>4、对实验进行改造</strong></h4>
<p>我们使用&nbsp;<code>worker_threads</code>&nbsp;对该程序进行改造。</p>
<p><code>worker_threads_example.js</code></p>
<div class="cnblogs_code">
<pre>const { Worker, isMainThread, parentPort, workerData } = require('worker_threads'<span style="color: rgba(0, 0, 0, 1)">)

</span><span style="color: rgba(0, 0, 255, 1)">function</span><span style="color: rgba(0, 0, 0, 1)"> generatePrimes(start, range) {
let primes </span>=<span style="color: rgba(0, 0, 0, 1)"> []
let isPrime </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">
let end </span>= start +<span style="color: rgba(0, 0, 0, 1)"> range
</span><span style="color: rgba(0, 0, 255, 1)">for</span> (let i = start; i &lt; end; i++<span style="color: rgba(0, 0, 0, 1)">) {
    </span><span style="color: rgba(0, 0, 255, 1)">for</span> (let j = 2; j &lt; Math.sqrt(end); j++<span style="color: rgba(0, 0, 0, 1)">) {
      </span><span style="color: rgba(0, 0, 255, 1)">if</span> (i !== j &amp;&amp; i%j === 0<span style="color: rgba(0, 0, 0, 1)">) {
      isPrime </span>= <span style="color: rgba(0, 0, 255, 1)">false</span>
      <span style="color: rgba(0, 0, 255, 1)">break</span><span style="color: rgba(0, 0, 0, 1)">
      }
    }
    </span><span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)"> (isPrime) {
      primes.push(i)
    }
    isPrime </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">
}
</span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> primes
}


</span><span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)"> (isMainThread) {
const max </span>= 1e7<span style="color: rgba(0, 0, 0, 1)">
const min </span>= 2<span style="color: rgba(0, 0, 0, 1)">
let primes </span>=<span style="color: rgba(0, 0, 0, 1)"> []

const threadCount </span>= +process.argv || 2<span style="color: rgba(0, 0, 0, 1)">
const threads </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Set()
console.log(`Running </span><span style="color: rgba(0, 0, 255, 1)">with</span><span style="color: rgba(0, 0, 0, 1)"> ${threadCount} threads...`)
const range </span>= Math.ceil((max - min) /<span style="color: rgba(0, 0, 0, 1)"> threadCount)
let start </span>=<span style="color: rgba(0, 0, 0, 1)"> min

</span><span style="color: rgba(0, 0, 255, 1)">for</span> (let i = 0; i &lt; threadCount - 1; i++<span style="color: rgba(0, 0, 0, 1)">) {
    const myStart </span>=<span style="color: rgba(0, 0, 0, 1)"> start
    threads.add(</span><span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Worker(__filename, { workerData: { start: myStart, range }}))
    start </span>+=<span style="color: rgba(0, 0, 0, 1)"> range
}

threads.add(</span><span style="color: rgba(0, 0, 255, 1)">new</span> Worker(__filename, { workerData: { start, range: range + ((max - min + 1) %<span style="color: rgba(0, 0, 0, 1)"> threadCount)}}))

</span><span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> (let worker of threads) {
    worker.on(</span>'error', (err) =&gt; { <span style="color: rgba(0, 0, 255, 1)">throw</span><span style="color: rgba(0, 0, 0, 1)"> err })
    worker.on(</span>'exit', () =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
      threads.</span><span style="color: rgba(0, 0, 255, 1)">delete</span><span style="color: rgba(0, 0, 0, 1)">(worker)
      console.log(`Thread exiting, ${threads.size} running...`)
      </span><span style="color: rgba(0, 0, 255, 1)">if</span> (threads.size === 0<span style="color: rgba(0, 0, 0, 1)">) {
      </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> console.log(primes.join('\n'))</span>
<span style="color: rgba(0, 0, 0, 1)">      }
    })

    worker.on(</span>'message', (msg) =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
      primes </span>=<span style="color: rgba(0, 0, 0, 1)"> primes.concat(msg)
    })
}
} </span><span style="color: rgba(0, 0, 255, 1)">else</span><span style="color: rgba(0, 0, 0, 1)"> {
const data </span>=<span style="color: rgba(0, 0, 0, 1)"> generatePrimes(workerData.start, workerData.range)
parentPort.postMessage(data)
}</span></pre>
</div>
<p>该代码中在构造 worker的时候 传入了一个名为workerData的对象,这是我们希望线程在开始运行时可以访问的数据。</p>
<p>workerData 可以是任何一个JavaScript 值。</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 开启 1 个工作线程</span>
$ time node worker_threads_example.js 1<span style="color: rgba(0, 0, 0, 1)">
Running </span><span style="color: rgba(0, 0, 255, 1)">with</span> 1<span style="color: rgba(0, 0, 0, 1)"> threads...
Thread exiting, </span>0<span style="color: rgba(0, 0, 0, 1)"> running...
</span>8.25s user| 0.04s system| 100% cpu| 8.286<span style="color: rgba(0, 0, 0, 1)"> total

</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 开启 2 个工作线程</span>
$ time node worker_threads_example.js 2<span style="color: rgba(0, 0, 0, 1)">
Running </span><span style="color: rgba(0, 0, 255, 1)">with</span> 2<span style="color: rgba(0, 0, 0, 1)"> threads...
Thread exiting, </span>1<span style="color: rgba(0, 0, 0, 1)"> running...
Thread exiting, </span>0<span style="color: rgba(0, 0, 0, 1)"> running...
</span>7.22s user| 0.04s system| 175% cpu| 4.127<span style="color: rgba(0, 0, 0, 1)"> total

</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 开启 4 个工作线程</span>
$ time node worker_threads_example.js 4<span style="color: rgba(0, 0, 0, 1)">
Running </span><span style="color: rgba(0, 0, 255, 1)">with</span> 4<span style="color: rgba(0, 0, 0, 1)"> threads...
Thread exiting, </span>3<span style="color: rgba(0, 0, 0, 1)"> running...
Thread exiting, </span>2<span style="color: rgba(0, 0, 0, 1)"> running...
Thread exiting, </span>1<span style="color: rgba(0, 0, 0, 1)"> running...
Thread exiting, </span>0<span style="color: rgba(0, 0, 0, 1)"> running...
</span>6.75s user| 0.05s system| 313% cpu| 2.171<span style="color: rgba(0, 0, 0, 1)"> total

</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 开启 8 个工作线程</span>
$ time node worker_threads_example.js 8<span style="color: rgba(0, 0, 0, 1)">
Running </span><span style="color: rgba(0, 0, 255, 1)">with</span> 8<span style="color: rgba(0, 0, 0, 1)"> threads...
Thread exiting, </span>7<span style="color: rgba(0, 0, 0, 1)"> running...
Thread exiting, </span>6<span style="color: rgba(0, 0, 0, 1)"> running...
Thread exiting, </span>5<span style="color: rgba(0, 0, 0, 1)"> running...
Thread exiting, </span>4<span style="color: rgba(0, 0, 0, 1)"> running...
Thread exiting, </span>3<span style="color: rgba(0, 0, 0, 1)"> running...
Thread exiting, </span>2<span style="color: rgba(0, 0, 0, 1)"> running...
Thread exiting, </span>1<span style="color: rgba(0, 0, 0, 1)"> running...
Thread exiting, </span>0<span style="color: rgba(0, 0, 0, 1)"> running...
</span>6.53s user| 0.08s system| 473% cpu| 1.397<span style="color: rgba(0, 0, 0, 1)"> total

$ time node worker_threads_example.js </span>12<span style="color: rgba(0, 0, 0, 1)">
Running </span><span style="color: rgba(0, 0, 255, 1)">with</span> 12<span style="color: rgba(0, 0, 0, 1)"> threads...
Thread exiting, </span>11<span style="color: rgba(0, 0, 0, 1)"> running...
Thread exiting, </span>10<span style="color: rgba(0, 0, 0, 1)"> running...
Thread exiting, </span>9<span style="color: rgba(0, 0, 0, 1)"> running...
Thread exiting, </span>8<span style="color: rgba(0, 0, 0, 1)"> running...
Thread exiting, </span>7<span style="color: rgba(0, 0, 0, 1)"> running...
Thread exiting, </span>6<span style="color: rgba(0, 0, 0, 1)"> running...
Thread exiting, </span>5<span style="color: rgba(0, 0, 0, 1)"> running...
Thread exiting, </span>4<span style="color: rgba(0, 0, 0, 1)"> running...
Thread exiting, </span>3<span style="color: rgba(0, 0, 0, 1)"> running...
Thread exiting, </span>2<span style="color: rgba(0, 0, 0, 1)"> running...
Thread exiting, </span>1<span style="color: rgba(0, 0, 0, 1)"> running...
Thread exiting, </span>0<span style="color: rgba(0, 0, 0, 1)"> running...
</span>6.51s user| 0.10s system| 515% cpu| 1.282<span style="color: rgba(0, 0, 0, 1)"> total

$ time node worker_threads_example.js </span>60
7.67s user| 0.40s system| 471% cpu| 1.712 total </pre>
</div>
<h3 id="结论">结论:</h3>
<p>工作线程数从1 提升到 12 ,我们发现耗时从8.286s 提升到 1.282s,cpu 利用率从 100% 提升到了 515%。</p>
<p>当我门再次把工作线程数调大到60的时候,user 耗时达到7.67s,cpu利用率降低到471%,总耗时上升到 1.712s,所以并不是工作线程数越多越好。</p>
<p>worker_threads 极大的提升了cpu利用率,提高了程序的运行性能。但使用过程中需要合理控制。</p>
<p>参照:https://www.cnblogs.com/ShuiNian/p/15423317.html</p><br><br>
来源:https://www.cnblogs.com/vickylinj/p/16281914.html
頁: [1]
查看完整版本: 【Nodejs】Node.js中的"多线程"