莫正银 發表於 2021-3-10 09:04:00

Node.js 并发能力总结

<h2 data-tool="mdnice编辑器">简介</h2>
<p data-tool="mdnice编辑器">Node.js 有多重并发的能力,包括单线程异步、多线程、多进程等,这些能力可以根据业务进行不同选择,帮助提高代码的运行效率。</p>
<p data-tool="mdnice编辑器">本文希望通过读 p-limit、pm2 和 worker_threads 的一些代码,来了解 Node.js 的并发能力。</p>
<blockquote data-tool="mdnice编辑器">
<p>版本说明</p>
<ul class="list-paddingleft-2">
<li>Node.js 15.4.0</li>
<li>Npm: 7.0.15</li>
</ul>
</blockquote>
<h2 data-tool="mdnice编辑器">异步</h2>
<p data-tool="mdnice编辑器">Node.js 最常用的并发手段就是异步,不因为资源的消耗而阻塞程序的执行。</p>
<h3 data-tool="mdnice编辑器">什么样的并发</h3>
<p data-tool="mdnice编辑器">从逻辑上讲,异步并不是为了并发,而是为了不阻塞主线程。但是我们却可以同时发起多个异步操作,来起到并发的效果,虽然计算的过程是同步的。</p>
<p data-tool="mdnice编辑器">当性能的瓶颈是 I/O 操作,比如查询数据库、读取文件或者是访问网络,我们就可以使用异步的方式,来完成并发。而由于计算量比较小,所以不会过多的限制性能。每当这个时候,你只需要默默担心下游的 QPS 就好了。</p>
<p data-tool="mdnice编辑器">以 I/O 操作为主的应用,更适合用 Node.js 来做,比如 Web 服务中同时执行 M 个 SQL,亦或是离线脚本中同时访问发起 N 个 RPC 服务。</p>
<p data-tool="mdnice编辑器">所以在代码中使用 async/await 的确很舒服,但是适当的合并请求,使用 Promise.all 才能提高性能。</p>
<h3 data-tool="mdnice编辑器">限制并发</h3>
<p data-tool="mdnice编辑器">一旦你习惯了 Promise.all,同时了解了 EventLoop 的机制,你会发现 I/O 请求的限制往往在下游。因为对于 Node.js 来说,同时发送 10 个 RPC 请求和同时发送 100 个 RPC 请求的成本差别并不大,都是“发送-等待”的节奏,但是下游的“供应商”是会受不了的,这时你需要限制并发数。</p>
<h4 data-tool="mdnice编辑器">限制并发数</h4>
<p data-tool="mdnice编辑器">常用限制并发数的 Npm 包是 p-limit,大致用法如下。</p>
<div class="cnblogs_code">
<pre>const fns =<span style="color: rgba(0, 0, 0, 1)"> [
fetchSomething1,
fetchSomething2,
fetchSomething3,
];

