时光停留 發表於 2021-12-10 17:48:00

Node.js 多线程——worker_threads

<h3 id="item-1">Node.js 是如何工作的</h3>
<p>Node.js 使用两种线程:<em>event loop</em>&nbsp;处理的主线程和&nbsp;<em>worker pool</em>&nbsp;中的几个辅助线程。</p>
<p>事件循环是一种机制,它采用回调(函数)并注册它们,准备在将来的某个时刻执行。它与相关的 JavaScript 代码在同一个线程中运行。当 JavaScript 操作阻塞线程时,事件循环也会被阻止。</p>
<p>工作池是一种执行模型,它产生并处理单独的线程,然后同步执行任务,并将结果返回到事件循环。事件循环使用返回的结果执行提供的回调。</p>
<p>简而言之,它负责异步 I/O操作 —— 主要是与系统磁盘和网络的交互。它主要由诸如&nbsp;<code>fs</code>(I/O 密集)或&nbsp;<code>crypto</code>(CPU 密集)等模块使用。工作池用&nbsp;libuv&nbsp;实现,当 Node 需要在 JavaScript 和 C++ 之间进行内部通信时,会导致轻微的延迟,但这几乎不可察觉。</p>
<p>基于这两种机制,我们可以编写如下代码:</p>
<div class="cnblogs_code">
<pre>fs.readFile(path.join(__dirname, './package.json'), (err, content) =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)"> (err) {
   </span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">;
}

console.log(content.toString());
});</span></pre>
</div>
<p>前面提到的&nbsp;<code>fs</code>&nbsp;模块告诉工作池使用其中一个线程来读取文件的内容,并在完成后通知事件循环。然后事件循环获取提供的回调函数,并用文件的内容执行它。</p>
<p>以上是非阻塞代码的示例,我们不必同步等待某事的发生。只需告诉工作池去读取文件,并用结果去调用提供的函数即可。由于工作池有自己的线程,因此事件循环可以在读取文件时继续正常执行。</p>
<p>在不需要同步执行某些复杂操作时,这一切都相安无事:任何运行时间太长的函数都会阻塞线程。如果应用程序中有大量这类功能,就可能会明显降低服务器的吞吐量,甚至完全冻结它。在这种情况下,无法继续将工作委派给工作池。</p>
<p>在需要对数据进行复杂的计算时(如AI、机器学习或大数据)无法真正有效地使用 Node.js,因为操作阻塞了主(且唯一)线程,使服务器无响应。在 Node.js v10.5.0 发布之前就是这种情况,在这一版本增加了对多线程的支持。</p>
<h3 id="item-2">worker_threads</h3>
<p><code>worker_threads</code>&nbsp;模块允许我们创建功能齐全的多线程 Node.js 程序。</p>
<p>thread worker 是在单独的线程中生成的一段代码(通常从文件中取出)。</p>
<p>注意,术语&nbsp;<em>thread worker</em>,<em>worker</em>&nbsp;和&nbsp;<em>thread</em>&nbsp;经常互换使用,他们都指的是同一件事。</p>
<p>要想使用 thread worker,必须导入&nbsp;<code>worker_threads</code>&nbsp;模块。让我们先写一个函数来帮助我们生成这些thread worker,然后再讨论它们的属性。</p>
<div class="cnblogs_code">
<pre>type WorkerCallback = (err: any, result?: any) =&gt;<span style="color: rgba(0, 0, 0, 1)"> any;

