别再说我不懂Node"流"了
<blockquote><p>Nodejs中包括4种类型的流:<code>Readable</code>、<code>Writable</code>、<code>Duplex</code>和<code>Transform</code>.</p>
</blockquote>
<h3 id="readable-stream">Readable Stream</h3>
<h4 id="自定义readable">自定义Readable</h4>
<p>自定义 <code>Readable</code> 流必须调用 <code>new stream.Readable()</code> 构造函数并实现 <code>readable._read()</code> 方法。</p>
<pre><code class="language-js">import { Readable } from "node:stream";
const readable = new Readable();
readable.on("data", (chunk) => {
console.log(chunk.toString());
});
readable.on('end', () => {
console.log('end');
})
readable.on('error', (err) => {
console.log('error-> ', err);
})
</code></pre>
<p>此时会触发<code>error</code>事件<br>
<code>error->Error : The _read() method is not implemented</code></p>
<p>因此要创建一个正常工作的Readable,需要实现_read方法,有三种方式实现自定义Readable流(<strong>Node的4种流都可以通过下面三种形式实现</strong>)。</p>
<p>方式一、在Readable实例上挂载<code>_read</code>方法</p>
<pre><code class="language-js">const readable = new Readable();
readable._read = function(){
this.push("hello world"); //写入readable的缓冲区
this.push(null)
}
</code></pre>
<p>方式二、Readable初始化给options参数传递<code>read</code>(这个相当于<code>_read</code>方法)</p>
<pre><code class="language-js">const readable = new Readable({
read(){
this.push("hello world");
this.push(null)
}
});
</code></pre>
<p>方式三、继承时实现<code>_read</code></p>
<pre><code class="language-js">class MyReadable extends Readable {
_read(){
this.push("hello world");
this.push(null)
}
}
const readable = new MyReadable();
</code></pre>
<h4 id="解释-_read-被调用的时机">解释 <code>_read</code> 被调用的时机</h4>
<p>在 Node.js 的流(Stream)API 中,<code>_read</code> 方法是 Readable 流的核心内部方法,它的调用时机主要有以下几点:</p>
<ol>
<li><strong>当消费者调用 <code>stream.read()</code> 方法时</strong>:当外部代码通过 <code>read()</code> 方法请求数据时,如果内部缓冲区没有足够的数据,Node.js 会调用 <code>_read</code> 方法来获取更多数据。</li>
<li><strong>当消费者添加 'data' 事件监听器时</strong>:当你为 Readable 流添加 'data' 事件监听器时,流会自动切换到流动模式(flowing mode),此时会自动调用 <code>_read</code> 方法开始获取数据。</li>
<li><strong>当流从暂停模式切换到流动模式时</strong>:例如通过调用 <code>resume()</code> 方法时,会触发 <code>_read</code> 的调用。</li>
<li><strong>初始化流时</strong>:在某些情况下,当流被创建并进入流动模式时,<code>_read</code> 方法会被自动调用一次来填充初始数据。</li>
</ol>
<p><code>_read</code> 方法的工作原理是:</p>
<ul>
<li>它负责从底层资源(如文件、网络等)获取数据</li>
<li>通过调用 <code>this.push(chunk)</code> 将数据放入流的内部缓冲区</li>
<li>当没有更多数据时,调用 <code>this.push(null)</code> 表示流结束</li>
</ul>
<h4 id="readable两种模式和三种状态">Readable两种模式和三种状态</h4>
<p><strong>两种模式</strong></p>
<ul>
<li>流动模式(flowing mode)。流会自动从内部缓冲区中读取并触发 <code>'data'</code> 事件,当缓存中没有数据时则调用<code>_read</code>把数据放入缓冲区。</li>
<li>暂停模式(paused mode)。流不会自动触发 <code>'data'</code> 事件,数据会留在内部缓冲区,通过显示<code>readable.read()</code>获取数据。</li>
</ul>
<p><strong>三种状态</strong></p>
<blockquote>
<p>具体来说,在任何给定的时间点,每个 <code>Readable</code> 都处于三种可能的状态之一:</p>
</blockquote>
<ul>
<li><code>readable.readableFlowing === null</code></li>
<li><code>readable.readableFlowing === false</code></li>
<li><code>readable.readableFlowing === true</code></li>
</ul>
<blockquote>
<p>当 <code>readable.readableFlowing</code> 为 <code>null</code> 时,则不提供消费流数据的机制。因此,流不会生成数据。在此状态下,为 <code>'data'</code> 事件绑定监听器、调用 <code>readable.pipe()</code> 方法、或调用 <code>readable.resume()</code> 方法会将 <code>readable.readableFlowing</code> 切换到 <code>true</code>,从而使 <code>Readable</code> 在生成数据时开始主动触发事件。</p>
</blockquote>
<blockquote>
<p>调用 <code>readable.pause()</code>、<code>readable.unpipe()</code> 或接收背压将导致 <code>readable.readableFlowing</code> 设置为 <code>false</code>,暂时停止事件的流动但不会停止数据的生成。在此状态下,为 <code>'data'</code> 事件绑定监听器不会将 <code>readable.readableFlowing</code> 切换到 <code>true</code>。</p>
</blockquote>
<p>总结:</p>
<table>
<thead>
<tr>
<th>值</th>
<th>状态描述</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>null</code></td>
<td><strong>暂停模式(默认)</strong>,流既没有开始自动流动数据,也没有明确被暂停或恢复。数据会被缓存在内部缓冲区中,直到你明确开始消费。</td>
</tr>
<tr>
<td><code>true</code></td>
<td><strong>流动模式(flowing mode)</strong> 自动消费</td>
</tr>
<tr>
<td><code>false</code></td>
<td><strong>暂停模式(paused mode)</strong> 不会自动消费,需要显式调用<code>read()</code>消费</td>
</tr>
</tbody>
</table>
<p>流动模式示例:</p>
<pre><code class="language-js">const readable = Readable.from(['A', 'B', 'C']);
// 监听了'data'事件,此时readableFlowing === true
readable.on('data', (chunk) => {
console.log('Got chunk:', chunk);
/*
Got chunk: A
Got chunk: B
Got chunk: C
*/
});
readable.on('end', () => {
console.log('end'); //end(流结束,不会再有新数据)
})
</code></pre>
<p>暂停模式示例:</p>
<pre><code class="language-js">const readable = Readable.from(['A', 'B', 'C']);
// ⚠️ 此时 readableFlowing === null
readable.on('readable', () => {
let chunk;
while (null !== (chunk = readable.read())) {
console.log('Got chunk:', chunk);
}
/*
Got chunk: A
Got chunk: B
Got chunk: C
*/
});
readable.on('end', () => {
console.log('end'); //end(流结束,不会再有新数据)
})
</code></pre>
<p>P.S. <code>readable.read()</code>需要在<code>'readable'</code>事件中读取数据,因为在外面调用可能返回 null :如果在 'readable' 事件触发之前或者当内部缓冲区为空时调用 read() ,它会返回 null ,表示当前没有数据可读。</p>
<h4 id="readable的事件和方法">Readable的事件和方法</h4>
<p><strong>事件</strong></p>
<ul>
<li><code>'data'</code>: 接受数据chunk(非对象模式下是Buffer或String), 数据在可用时会立即触发该事件。</li>
<li><code>'end'</code>: 当流中没有更多数据了(比如<code>this.push(null)</code>),可由<code>readable.end()</code>触发。</li>
<li><code>close</code>: 当流及其任何底层资源(例如文件描述符)已关闭时,则会触发 <code>'close'</code> 事件。该事件表明将不再触发更多事件,并且不会发生进一步的计算。默认情况下<code>readable.destroy()</code>会触发<code>close</code>事件。</li>
<li><code>'error'</code>: 低层流由于低层内部故障导致无法生成或者推送无效数据时,触发。</li>
<li><code>'readable'</code>: 当有数据可从流中读取时,将触发 <code>'readable'</code> 事件。</li>
<li><code>'pause'</code>: 当调用 <code>readable.pause()</code> 并且 <code>readableFlowing</code> 不是 <code>false</code> 时,则会触发 <code>'pause'</code> 事件。</li>
</ul>
<p><strong>方法</strong></p>
<table>
<thead>
<tr>
<th>方法</th>
<th>说明</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>read()</code></td>
<td>从可读缓冲区中取出数据</td>
</tr>
<tr>
<td><code>pipe(dest)</code> / <code>unpipe()</code></td>
<td>管道传输</td>
</tr>
<tr>
<td><code>pause()</code> / <code>resume()</code></td>
<td>控制流动模式</td>
</tr>
<tr>
<td><code>unshift(chunk)</code></td>
<td>将数据重新放回可读缓冲</td>
</tr>
<tr>
<td><code>push(chunk)</code></td>
<td>向可读端推送数据(用于自定义实现)</td>
</tr>
<tr>
<td><code>from(iterable[, options])</code></td>
<td>从迭代对象中创建流</td>
</tr>
</tbody>
</table>
<p>下面着重介绍下<code>pipe</code>和<code>from</code><br>
<code> 1.</code>Readable.pipe(destination[, options])<code> 第一个参数destination是一个写入流(或者Duplex/Transform,对应到其写入流部分),这个方法将使</code>Readable<code>自动切换到流动模式并将其所有数据推送到绑定的 </code>Writable`。</p>
<p>示例:</p>
<pre><code class="language-js">const fs = require('node:fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// readable的数据自动写入writable
readable.pipe(writable);
</code></pre>
<p>2.<code>Readable.from(iterable[, options])</code><br>
第一个参数是实现 <code>Symbol.asyncIterator</code> 或 <code>Symbol.iterator</code> 可迭代协议的对象。如果传递空值,则触发 'error' 事件。默认情况下,<code>Readable.from()</code> 会将 <code>options.objectMode</code> 设置为 <code>true</code>,这意味着每次读取数据都是一个Javascript值。</p>
<pre><code class="language-js">import { Readable } from 'node:stream';
async function * generate() {
yield 'hello';
yield {name: 'streams'};
}
const readable = Readable.from(generate());
readable.on('data', (chunk) => {
console.log(chunk);
/*
hello
{ name: 'streams' }
*/
});
</code></pre>
<h4 id="继承了readable的node-api">继承了Readable的Node API</h4>
<p><code>Readable</code> 流的示例包括:</p>
<ul>
<li>客户端上的 HTTP 响应</li>
<li>服务器上的 HTTP 请求</li>
<li>文件系统读取流</li>
<li>TCP 套接字</li>
<li>子进程标准输出和标准错误</li>
<li>process.stdin</li>
</ul>
<p><strong>文件系统读取流示例:</strong><br>
首先使用readFile一次性读取数据,这个时候如果是大文件,那么会占用非常大的内存。</p>
<pre><code class="language-js">// 方式一
fs.readFile(path.resolve(__dirname, './bigdata.txt'), 'utf8', (err, data) => {
if (err) {
console.error('读取文件时出错:', err);
return;
}
console.log('文件内容:');
console.log(data)
});
</code></pre>
<p>接下来我们创建一个流来读数据,分批次读取数据。</p>
<pre><code class="language-js">const readStream = fs.createReadStream(path.resolve(__dirname, './bigdata.txt'), {
encoding: 'utf8',
highWaterMark: 4, // 每次读取4个字节 (故意设置很小,方便观察)
});
readStream.on('data', (chunk) => {
console.log('读取到的数据:', chunk);
});
readStream.on('end', () => {
console.log('文件读取完成');
});
readStream.on('error', (err) => {
console.error('读取文件时出错:', err);
});
</code></pre>
<p>如下图所示,流的方式读取数据是分批次的。<br>
<img src="https://img2024.cnblogs.com/blog/1782142/202510/1782142-20251023152039951-1512027407.png"></p>
<p>但上述做法并不能真正解决<code>大文件占用大内存</code>,因为面临流的<code>背压</code>问题(大意就是读的快,写的慢导致读入的数据积压在输入缓冲区,后面「缓冲区、高压线和背压问题」一小节会探究这个问题)。<br>
可以用pipe来处理,代码如下:</p>
<pre><code class="language-js">const readStream = fs.createReadStream(path.resolve(__dirname, './bigdata.txt'), {
encoding: 'utf8',
highWaterMark: 4, // 每次读取4个字节
});
// 使用pipe连接可读流和可写流
readStream.pipe(process.stdout);
</code></pre>
<p>依旧是每次读4个字节写入可写流,但pipe会自动处理背压问题。</p>
<h3 id="writable-stream">Writable Stream</h3>
<h4 id="自定义writable">自定义Writable</h4>
<p>和实现Readable类似,自定义实现Writable,需要实现<code>_write</code>方法。</p>
<pre><code class="language-js">import {Writable} from 'node:stream';
class MyWritable extends Writable {
_write(chunk:any, encoding:BufferEncoding, next:()=>void) {
console.log('Got chunk:', chunk.toString());
setTimeout(()=>{
next();
}, 1000)
}
}
const writable = new MyWritable();
writable.write('hello writable'); //写入writable的缓冲区
</code></pre>
<h4 id="解释-_write-被调用的时机">解释 <code>_write</code> 被调用的时机</h4>
<p>Writable通过调用内部方法<code>_write</code> 实际处理写入数据。它接受三个参数:</p>
<ol>
<li><code>chunk</code> 是 <code>any</code>,encoding的编码模式决定了chunk具体是什么(Buffer还是字符串等)</li>
<li><code>encoding</code> 是 <code>BufferEncoding</code>类型,包括<code>"ascii" | "utf8" | "utf-8" | "utf16le" | "utf-16le" | "ucs2" | "ucs-2" | "base64" | "base64url" | "latin1" | "binary" | "hex"</code>。还有可能是<code>'buffer'</code>(下面会介绍到)。</li>
<li><code>next</code> 函数是一个回调函数,它在 <code>_write</code> 方法中扮演着非常重要的角色
<ul>
<li>信号作用 :调用 next() 表示当前数据块已经处理完成,流可以继续处理下一个数据块</li>
<li>流控制 :如果不调用 next() ,流会认为数据还在处理中,不会继续处理缓冲区中的其他数据</li>
<li>错误处理 :如果处理过程中出现错误,可以调用 next(error) 来通知流发生了错误</li>
</ul>
</li>
</ol>
<p><code>writable.write(data)</code> 仅是将数据写入内部缓冲区(此时不一定调用<code>_write</code>方法),当数据从内部缓冲区被消费时才会调用<code>_write</code>方法。<br>
<code>writable.write</code>可以快速写入多个,但是当<code>_write</code>需要next被调用后才能处理缓冲区的下一个数据,所以有部分是会存入内部缓冲区中,只有当上一个数据处理完成才会对下一个数据调用<code>_write</code>方法。<br>
区分:</p>
<ul>
<li><code>write</code>是将数据写入内部缓冲区。</li>
<li><code>_write</code>是将数据从内部缓冲区写入目的地(比如磁盘、网络等)。</li>
</ul>
<p><strong>关于write API和编码问题</strong><br>
<code>write</code>方法的其中的一种重载形式:<code>writable.write(data, encoding,callback)</code>,在默认情况下<code>encoding</code>参数是不会起作用的。</p>
<pre><code class="language-js">class MyWritable extends Writable {
_write(chunk:any, encoding:string, next:()=>void) {
console.log('encoding:', encoding); // encoding: buffer
console.log('Got chunk:', chunk); // Got chunk: <Buffer 68 65 6c 6c 6f 20 77 72 69 74 61 62 6c 65>
setTimeout(()=>{
next();
}, 1000)
}
}
const writable = new MyWritable({
decodeStrings: true, //默认,这个参数会被设置为true
});
writable.write('hello writable', 'utf-8'); //此时encoding参数不生效, data还是是转换成Buffer处理的(默认)。
</code></pre>
<pre><code class="language-js">class MyWritable extends Writable {
_write(chunk:any, encoding:string, next:()=>void) {
console.log('encoding:', encoding); // encoding: utf-8
console.log('Got chunk:', chunk); // Got chunk: hello writable
setTimeout(()=>{
next();
}, 1000)
}
}
const writable = new MyWritable({
decodeStrings: false, //设为false
});
writable.write('hello writable', 'utf-8');
//decodeStrings: false时,data才是按encoding='utf-8'处理。此时在内部_write可以发现第二参数encoding会是'utf-8', 第一个参数chunk则是一个字符串。
</code></pre>
<h4 id="writable的事件和方法">Writable的事件和方法</h4>
<p><strong>事件</strong></p>
<ul>
<li><code>'close'</code>: 当流及其任何底层资源(例如文件描述符)已关闭时,触发。</li>
<li><code>'error'</code>: 如果在写入或管道数据时发生错误,触发。</li>
<li><code>'drain'</code>: 当写入流内部的写入缓冲区被清空(目的地已接收这部分数据,缓冲区长度降为0),典型地,这发生在之前调用<code>writable.write()</code>返回了<strong>false</strong>(表示缓冲达到或超过<strong>highWaterMark</strong>)之后,一旦缓冲被完全“排空”,就会发出<code>'drain'</code>,表示可以安全继续写入。</li>
</ul>
<pre><code class="language-js">// 一次性批量写入大量数据,大小达到highWaterMark,令write方法返回false
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 1000000;
write();
function write() {
let ok = true;
do {
i--;
if (i === 0) {
// Last time!
writer.write(data, encoding, callback);
} else {
// 当缓冲区满了,ok=false
ok = writer.write(data, encoding);
}
} while (i > 0 && ok);
if (i > 0) {
// 当drain了(即缓冲区被清空了),可以继续写入
writer.once('drain', write);
}
}
}
</code></pre>
<ul>
<li><code>'finish'</code>: 在调用 <code>stream.end()</code> 方法之后,并且所有数据都已刷新到底层系统,触发。</li>
</ul>
<pre><code class="language-js">const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`);
}
writer.on('finish', () => {
console.log('All writes are now complete.');
});
writer.end('This is the end\n');
</code></pre>
<ul>
<li><code>'pipe'</code>: Readable Stream上调用<code>readable.pipe(writable)</code>将数据传输到Writable Stream上时,触发。</li>
</ul>
<pre><code class="language-js">const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
console.log('Something is piping into the writer.');
assert.equal(src, reader);
});
reader.pipe(writer); //当流开始传输时触发writer的'pipe'事件
</code></pre>
<p><strong>方法</strong></p>
<table>
<thead>
<tr>
<th>方法</th>
<th>说明</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>write(chunk[, encoding][, callback])</code></td>
<td>写入数据</td>
</tr>
<tr>
<td><code>end([, encoding][, callback])</code></td>
<td>结束写入</td>
</tr>
<tr>
<td><code>cork()</code> / <code>uncork()</code></td>
<td>批量写入优化</td>
</tr>
<tr>
<td><code>setDefaultEncoding(encoding)</code></td>
<td>设置默认编码</td>
</tr>
<tr>
<td><code>destroy()</code></td>
<td>销毁流</td>
</tr>
</tbody>
</table>
<h4 id="继承了writable的node-api">继承了Writable的Node API</h4>
<ul>
<li>客户端的HTTP请求</li>
<li>服务端的HTTP响应</li>
<li>文件系统的写入流</li>
<li>子进程标准输入</li>
<li>process.stdout</li>
</ul>
<p><strong>服务端HTTP响应示例</strong></p>
<pre><code class="language-js">import http from 'node:http';
import { Readable } from 'node:stream';
// 创建一个自定义的可读流,用于分批生成数据
class BatchDataStream extends Readable {
constructor(options = {}) {
super(options);
this.dataSize = options.dataSize || 5; // 数据批次数量
this.currentBatch = 0;
this.interval = options.interval || 1000; // 每批数据的间隔时间(毫秒)
}
_read() {
// 如果已经发送完所有批次,结束流
if (this.currentBatch >= this.dataSize) {
this.push(null); // 表示流结束
return;
}
// 使用setTimeout模拟异步数据生成
setTimeout(() => {
const batchNumber = this.currentBatch + 1;
const data = `这是第 ${batchNumber} 批数据,时间戳: ${new Date().toISOString()}\n`;
console.log(`正在发送第 ${batchNumber} 批数据`);
// 将数据推送到流中
this.push(data);
this.currentBatch++;
}, this.interval);
}
}
// 创建HTTP服务器
const server = http.createServer((req, res) => {
// 设置响应头
res.setHeader('Content-Type', 'text/plain; charset=utf-8');
// res.setHeader('Transfer-Encoding', 'chunked');//但使用pipe传输数据,会自动设置Transfer-Encoding为chunked,所以这里不需要设置
console.log('收到新的请求,开始流式传输数据...');
// 创建数据流实例
const dataStream = new BatchDataStream({
dataSize: 10, // 总共发送10批数据
interval: 1000 // 每批数据间隔1秒
});
// 使用pipe将数据流直接连接到响应对象
dataStream.pipe(res);
// 当流结束时记录日志
dataStream.on('end', () => {
console.log('数据传输完成');
});
});
// 启动服务器
const PORT = 3000;
server.listen(PORT, () => {
console.log(`服务器运行在 http://localhost:${PORT}`);
console.log('请在浏览器中访问此地址,或使用 curl http://localhost:3000 查看流式数据传输');
});
</code></pre>
<p>运行<code>curl http://localhost:3000</code>可以看到每个1s钟接受一批数据。如下图:<img src="https://img2024.cnblogs.com/blog/1782142/202510/1782142-20251023152039969-587344257.png"></p>
<p>在网页上也可以查看这个请求对应的响应头的<code>Transfer-Encoding</code>被设置为<code>chunked</code>.(使用pipe会自动设置chunked)<br>
<img src="https://img2024.cnblogs.com/blog/1782142/202510/1782142-20251023152039975-905878302.png"></p>
<p><strong>拓展知识: Transfer-Encoding</strong><br>
<code>Chunked</code>传输编码是HTTP中的一种传输编码方式,它允许服务器将响应数据分成一系列小块(chunks)来传输。每个chunk都有一个头部,用于指示其大小,然后是一个回车换行(CRLF)分隔符,接着是chunk的实际数据,最后再加上一个CRLF分隔符。这个过程一直持续到最后一个chunk,它的大小为0,表示响应数据的结束。</p>
<p>以下是一个示例HTTP响应使用chunked传输编码的样本:</p>
<pre><code>HTTP/1.1 200 OK
Content-Type: text/plain
Transfer-Encoding: chunked
4\r\n
This\r\n
7\r\n
is a \r\n
9\r\n
chunked \r\n
6\r\n
message\r\n
0\r\n
\r\n
</code></pre>
<p>大多数情况下,响应头会带上<code>Content-Length</code>字段(表示响应正文的长度),头<code>Transfer-Encoding: chunked</code>和<code>Content-Length</code>是互斥的,不会同时出现在响应头(如果同时出现Transfer-Encoding优先级是大于Content-Length的)</p>
<p><code>Chunked</code>传输的使用场景:大文件下载、API响应流(逐渐加载数据)、AI生成内容(文本图像)</p>
<h3 id="duplex-双工流">Duplex 双工流</h3>
<h4 id="自定义duplex双工流">自定义Duplex双工流</h4>
<p>自定义<code>Duplex</code>需要同时实现<code>_read</code>和<code>_write</code>方法。因为 <code>Duplex</code> 流包含了 <code>Readable</code> 和 <code>Writable</code>两个流,所以要维护两个独立的内部缓冲区,用于读取和写入,允许每一方独立于另一方操作,同时保持适当和高效的数据流。</p>
<p>自定义一个XxxDuplex,可以互相写入数据。</p>
<pre><code class="language-js">import { Duplex } from 'node:stream';
class XxxDuplex extends Duplex {
constructor(peer = null, options = {}) {
super(options);
this.peer = peer; // 另一端的 Duplex
}
// 当可写端接收到数据时
_write(chunk, encoding, callback) {
const data = chunk.toString();
console.log(`[${this.label}] 写入数据:`, data);
// 把数据发给对端
if (this.peer) {
this.peer.push(data);
}
callback(); // 通知写操作完成
}
// 当可读端被调用时(通常由 .read() 或流消费触发)
_read(size) {
// 不做额外操作,等待对端 push()
}
// 结束时
_final(callback) {
if (this.peer) this.peer.push(null); // 通知对端结束
callback();
}
}
// 创建两个互为对端的 Duplex 流
const duplexA = new XxxDuplex();
const duplexB = new XxxDuplex(duplexA);
duplexA.peer = duplexB;
// 加上标识
duplexA.label = 'A';
duplexB.label = 'B';
// 监听 B 的读取
duplexB.on('data', chunk => {
console.log(`[${duplexB.label}] 收到数据:`, chunk.toString());
});
duplexB.on('end', () => {
console.log(`[${duplexB.label}] 流结束`);
});
// A 向 B 发送数据
duplexA.write('你好,B!');
duplexA.write('这是一条测试消息');
duplexA.end();
/*
写入数据: 你好,B!
写入数据: 这是一条测试消息
收到数据: 你好,B!
收到数据: 这是一条测试消息
流结束
*/
</code></pre>
<h4 id="duplex和readablewritable相互转换">Duplex和readable&writable相互转换</h4>
<p><strong>Duplex和readable&writable互相转换</strong><br>
使用<code>stream.Duplex.fromWeb(pair[, options])</code>将readable和writable转为duplex。</p>
<pre><code class="language-js">import { Duplex } from 'node:stream';
import {
ReadableStream,
WritableStream,
} from 'node:stream/web';
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world'); //设置readable buffer的初始数据
},
});
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk); //writable hello
},
});
const pair = {
readable,
writable,
};
//encoding: 'utf8'表示以utf8编码工作,objectMode:true表示以对象模式工作
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });
duplex.write('hello');
duplex.on('data', (chunk) => {
console.log('readable', chunk);//readable world
});
</code></pre>
<p>使用<code>stream.Duplex.toWeb(streamDuplex)</code>将duplex拆分成两个流</p>
<pre><code class="language-js">import { Duplex } from 'node:stream';
const duplex = Duplex({
objectMode: true,
read() {
this.push('world');
this.push(null);
},
write(chunk, encoding, callback) {
console.log('writable', chunk);
callback();
},
});
const { readable, writable } = Duplex.toWeb(duplex);
writable.getWriter().write('hello');
const { value } = await readable.getReader().read();
console.log('readable', value);
</code></pre>
<h4 id="属于duplex流的node-api">属于Duplex流的Node API</h4>
<ul>
<li>TCP套接字</li>
<li>zlib 流</li>
<li>加密流</li>
</ul>
<p>TCP套接字示例:</p>
<pre><code class="language-js">import net from 'node:net';
/** 服务端 */
const server = net.createServer(function(clientSocket){
// clientSocket 就是一个 duplex 流
console.log('新的客户端 socket 连接');
clientSocket.on('data', function(data){
console.log(`服务端收到数据: ${data.toString()}`);
clientSocket.write('world!');
});
clientSocket.on('end', function(){
console.log('连接中断');
});
});
server.listen(6666, 'localhost', function(){
const address = server.address();
console.log('服务端启动,地址为:%j', address);
});
/** 客户端 */
// socket 就是一个 duplex 流
const socket = net.createConnection({
host: 'localhost',
port: 6666
}, () => {
console.log('连接到了服务端!');
socket.write('hello');
setTimeout(()=> {
socket.end();
}, 2000);
});
socket.on('data', (data) => {
console.log(`客户端收到数据: ${data.toString()}`);
});
socket.on('end', () => {
console.log('断开连接');
});
</code></pre>
<h3 id="transform-转换流">Transform 转换流</h3>
<h4 id="自定义transform流">自定义Transform流</h4>
<p><code>Transform</code> 流是一种双工流的<strong>特殊子类</strong>(和Duplex 双工流一样同时实现 <code>Readable</code> 和 <code>Writable</code> 接口)。那么<code>Transform</code>流和<code>Duplex</code>流的关联和区别?</p>
<p><strong>关联:</strong><code>stream.Transform</code>继承了<code>stream.Duplex</code>,并实现了自己的<code>_read</code>和<code>_write</code>方法。<br>
<strong>区别:</strong></p>
<table>
<thead>
<tr>
<th>类型</th>
<th>特点(区别)</th>
<th>用途</th>
<th>关键方法</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>Duplex 流</strong></td>
<td>读写互相独立,输入和输出没有直接关系</td>
<td>双向通信</td>
<td>数据处理</td>
</tr>
<tr>
<td><strong>Transform 流</strong></td>
<td>输入和输出相关:写入的数据经过处理后再输出</td>
<td><code>read()</code> / <code>write()</code></td>
<td><code>transform(chunk, encoding, callback)</code></td>
</tr>
</tbody>
</table>
<p>也就是说,Duplex是输入输出流两部分独立(不干扰,同时进行);而Transform同样有输入和输出流两部分,但是Node会自动将输出流缓冲区的内容写入输入流缓冲区。</p>
<pre><code>Writable Buffer
↓ (消费)
transform(chunk)
↓ (push)
Readable Buffer
</code></pre>
<p><strong>实现自定义的Transfrom流则需要实现_transfrom方法</strong>,举个例子:</p>
<pre><code class="language-js">import{Transform} from 'node:stream'
class UpTransform extends Transform {
constructor() {
super();
}
_transform(chunk, enc, next) {
console.log('enc', enc); // enc buffer
this.push(chunk.toString().toUpperCase());
next();
}
}
const t = new UpTransform();
t.write('abc'); // 写入 writable buffer
t.end();
// 从 readable buffer 读取数据
t.on('data', (chunk) => console.log('Read:', chunk.toString()));
</code></pre>
<p>也用<code>Transform</code>初始化传参的方式创建一个自定的Transfrom实例:</p>
<pre><code class="language-js">const t = new Transform({
transform(chunk, enc, next) {
console.log('enc', enc); // enc buffer
this.push(chunk.toString().toUpperCase());
next();
}
});
</code></pre>
<h4 id="_transform调用的时机"><code>_transform</code>调用的时机</h4>
<p>当使用<code>Transform流</code>往输入缓存区写入数据时,会调用<code>_transform</code>方法进行转换。<br>
比如上面UpTransform那个例子,当<code>t.write('abc');</code>时就会触发<code>_transform</code>。</p>
<p>在pipe方法中使用<code>Transform流</code>,会调用<code>_transform</code>方法进行转换。</p>
<h4 id="属于transfrom流的node-api">属于Transfrom流的Node API</h4>
<ul>
<li>zlib 流</li>
<li>加密流</li>
</ul>
<p>zlib流示例:</p>
<pre><code class="language-js">const fs = require('node:fs');
const zlib = require('node:zlib');
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip(); //z是一个Transform流
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);
</code></pre>
<h3 id="缓冲区高压线和背压问题">缓冲区、高压线和背压问题</h3>
<h4 id="缓冲区高压线">缓冲区、高压线</h4>
<p>首先介绍下缓冲区,<code>Readable/Writable</code>内部维护了一个队列数据叫缓冲区。<br>
高压线(<code>highWaterMark</code>)是<code>Readable/Writable</code>内部的一个阈值(可在初始化时修改)。用来告诉流缓冲区的数据大小不应该超过这个值。</p>
<h4 id="背压问题解释">背压问题解释</h4>
<p>背压问题:当内部缓冲区的大小超过<code>highWaterMark</code>阈值,然后持续扩大,占用原来越多内存,甚至最后出现内存溢出。大白话来说就是缓存积压问题。</p>
<blockquote>
<p>一旦内部读取缓冲区的总大小达到 <code>highWaterMark</code> 指定的阈值,则流将暂时停止从底层资源读取数据,直到可以消费当前缓冲的数据(也就是,流将停止调用内部的用于填充读取缓冲区 <code>readable._read()</code> 方法)。</p>
</blockquote>
<p>这里我们会有一个疑问就是:readable流不是会停止读数据到缓冲区吗,怎么还有背压问题?<br>
以下是GPT的解释,我消化总结下:</p>
<ol>
<li>Readable 流有两种操作模式:flowing 和 paused,flowing模式下无法有效处理背压问题,因为不能暂停流的读(不能停止调用<code>_read</code>)。所以<code>on('data')</code>这种方式是无法处理背压问题的,它会持续不断的把数据积压到缓冲区。</li>
<li>在paused模式下,可以调用<code>pause()</code>方法暂停流的读(pipe就是这个原理),从而可以做到处理背压问题,但是也只能是缓解。像文件流这种是可控制的,能立即停止从文件读取数据;但像Socket流,则不能立即停止数据的接受,但会:暂停从内核socket缓冲区中读取 & 在TCP层通过窗口机制通知发送端"别发那么快"。</li>
</ol>
<p>pipe() 如何处理 Readable 流的背压?</p>
<p><code>readable.pipe(writable)</code>:</p>
<ol>
<li>如果 Writable 流的缓冲区满了(返回 false),pipe() 会自动调用 Readable.pause()</li>
<li>当 Writable 流排空缓冲区并发出 'drain' 事件时,pipe() 会调用 Readable.resume()</li>
<li>这样就在两个流之间建立了一个自动的背压处理机制</li>
</ol>
<p>总结来说,虽然 Readable 流确实会在缓冲区达到 highWaterMark 时尝试暂停底层读取,但这只是背压处理的一部分。完整的背压处理需要整个流管道中的所有组件协同工作,而 pipe() 方法正是为了简化这种协同而设计的。</p>
<blockquote>
<p>当重复调用 <code>writable.write(chunk)</code> 方法时,数据会缓存在 <code>Writable</code> 流中。虽然内部的写入缓冲区的总大小低于 <code>highWaterMark</code> 设置的阈值,但对 <code>writable.write()</code> 的调用将返回 <code>true</code>。一旦内部缓冲区的大小达到或超过 <code>highWaterMark</code>,则将返回 <code>false</code>。</p>
</blockquote>
<p>下面这个例子就是<code>Writable</code>的背压问题解决(「Writable Stream」这节出现过的例子,来自官方文档)</p>
<pre><code class="language-js">// 一次性批量写入大量数据,大小达到highWaterMark,令write方法返回false
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 1000000;
write();
function write() {
let ok = true;
do {
i--;
if (i === 0) {
// Last time!
writer.write(data, encoding, callback);
} else {
// 当缓冲区满了,ok=false
ok = writer.write(data, encoding);
}
} while (i > 0 && ok);
if (i > 0) {
// 当drain了(即缓冲区被清空了),可以继续写入
writer.once('drain', write);
}
}
}
</code></pre>
<h4 id="如何解决背压问题">如何解决背压问题</h4>
<p>方式一:手动拉数据来控制</p>
<pre><code class="language-js">readable.on('readable', () => {
let chunk;
while (null !== (chunk = readable.read())) {
processChunk(chunk);
}
});
</code></pre>
<p>方式二:使用 <code>pipe()</code>(自动处理)</p>
<pre><code class="language-js">readable.pipe(writable);
</code></pre>
<p>方式三:使用 await 迭代(自动处理)</p>
<pre><code class="language-js">for await (const chunk of readable) {
await processChunk(chunk); // 每次 await 都自然暂停上游读取
}
</code></pre><br><br>
来源:https://www.cnblogs.com/fkcaikengren/p/19160802
頁:
[1]