const limit </span>= pLimit(10<span style="color: rgba(0, 0, 0, 1)">);
Promise.all(
fns
    .map(fn </span>=&gt;<span style="color: rgba(0, 0, 0, 1)">
      limit(async () </span>=&gt;<span style="color: rgba(0, 0, 0, 1)"> {
      await fn() </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> fetch1/2/3</span>
<span style="color: rgba(0, 0, 0, 1)">      })
    ) </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> map</span>
); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Promise.all</span></pre>
</div>
<p>&nbsp;</p>
<h4 data-tool="mdnice编辑器">pLimit 函数源码</h4>
<p data-tool="mdnice编辑器">为了深入了解,我们看一段 p-limit 的源码,具体如下。</p>
<div class="cnblogs_code">
<pre>const pLimit = concurrency =&gt;<span style="color: rgba(0, 0, 0, 1)"> {

</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> ...</span>
<span style="color: rgba(0, 0, 0, 1)">
const queue </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Queue();
let activeCount </span>= 0<span style="color: rgba(0, 0, 0, 1)">;

</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> ...</span>
<span style="color: rgba(0, 0, 0, 1)">
const enqueue </span>= (fn, resolve, ...args) =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
    queue.enqueue(run.bind(</span><span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">, fn, resolve, ...args));

    (async () </span>=&gt;<span style="color: rgba(0, 0, 0, 1)"> {
      await Promise.resolve();

      </span><span style="color: rgba(0, 0, 255, 1)">if</span> (activeCount &lt; concurrency &amp;&amp; queue.size &lt; 0<span style="color: rgba(0, 0, 0, 1)">) {
      queue.dequeue()();
      }
    })();
};

const generator </span>= (fn, ...args) =&gt; <span style="color: rgba(0, 0, 255, 1)">new</span> Promise(resolve =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
    enqueue(fn, resolve, ...args);
});

</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> ...</span>

<span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> generator;
};</span></pre>
</div>
<p>&nbsp;</p>
<p data-tool="mdnice编辑器">稍微解释一下上面的代码:</p>
<ol class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>
<p>pLimit 函数的入参 concurrency 是最大并发数,变量 activeCount 表示当前在执行的异步函数的数量</p>
<p>a.调用一次 pLimit 会生成一个限制并发的函数 generator</p>
<p>b.多个 generator 函数会共用一个队列</p>
<p>c. activeCount 需要小于 concurrency</p>
</li>
<li>
<p>pLimit 的实现依据队列(yocto-queue)</p>
<p>a. 队列有两个方法:equeue 和 dequeue,equeue 负责进入队列</p>
<p>b. 每个 generator 函数执行会将一个函数压如队列</p>
<p>c. 当发现 activeCount 小于最大并发数时,则调用 dequeue 弹出一个函数,并执行它。</p>
</li>
<li>每次被压入队列的不是原始函数,而是经过 run 函数处理的函数。</li>
</ol>
<h4 data-tool="mdnice编辑器">函数 run &amp; next</h4>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> run 函数</span>
const run = async (fn, resolve, ...args) =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
activeCount</span>++<span style="color: rgba(0, 0, 0, 1)">;

const result </span>= (async () =&gt;<span style="color: rgba(0, 0, 0, 1)"> fn(...args))();
resolve(result);

</span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
    await result;
} </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> {}

next();
};

</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> next 函数</span>
const next = () =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
activeCount</span>--<span style="color: rgba(0, 0, 0, 1)">;

</span><span style="color: rgba(0, 0, 255, 1)">if</span> (queue.size &gt; 0<span style="color: rgba(0, 0, 0, 1)">) {
    queue.dequeue()();
}
};</span></pre>
</div>
<p>&nbsp;</p>
<ol class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>
<p>函数 run 做 3 件事情,这三件事情为顺序执行:</p>
<p>i&nbsp;. 让 activeCount +1</p>
<p>ii&nbsp;. 执行异步函数 fn,并将结果传递给 resolve</p>
<p>&nbsp; &nbsp; &nbsp;a.&nbsp;为保证 next 的顺序,采用了 await result</p>
<p>iii. 调用 next 函数</p>
</li>
<li>
<p>函数 next 做两件事情</p>
<p>i. 让 activeCount -1</p>
<p>ii. 当队列中还有元素时,弹出一个元素并执行,按照上面的逻辑,run 就会被调用</p>
</li>
</ol>
<p data-tool="mdnice编辑器">通过函数 enqueue、run 和 next,plimit 就产生了一个限制大小但不断消耗的异步函数队列,从而起到限流的作用。</p>
<p data-tool="mdnice编辑器">更详细的 p-limit 使用:Node 开发中使用 p-limit 限制并发原理<sup></sup></p>
<h3 data-tool="mdnice编辑器">超时怎么办</h3>
<p data-tool="mdnice编辑器">pPromise 并没有处理超时,简单的办法是可以使用 setTimeout 实现一个。</p>
<div class="cnblogs_code">
<pre>let timer = <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">;
const timerPromise </span>= <span style="color: rgba(0, 0, 255, 1)">new</span> Promise((resolve, reject) =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
timer </span>= setTimeout(() =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
    reject(</span>'time out'<span style="color: rgba(0, 0, 0, 1)">);
}, </span>1000<span style="color: rgba(0, 0, 0, 1)">);
});