export </span><span style="color: rgba(0, 0, 255, 1)">function</span> runWorker(path: string, cb: WorkerCallback, workerData: object | <span style="color: rgba(0, 0, 255, 1)">null</span> = <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">) {
const worker </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Worker(path, { workerData });

worker.on(</span>'message', cb.bind(<span style="color: rgba(0, 0, 255, 1)">null</span>, <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">));
worker.on(</span>'error'<span style="color: rgba(0, 0, 0, 1)">, cb);

worker.on(</span>'exit', (exitCode) =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
   </span><span style="color: rgba(0, 0, 255, 1)">if</span> (exitCode === 0<span style="color: rgba(0, 0, 0, 1)">) {
   </span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">;
   }

   </span><span style="color: rgba(0, 0, 255, 1)">return</span> cb(<span style="color: rgba(0, 0, 255, 1)">new</span> Error(`Worker has stopped <span style="color: rgba(0, 0, 255, 1)">with</span><span style="color: rgba(0, 0, 0, 1)"> code ${exitCode}`));
});

</span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> worker;
}</span></pre>
</div>
<p>要创建一个 worker,首先必须创建一个&nbsp;<code>Worker</code>&nbsp;类的实例。它的第一个参数提供了包含 worker 的代码的文件的路径;第二个参数提供了一个名为&nbsp;<code>workerData</code>&nbsp;的包含一个属性的对象。这是我们希望线程在开始运行时可以访问的数据。</p>
<p>请注意:不管你是用的是 JavaScript, 还是最终要转换为 JavaScript 的语言(例如,TypeScript),路径应该始终引用带有&nbsp;<code>.js</code>&nbsp;或&nbsp;<code>.mjs</code>&nbsp;扩展名的文件。</p>
<p>我还想指出为什么使用回调方法,而不是返回在触发&nbsp;<code>message</code>&nbsp;事件时将解决的 promise。这是因为 worker 可以发送许多&nbsp;<code>message</code>&nbsp;事件,而不是一个。</p>
<p>正如你在上面的例子中所看到的,线程间的通信是基于事件的,这意味着我们设置了 worker 在发送给定事件后调用的侦听器。</p>
<p>以下是最常见的事件:</p>
<div class="cnblogs_code">
<pre>worker.on('error', (error) =&gt; {});</pre>
</div>
<p>只要 worker 中有未捕获的异常,就会发出&nbsp;<code>error</code>&nbsp;事件。然后终止 worker,错误可以作为提供的回调中的第一个参数。</p>
<div class="cnblogs_code">
<pre>worker.on('exit', (exitCode) =&gt; {});</pre>
</div>
<p>在 worker 退出时会发出&nbsp;<code>exit</code>&nbsp;事件。如果在worker中调用了&nbsp;<code>process.exit()</code>,那么&nbsp;<code>exitCode</code>&nbsp;将被提供给回调。如果 worker 以&nbsp;<code>worker.terminate()</code>&nbsp;终止,则代码为1。</p>
<div class="cnblogs_code">
<pre>worker.on('online', () =&gt; {});</pre>
</div>
<p>只要 worker 停止解析 JavaScript 代码并开始执行,就会发出&nbsp;<code>online</code>&nbsp;事件。它不常用,但在特定情况下可以提供信息。</p>
<div class="cnblogs_code">
<pre>worker.on('message', (data) =&gt; {});</pre>
</div>
<p>只要 worker 将数据发送到父线程,就会发出&nbsp;<code>message</code>&nbsp;事件。</p>
<p>现在让我们来看看如何在线程之间共享数据。</p>
<h3 id="item-3">在线程之间交换数据</h3>
<p>要将数据发送到另一个线程,可以用&nbsp;<code>port.postMessage()</code>&nbsp;方法。它的原型如下:</p>
<div class="cnblogs_code">
<pre>port.postMessage(data[, transferList])</pre>
</div>
<p>port 对象可以是&nbsp;<code>parentPort</code>,也可以是&nbsp;<code>MessagePort</code>&nbsp;的实例 —— 稍后会详细讲解。</p>
<h4>数据参数</h4>
<p>第一个参数 —— 这里被称为&nbsp;<code>data</code>&nbsp;—— 是一个被复制到另一个线程的对象。它可以是复制算法所支持的任何内容。</p>
<p>数据由结构化克隆算法进行复制(包含function的对象引用都会报错DataCloneError:xxxx could not be cloned)。引用自 Mozilla:</p>
<blockquote>
<p>它通过递归输入对象来进行克隆,同时保持之前访问过的引用的映射,以避免无限遍历循环。</p>
</blockquote>
<p>该算法不复制函数、错误、属性描述符或原型链。还需要注意的是,以这种方式复制对象与使用 JSON 不同,因为它可以包含循环引用和类型化数组,而 JSON 不能。</p>
<p>由于能够复制类型化数组,该算法可以在线程之间共享内存。</p>
<p>实例:</p>
<h2>1、代码</h2>
<p>server.js</p>
<div class="cnblogs_code">
<pre>const express = require('express'<span style="color: rgba(0, 0, 0, 1)">);
const ws </span>= require('ws'<span style="color: rgba(0, 0, 0, 1)">);
const convertMessage </span>= require('./worker');<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">引入worker中的方法</span>
<span style="color: rgba(0, 0, 0, 1)">
const app </span>=<span style="color: rgba(0, 0, 0, 1)"> express()
const wsServer </span>= <span style="color: rgba(0, 0, 255, 1)">new</span> ws.Server({ noServer: <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)"> });
wsServer.on(</span>'connection', (socket, req) =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
    socket.on(</span>'message', message =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
      console.log(message);
    });
});
const port </span>= 3002<span style="color: rgba(0, 0, 0, 1)">
app.get(</span>'/test', (req, res) =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">1.接收到test请求,调用convertMessage,发起子线程</span>
    convertMessage().then(() =&gt;<span style="color: rgba(0, 0, 0, 1)"> {<br>      <span style="color: rgba(0, 128, 0, 1)">//5.converMessage resove后向客户端发送success</span>
      res.send(</span>'success'<span style="color: rgba(0, 0, 0, 1)">)
    })
})
app.get(</span>'/', async (req, res) =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
    res.send(</span>'Hello World!'<span style="color: rgba(0, 0, 0, 1)">)
})
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">启动服务</span>
const server = app.listen(port, () =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
    console.log(`Example app listening at http:</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">localhost:${port}`)</span>
<span style="color: rgba(0, 0, 0, 1)">})
server.on(</span>'upgrade', (request, socket, head) =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
    wsServer.handleUpgrade(request, socket, head, socket </span>=&gt;<span style="color: rgba(0, 0, 0, 1)"> {
      wsServer.emit(</span>'connection'<span style="color: rgba(0, 0, 0, 1)">, socket, request);
    });
});</span></pre>
</div>
<p>&nbsp;</p>
<p>worker.js</p>
<div>
<div>
<div class="cnblogs_code">
<pre>const { Worker, workerData } = require('worker_threads'<span style="color: rgba(0, 0, 0, 1)">)