Promise.all([
timerPromise,
fetchPromise,
])
.then(res </span>=&gt;<span style="color: rgba(0, 0, 0, 1)"> clearTimeout(timer))
.</span><span style="color: rgba(0, 0, 255, 1)">catch</span>(err =&gt; console.error(err));</pre>
</div>
<p>&nbsp;</p>
<p data-tool="mdnice编辑器">如果想看更正规的写法,可以参照 p-timeout 的代码,下面是一段的截取。</p>
<div class="cnblogs_code">
<pre>const pTimeout = (promise, milliseconds, fallback, options) =&gt; <span style="color: rgba(0, 0, 255, 1)">new</span> Promise((resolve, reject) =&gt;<span style="color: rgba(0, 0, 0, 1)"> {


</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">...</span>
<span style="color: rgba(0, 0, 0, 1)">
const timer </span>= options.customTimers.setTimeout.call(undefined, () =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
    </span><span style="color: rgba(0, 0, 255, 1)">if</span> (<span style="color: rgba(0, 0, 255, 1)">typeof</span> fallback === 'function'<span style="color: rgba(0, 0, 0, 1)">) {
      </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
      resolve(fallback());
      } </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (error) {
      reject(error);
      }
      </span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)">;
    }

    const message </span>= <span style="color: rgba(0, 0, 255, 1)">typeof</span> fallback === 'string' ?<span style="color: rgba(0, 0, 0, 1)"> fallback : `Promise timed out after ${milliseconds} milliseconds`;
    const timeoutError </span>= fallback <span style="color: rgba(0, 0, 255, 1)">instanceof</span> Error ? fallback : <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> TimeoutError(message);
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> ...</span>
<span style="color: rgba(0, 0, 0, 1)">
    reject(timeoutError);
}, milliseconds);

(async () </span>=&gt;<span style="color: rgba(0, 0, 0, 1)"> {
    </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
      resolve(await promise);
    } </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (error) {
      reject(error);
    } </span><span style="color: rgba(0, 0, 255, 1)">finally</span><span style="color: rgba(0, 0, 0, 1)"> {
      options.customTimers.clearTimeout.call(undefined, timer);
    }
})();
});</span></pre>
</div>
<p>&nbsp;</p>
<p data-tool="mdnice编辑器">p-limit 做了更多的校验和更好的封装:</p>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>
<p>把超时和主程序封装在一个 Promise 中</p>
</li>
<ul class="list-paddingleft-2">
<li>更利于用户理解</li>
<li>灵活度更高:如果使用 Promise.all 只能通过 reject 表示超时,而 p-limit 可以通过 resolve 和 reject 两个方式触发超时</li>
</ul>
</ul>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>
<p>对于超时后的错误提示做了封装</p>
</li>
<ul class="list-paddingleft-2">
<li>用户可以指定错误信息</li>
<li>超时可以触发特定的错误,或者是指定的函数</li>
</ul>
</ul>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>clearTimeout 加在 finally 中的写法更舒服</li>
</ul>
<h3 data-tool="mdnice编辑器">Async Hooks</h3>
<p data-tool="mdnice编辑器">为了方便追踪异步资源,我们可以使用 async_hooks 模块。</p>
<blockquote data-tool="mdnice编辑器">
<p>The async_hooks module provides an API to track asynchronous resources.</p>
</blockquote>
<h4 data-tool="mdnice编辑器">什么是异步资源</h4>
<p data-tool="mdnice编辑器">在 NodeJS 中,一个异步资源表示为一个关联回调函数的对象。有以下几个特点:</p>
<ol class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>回调可以被多次调用(比如反复打开文件,多次创建网络连接);</li>
</ol><ol class="list-paddingleft-2" start="2" data-tool="mdnice编辑器">
<li>资源可以在回调被调用之前关闭;</li>
</ol><ol class="list-paddingleft-2" start="3" data-tool="mdnice编辑器">
<li>AsyncHook 更多的是异步抽象,而不会去管理这些异步的不同。</li>
</ol><ol class="list-paddingleft-2" start="4" data-tool="mdnice编辑器">
<li>当多个 Worker 使用时,每个线程会创建自己的 async_hooks 的接口。</li>
</ol>
<h4 data-tool="mdnice编辑器">概述</h4>
<blockquote data-tool="mdnice编辑器">
<p>https://nodejs.org/dist/latest-v15.x/docs/api/async_hooks.html</p>
</blockquote>
<p data-tool="mdnice编辑器">先看一段 async_hooks 的代码</p>
<div class="cnblogs_code">
<pre>const fs = require('fs'<span style="color: rgba(0, 0, 0, 1)">);
const asyncHooks </span>= require('async_hooks'<span style="color: rgba(0, 0, 0, 1)">);

let indent </span>= 0<span style="color: rgba(0, 0, 0, 1)">;
const asyncHook </span>=<span style="color: rgba(0, 0, 0, 1)"> asyncHooks.createHook({
init(asyncId, type, triggerAsyncId, resource) {
    const eid </span>=<span style="color: rgba(0, 0, 0, 1)"> asyncHooks.executionAsyncId();
    const indentStr </span>= ' '<span style="color: rgba(0, 0, 0, 1)">.repeat(indent);
    fs.writeSync(
      </span>1<span style="color: rgba(0, 0, 0, 1)">,
      ${indentStr}${type}(${asyncId}):
      trigger: ${triggerAsyncId} execution: ${eid}, resouce.keys: ${Object.keys(resource)}\n);
},
before(asyncId) {
    const indentStr </span>= ' '<span style="color: rgba(0, 0, 0, 1)">.repeat(indent);
    fs.writeSync(</span>1<span style="color: rgba(0, 0, 0, 1)">, ${indentStr}before:${asyncId}\n);
    indent </span>+= 2<span style="color: rgba(0, 0, 0, 1)">;
},
after(asyncId) {
    indent </span>-= 2<span style="color: rgba(0, 0, 0, 1)">;
    const indentStr </span>= ' '<span style="color: rgba(0, 0, 0, 1)">.repeat(indent);
    fs.writeSync(</span>1<span style="color: rgba(0, 0, 0, 1)">, ${indentStr}after:${asyncId}\n);
},
destroy(asyncId) {
    const indentStr </span>= ' '<span style="color: rgba(0, 0, 0, 1)">.repeat(indent);
    fs.writeSync(</span>1<span style="color: rgba(0, 0, 0, 1)">, ${indentStr}destroy:${asyncId}\n);
},
});

asyncHook.enable();