module.exports </span>= <span style="color: rgba(0, 0, 255, 1)">function</span><span style="color: rgba(0, 0, 0, 1)"> convertMessage() {
    </span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">new</span> Promise((resolve, reject) =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
      </span><span style="color: rgba(0, 128, 0, 1)">//2.子线程中执行</span><span style="color: rgba(0, 128, 0, 1)">./index.js文件</span>
      const worker = <span style="color: rgba(0, 0, 255, 1)">new</span> Worker('./index.js'<span style="color: rgba(0, 0, 0, 1)">);
      worker.on(</span>'message', (message) =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
            </span><span style="color: rgba(0, 128, 0, 1)">//4.</span><span style="color: rgba(0, 128, 0, 1)">接收到子线程通过postMessage传回的message</span>
<span style="color: rgba(0, 0, 0, 1)">            console.log(message)
            resolve()
      });
      worker.on(</span>'error'<span style="color: rgba(0, 0, 0, 1)">, reject);
      worker.on(</span>'exit', (code) =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
            </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">子线程执行完成后触发exit事件</span>
            <span style="color: rgba(0, 0, 255, 1)">if</span> (code !== 0<span style="color: rgba(0, 0, 0, 1)">) {
                reject(</span><span style="color: rgba(0, 0, 255, 1)">new</span> Error(`Worker stopped <span style="color: rgba(0, 0, 255, 1)">with</span><span style="color: rgba(0, 0, 0, 1)"> exit code ${code}`));
            }
      })
    })
}</span></pre>
</div>
<p>index.js</p>
</div>
</div>
<div>
<div>
<div class="cnblogs_code">
<pre>const { parentPort, workerData } = require('worker_threads'<span style="color: rgba(0, 0, 0, 1)">)

</span><span style="color: rgba(0, 0, 255, 1)">for</span> (let i = 0; i &lt; 100; i++<span style="color: rgba(0, 0, 0, 1)">) {   
    </span><span style="color: rgba(0, 128, 0, 1)">//3.</span><span style="color: rgba(0, 128, 0, 1)">通过主线程的parentPort,向主线程发送消息</span>
<span style="color: rgba(0, 0, 0, 1)">    parentPort.postMessage(`index.js执行中${i}`)
}</span></pre>
</div>
<h2>2、测试结果</h2>
<p>后端通过node server.js启动服务,前端通过http://localhost:3002/test发起请求:</p>
<p>后端log如下:</p>
</div>
<p>&nbsp;<img src="https://img2020.cnblogs.com/blog/1228394/202112/1228394-20211210171427726-500761686.png"></p>
<p>前端结果:</p>
<p>&nbsp;<img src="https://img2020.cnblogs.com/blog/1228394/202112/1228394-20211210171537219-1808037799.png"></p>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p>&nbsp;</p>
</div>
<p>&nbsp;</p><br><br>
来源:https://www.cnblogs.com/vickylinj/p/15673067.html
頁: [1]
查看完整版本: Node.js 多线程——worker_threads