Promise.resolve(</span>'ok').then(() =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
setTimeout(() </span>=&gt;<span style="color: rgba(0, 0, 0, 1)"> {
    console.log(</span>'&gt;&gt;&gt;'<span style="color: rgba(0, 0, 0, 1)">, asyncHooks.executionAsyncId());
}, </span>10<span style="color: rgba(0, 0, 0, 1)">);
});</span></pre>
</div>
<p>&nbsp;</p>
<p data-tool="mdnice编辑器">运行结果如下。</p>
<p><img src="https://img2020.cnblogs.com/blog/1313648/202103/1313648-20210310090156684-1404360028.png" alt="" width="536" height="166" loading="lazy"></p>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p>&nbsp;</p>
<h4 data-tool="mdnice编辑器">Async Hooks 的方法</h4>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>asyncHook.enable() / asyncHook.disable():打开/关闭 Async Hooks</li>
</ul>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>
<p>Hook callbacks:当资源进入不同阶段,下面的函数会被调用</p>
</li>
<ul class="list-paddingleft-2">
<li>init:被声明时调用</li>
<li>before:声明之后、执行之前调用</li>
<li>after:异步执行完成后立即调用</li>
<li>destroy:异步资源被销毁时被调用</li>
</ul>
</ul>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>
<p>变量</p>
</li>
<ul class="list-paddingleft-2">
<li>asyncId:异步的 ID,每一次异步调用会使用唯一的 id,Hook callbacks 的方法,可以使用 asyncId 串起来。</li>
<li>triggerAsyncId: 触发当前 asyncId 的 asyncId。</li>
</ul>
</ul>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>
<p>使用 asyncId 和 triggerAsyncId 可以完整的追踪到异步调用的顺序</p>
</li>
<ul class="list-paddingleft-2">
<li>其中根节点 root 是 1。</li>
<li>上面代码的调用顺序:1 -&gt; 2 -&gt; 3 -&gt; 4 -&gt; 5,6,7</li>
<li>映射代码上就是:root -&gt; Promise.resolve -&gt; Promise.then -&gt; setTimeout -&gt; console.log</li>
</ul>
</ul>
<h4 data-tool="mdnice编辑器">Async Hooks: type</h4>
<p data-tool="mdnice编辑器">在上面的 init 方法中 type 参数标明了资源类型,type 类型有 30 多种,具体可以参看下面的链接。</p>
<blockquote data-tool="mdnice编辑器">
<p>https://nodejs.org/dist/latest-v15.x/docs/api/async_hooks.html#async_hooks_type</p>
</blockquote>
<p data-tool="mdnice编辑器">本次程序主要用到了下面几种:</p>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>PROMISE:Promise 对象</li>
</ul>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>Timeout:setTimeout 使用</li>
</ul>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>TTYWRAP:console.log</li>
</ul>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>SIGNALWRAP:console.log</li>
</ul>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>TickObject:console.log</li>
</ul>
<h4 data-tool="mdnice编辑器">使用 Async Hooks 的注意事项</h4>
<p data-tool="mdnice编辑器">不要在 Async Hooks 的方法中使用异步函数,或者会引发异步的函数,如 console.log。因为 Async Hooks 方法就是在监控异步,而自身使用异步函数,会导致自己调用自己。</p>
<p data-tool="mdnice编辑器">如果想打印输出怎么办?</p>
<p data-tool="mdnice编辑器">好的解决办法是使用 fs.writeSync 或者 fs.writeFileSync,即同步输出的办法。</p>
<h2 data-tool="mdnice编辑器">多进程:Cluster</h2>
<p data-tool="mdnice编辑器">异步在 I/O 资源的利用上可以实现并发, 但是异步无法并发的使用 CPU 资源。多进程才能更好地利用多核操作系统的优点。</p>
<h3 data-tool="mdnice编辑器">启动子进程</h3>
<p data-tool="mdnice编辑器">Node.js 使用 Cluster 模块来完成多进程,我们可以通过 pm2 的代码来了解多进程,可以先从下面两个文件入手:</p>
<p data-tool="mdnice编辑器"><strong>lib/God.js 和 lib/God/ClusterMode.js。</strong></p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> lib/God.js</span>

<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> ...</span>
<span style="color: rgba(0, 0, 0, 1)">cluster.setupMaster({
    windowsHide: </span><span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">,
    exec : path.resolve(path.dirname(module.filename), </span>'ProcessContainer.js'<span style="color: rgba(0, 0, 0, 1)">)
});
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> ...</span><span style="color: rgba(0, 128, 0, 1)">
//</span><span style="color: rgba(0, 128, 0, 1)"> lib/God/ClusterMode.js</span>
<span style="color: rgba(0, 0, 0, 1)">
module.exports </span>= <span style="color: rgba(0, 0, 255, 1)">function</span><span style="color: rgba(0, 0, 0, 1)"> ClusterMode(God) {

</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> ...</span>

<span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
    clu </span>=<span style="color: rgba(0, 0, 0, 1)"> cluster.fork({
      pm2_env: JSON.stringify(env_copy),
      windowsHide: </span><span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">
    });
} </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)">(e) {
    God.logAndGenerateError(e);
    </span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> cb(e);
}

</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> ...</span>
<span style="color: rgba(0, 0, 0, 1)">

};</span></pre>
</div>
<p>&nbsp;</p>
<p data-tool="mdnice编辑器">上面两端代码主要讲了 cluster 的两个基本函数:</p>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>setupMaster</li>
</ul>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>fork</li>
</ul>
<p data-tool="mdnice编辑器">简单理解,就是 setupMaster 用于设置,而 fork 用于创建子进程。比如下面的例子。</p>
<div class="cnblogs_code">
<pre>const cluster = require('cluster'<span style="color: rgba(0, 0, 0, 1)">);
cluster.setupMaster({
exec: </span>'worker.js'<span style="color: rgba(0, 0, 0, 1)">,
args: [</span>'--use', 'https'<span style="color: rgba(0, 0, 0, 1)">],
silent: </span><span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">
});
cluster.fork();</span></pre>
</div>
<p>&nbsp;</p>
<h3 data-tool="mdnice编辑器">通信</h3>
<p data-tool="mdnice编辑器">进程间的通信使用的是事件监听来通信。</p>
<div class="cnblogs_code">
<pre>const cluster = require('cluster'<span style="color: rgba(0, 0, 0, 1)">);
const http </span>= require('http'<span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)"> (cluster.isMaster) {
const worker </span>=<span style="color: rgba(0, 0, 0, 1)"> cluster.fork();
[
    </span>'error'<span style="color: rgba(0, 0, 0, 1)">,
    </span>'exit'<span style="color: rgba(0, 0, 0, 1)">,
    </span>'listening'<span style="color: rgba(0, 0, 0, 1)">,
    </span>'message'<span style="color: rgba(0, 0, 0, 1)">,
    </span>'online'<span style="color: rgba(0, 0, 0, 1)">
].forEach(workerEvent </span>=&gt;<span style="color: rgba(0, 0, 0, 1)"> {
    worker.on(workerEvent, msg </span>=&gt;<span style="color: rgba(0, 0, 0, 1)"> {
      console.log([${workerEvent}] from worker:, msg);
    });
});
} </span><span style="color: rgba(0, 0, 255, 1)">else</span><span style="color: rgba(0, 0, 0, 1)"> {
http.createServer(</span><span style="color: rgba(0, 0, 255, 1)">function</span><span style="color: rgba(0, 0, 0, 1)">(req, res) {
    process.send(${req.url});
    res.end(Hello World: ${req.url});
}).listen(</span>8000<span style="color: rgba(0, 0, 0, 1)">);
}</span></pre>
</div>
<p>&nbsp;</p>
<p data-tool="mdnice编辑器">运行后,访问:http://localhost:8000/ 后结果如下:</p>
<p><img src="https://img2020.cnblogs.com/blog/1313648/202103/1313648-20210310090254010-568360193.png" alt="" loading="lazy"></p>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p data-tool="mdnice编辑器">通过 process.send,子进程可以给主进程发送信息,发送的信息可以是字符串,或者是可以进行 JSONStringify 的对象。而如果一个对象不能 JSONStringify,则会报错,比如下面这段代码。</p>
<div class="cnblogs_code">
<pre>http.createServer(<span style="color: rgba(0, 0, 255, 1)">function</span><span style="color: rgba(0, 0, 0, 1)">(req, res) {
    process.send(req);
    res.end(Hello World: ${req.url});
}).listen(</span>8000);</pre>
</div>
<p>&nbsp;</p>
<p data-tool="mdnice编辑器">会报错:</p>
<p><img src="https://img2020.cnblogs.com/blog/1313648/202103/1313648-20210310090312650-1876446489.png" alt="" width="715" height="212" loading="lazy"></p>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p data-tool="mdnice编辑器">这就意味着 Cluster 的通信是消息通信,但是没办法共享内存。(貌似就是进程的定义,但是强调一下没什么坏处)</p>
<h3 data-tool="mdnice编辑器">cluster.settings</h3>
<p data-tool="mdnice编辑器">可以通过 Cluster 模块对子进程进行设置。</p>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>execArgv:执行参数</li>
</ul>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>exec:执行命令,包含可执行文件、脚本文件、参数。</li>
</ul>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>args: 执行参数</li>
</ul>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>cwd:执行目录</li>
</ul>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>serialization: 使传递数据支持高级序列化,比如 BigInt、Map、Set、ArrayBuffer 等 JavaScript 内嵌类型</li>
</ul>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>silent:是否沉默,如果设置为 true,子进程的输出就被屏蔽了</li>
</ul>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>uid:子进程的 uid</li>
</ul>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>gid:子进程的 gid</li>
</ul>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>inspectPort:子线程的 inspect 端口</li>
</ul>
<h3 data-tool="mdnice编辑器">如何榨干机器性能</h3>
<p data-tool="mdnice编辑器">可以参看:nodejs 如何使用 cluster 榨干机器性能<sup></sup></p>
<h2 data-tool="mdnice编辑器">多线程:Worker Threads</h2>
<p data-tool="mdnice编辑器">如果想要共享内存,就需要多线程,Node.js 引入了 worker_threads 模块来完成多线程。</p>
<h3 data-tool="mdnice编辑器">监听端口</h3>
<p data-tool="mdnice编辑器">假设有一个 server.js 的文件。</p>
<div class="cnblogs_code">
<pre>const http = require('http'<span style="color: rgba(0, 0, 0, 1)">);


const runServer </span>= port =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
const server </span>= http.createServer((_req, res) =&gt;<span style="color: rgba(0, 0, 0, 1)"> {
    res.writeHead(</span>200, { 'Content-Type': 'text/plain'<span style="color: rgba(0, 0, 0, 1)"> });
    const msg </span>=<span style="color: rgba(0, 0, 0, 1)"> `server on ${port}`;
    console.log(msg);
    res.end(msg);
});
server.listen(port);
};


module.exports.runServer </span>= runServer;</pre>
</div>
<p>&nbsp;</p>
<h4 data-tool="mdnice编辑器">Cluster 监听</h4>
<p data-tool="mdnice编辑器">通过 cluster 监听端口,可以如下。</p>
<div class="cnblogs_code">
<pre>const cluster = require('cluster'<span style="color: rgba(0, 0, 0, 1)">);
const { runServer } </span>= require('./server'<span style="color: rgba(0, 0, 0, 1)">);


</span><span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)"> (cluster.isMaster) {
console.log(`Master ${process.pid} is running`);
</span><span style="color: rgba(0, 0, 255, 1)">for</span> (let i = 0; i &lt; 4; i ++<span style="color: rgba(0, 0, 0, 1)">) {
    cluster.fork();
}
} </span><span style="color: rgba(0, 0, 255, 1)">else</span><span style="color: rgba(0, 0, 0, 1)"> {
console.log(`worker${cluster.worker.id}: ${cluster.worker.process.pid}`);
runServer(</span>3000<span style="color: rgba(0, 0, 0, 1)">);
}</span></pre>
</div>
<p>&nbsp;</p>
<h4 data-tool="mdnice编辑器">类似的 Worker Threads 代码</h4>
<div class="cnblogs_code">
<pre>const { Worker, isMainThread } = require('worker_threads'<span style="color: rgba(0, 0, 0, 1)">);
const { runServer } </span>= require('./server'<span style="color: rgba(0, 0, 0, 1)">);


console.log(</span>'isMainThread'<span style="color: rgba(0, 0, 0, 1)">, isMainThread);


</span><span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)"> (isMainThread) {
</span><span style="color: rgba(0, 0, 255, 1)">for</span> (let i = 0; i &lt; 3; i ++<span style="color: rgba(0, 0, 0, 1)">) {
    </span><span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Worker(__filename);
}
} </span><span style="color: rgba(0, 0, 255, 1)">else</span><span style="color: rgba(0, 0, 0, 1)"> {
runServer(</span>4000<span style="color: rgba(0, 0, 0, 1)">);
}</span></pre>
</div>
<p>&nbsp;</p>
<pre data-tool="mdnice编辑器"><code>&nbsp;</code></pre>
<p data-tool="mdnice编辑器">结果如下。</p>
<p><img src="https://img2020.cnblogs.com/blog/1313648/202103/1313648-20210310090358756-149053738.png" alt="" width="660" height="371" loading="lazy"></p>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p data-tool="mdnice编辑器">我们没办法在一个进程中监听多个端口,具体可以查看 Node.: 中 net.js 和 cluster.js 做了什么。</p>
<p data-tool="mdnice编辑器">那么 Worker Threads 优势在哪?</p>
<h3 data-tool="mdnice编辑器">通信</h3>
<p data-tool="mdnice编辑器">Worker Threads 更擅长通信,这是线程的优势,不仅是可以消息通信,还可以共享内存。</p>
<p data-tool="mdnice编辑器">具体可以看:多线程 worker_threads 如何通信<sup></sup></p>
<h3 data-tool="mdnice编辑器">子线程管理</h3>
<p data-tool="mdnice编辑器">子线程通过 Worker 实例管理,而下面介绍实例化中的几个重要参数。</p>
<h4 data-tool="mdnice编辑器">资源限制 resouceLimits</h4>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>maxOldGenerationSizeMb:子线程中栈的最大内存</li>
</ul>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>maxYoungGenerationSizeMb:子线程中创建对象的堆的最大内存</li>
</ul>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>codeRangeSizeMb:生成代码消耗的内存</li>
</ul>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>stackSizeMb:该线程默认堆的大小</li>
</ul>
<h4 data-tool="mdnice编辑器">子线程输出 stdout/stderr/stdin</h4>
<p data-tool="mdnice编辑器">如果这 stdout/stderr/stdin 设置为 true,子线程会有独立的管道输出,而不会把 out/err/in 合并到父进程。</p>
<h4 data-tool="mdnice编辑器">子线程参数 workerData, argv 和 execArgv</h4>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>workerData: 父线程传递给子线程的数据,必须要通过 require('worker_threads').workerData 获取。</li>
</ul>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>argv: 父线程传递给子线程的参数,子线程通过 process.argv 获取。</li>
</ul>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>execArgv: Node 的执行参数。</li>
</ul>
<h4 data-tool="mdnice编辑器">子线程环境 env 和 SHARE_ENV</h4>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>env: 父线程传递给子线程的环境,通过 process.env 可以获取。</li>
</ul>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>SHARE_ENV:指定父线程和子线程可以共享环境变量</li>
</ul>
<h2 data-tool="mdnice编辑器">总结</h2>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>作为 Web 服务,提高并发数,选择 Cluster 更好;</li>
<li>作为脚本,希望提高并发,选择 Worker Threads 更好;</li>
</ul>
<ul class="list-paddingleft-2" data-tool="mdnice编辑器">
<li>当计算不是瓶颈,在某个进程或线程中,灵活异步的使用更好。</li>
</ul>
<h3 data-tool="mdnice编辑器">参考资料</h3>
<p></p>
<p>Node 开发中使用 p-limit 限制并发原理:&nbsp;<em>https://tech.bytedance.net/articles/6908747346445041671</em></p>
<p></p>
<p>nodejs 如何使用 cluster 榨干机器性能:&nbsp;<em>https://tech.bytedance.net/articles/6906846464304447495</em></p>
<p></p>
<p>多线程 worker_threads 如何通信:&nbsp;<em>https://tech.bytedance.net/articles/6907111611668889608</em></p>
<p>以上内容转自https://mp.weixin.qq.com/s/cXwM_ENAjxvvwaBHEsuHbA&nbsp;</p><p>喜欢这篇文章?欢迎打赏~~</p><p><img src="https://img2020.cnblogs.com/blog/1313648/202012/1313648-20201207210415386-746901846.png" alt="" loading="lazy"></p><p>&nbsp;</p><br><br>
来源:https://www.cnblogs.com/cangqinglang/p/14509478.html
頁: [1]
查看完整版本: Node.js 并发能力总结