Obeta

读例子入门NodeJS中的Stream和Buffer

NodeJS中将文件、网络数据都当作stream和buffer来进行处理,因此熟悉streams是进阶NodeJS必不可少的一个环节,此文就是通过几个列子来认识stream和buffer

在进入例子之前我们需要了解一些基本的概念:什么是 stream(流)?

平常工作中我们也会经常接触到 stream,比如 linux 中很多命令都支持 stream,而且通过 steram 我们可以组装各种命令轻松实现原本很繁琐的事情,比如:

# 找到所有包含"vi"字符串的曾经执行过的命令
history | grep vi

history的输出数据通过|(管道)流到了下一个命令,成了grep命令的输入。

在说到 NodeJS 中的 stream 之前需要提一下 buffer,它是一块临时内存,stream 使用 buffer 来保存数据直到 buffer 被消费掉,因此我们使用 stream 就是在使用其中存储的 buffer。

可以使用一种比喻来理解 stream 与 buffer 的关系:想象一个有水龙头的水箱,此时这个水箱就是一个 stream,其中的水是 buffer,当你打开水龙头的时候就是在消费其中的 buffer。

由于 stream 所具有这种“数据不需要一次性读取到内存“的特性,因此使用 stream 处理一些大型文件和网络文件的时候非常便利。

在 NodeJS 中 streams 有四种类型:

  1. Readable streams:可读 stream
  2. Writable streams:可写 stream
  3. Duplex streams:全双工 stream(可读写)
  4. Transform streams:可修改 stream

NodeJS 中有非常多的内置库支持 stream:

img
img

简单介绍一下Duplex StreamTransform Stream,因为后面不会涉及太多它两。

  • Duplex Stream有 TCP Socket 这类,可同时读和写。
  • Transform Stream常用的有 zlib 压缩,从这个角度看Transform Stream又是Duplex Stream

Readable Streams

一个 stream 用来读取器中的数据的时候就叫做 Readable Stream,比如读取本地文件、网络视频、服务器数据。

下面我们会使用 fs 模块来打开一个非常大的本地文件,此时会创建一个 Readable Stream,通过这个 Readable Stream 来读取这个文件的数据:

// ReadableStream.js
const { createReadStream } = require('fs');

const readStream = createReadStream('./large_file.txt');
console.log('Buffer size(bytes): ', readStream.readableHighWaterMark);

readStream.on('data', chunk => {
	console.log('---------------------------------');
	console.log(chunk);
	console.log('---------------------------------');
});

readStream.on('open', () => {
	console.log('Stream opened...');
});

readStream.on('end', () => {
	console.log('Stream Closed...');
});

现在我们来分析一下上面这个例子:

  1. 如果你看过 stream 源码,那么你应该知道 stream 是基于 EventEmitters 的,因此可以监听创建的 Readable Stream 读取其中的数据
  2. 上面中的chunk是一块 buffer 能存储的最大数据块。
  3. createReadStream()接受一个字符串、buffer、内置的 URL 对象,如果参数为字符串,那么必须使用file://协议。

执行后你可以看到类似以下的输出:

Stream opened...
Buffer size(Kb):  65536
---------------------------------
<Buffer 74 68 69 73 20 69 73 20 61 20 76 65 72 79 20 6c 61 72 67 65 20 66 69 6c 65 0a 74 68 69 73 20 69 73 20 61 20 76 65 72 79 20 6c 61 72 67 65 20 66 69 6c ... 65486 more bytes>
---------------------------------
---------------------------------
<Buffer 6c 61 72 67 65 20 66 69 6c 65 0a 74 68 69 73 20 69 73 20 61 20 76 65 72 79 20 6c 61 72 67 65 20 66 69 6c 65 0a 74 68 69 73 20 69 73 20 61 20 76 65 72 ... 65486 more bytes>
---------------------------------
---------------------------------
<Buffer 20 69 73 20 61 20 76 65 72 79 20 6c 61 72 67 65 20 66 69 6c 65 0a 74 68 69 73 20 69 73 20 61 20 76 65 72 79 20 6c 61 72 67 65 20 66 69 6c 65 0a 74 68 ... 64034 more bytes>
---------------------------------
Stream Closed...

如果你的文件没有超过 buffer 能存储的最大值(这个例子是 65536bytes),那么你只能看到一个输出(上面输出了三次)。

当然还有另一种方式读取 stream 内的数据(在下面的暂停、流动小章节会介绍):

// ReadableStreamNonFlowing.js
const { createReadStream } = require('fs');

const readStream = createReadStream('./large_file.txt');

setTimeout(() => {
	console.log(readStream.readableLength);
	console.log(readStream.read(10)); // 消费buffer中10bytes的数据
	console.log(readStream.readableLength);
	console.log(readStream.read()); // 读取当前buffer剩下的所有的数据
	console.log(readStream.readableLength);
}, 10);

与上面ReadablesStream.js中的使用data事件读取不一样,这个例子中是调用read()方法,并明确的知道需要读取多少 bytes 数据,若不传读取大小数据,默认会读取当前 buffer 中剩下的所有数据。打印结果如下:

65536
<Buffer 74 68 69 73 20 69 73 20 61 20>
65526
<Buffer 76 65 72 79 20 6c 61 72 67 65>
0

你可能会疑惑为什么需要使用setTimeout定时器,这是因为创建了 stream 后需要一点时间将文件数据填入到 buffer 中,这种方式可能非常的 trick,你可以使用readable事件:

readStream.on('readable', () => {
	// 当stream中当前buffer可读的时候会触发这个readable事件
	let chunk;
	while (null !== (chunk = readStream.read())) {
		console.log(`Received ${chunk.length} bytes of data`);
	}
});

打印如下:

Received 65536 bytes of data
Received 65536 bytes of data
Received 64084 bytes of data

Readable Stream 的两种模式

Readable Stream 有两种模式:pauseflowing,也就是暂停和流动。可读流默认为pause模式,两种模式之间可以自由切换。

// 扩展上面例子 ReadableStream.js
const { createReadStream } = require('fs');

const readStream = createReadStream('./large_file.txt');
console.log('Buffer size(bytes): ', readStream.readableHighWaterMark);

readStream.on('data', chunk => {
	console.log('---------------------------------');
	console.log(chunk);
	console.log('---------------------------------');
});

readStream.on('open', () => {
	console.log('Stream opened...');
});

readStream.on('end', () => {
	console.log('Stream Closed...');
});

// 以下代码为新增
readStream.pause();

setTimeout(() => {
	readStream.resume();
}, 5000);

由于默认情况下模式为pause,因此这时候需要使用read()方法读取 stream,但是由于我们使用了data事件,因此被默认转化为了flowing模式。

而代码的后面新增的 4 行代码使用了pause()方法,所以手动的将模式改为了pause,因此data事件并不会被触发,观感上觉得被"暂停"了,实际上只是切换了模式而已,最后在 5 秒后我们恢复成flowing模式,data事件被重新触发了。

readable stream pause and flowing
readable stream pause and flowing

Writeable Stream

Writeable Stream 是一种可写入流,废话不说,看例子:

const { createWriteStream } = require('fs');

const writeStream = createWriteStream('write_file.txt');

writeStream.write('hello\n');
writeStream.write('hello world\n');

上面创建了一个write_file.txt文件的写入流,然后写入了两行字符串,下面是文件内容:

hello
hello world

write()方法返回一个布尔值,表示当前是否可以继续写入,当写入的数据很多的时候我们需要使用事件drain,下面是基于 NodeJS 文档中的一段代码:

// Write the data to the supplied writable stream one million times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
	let i = 1000000;
	write();
	function write() {
		let ok = true;
		do {
			i--;
			if (i === 0) {
				// Last time!
				writer.write(data, encoding, callback);
			} else {
				// See if we should continue, or wait.
				// Don't pass the callback, because we're not done yet.
				ok = writer.write(data, encoding);
			}
		} while (i > 0 && ok);
		if (i > 0) {
			// Had to stop early!
			// Write some more once it drains.
			writer.once('drain', () => {
				console.log('next buffer was ready');
				write();
			});
		}
	}
}

writeOneMillionTimes(
	createWriteStream('./test.txt'),
	'you are the best\n',
	'utf-8',
	() => {
		console.log('final writed done');
	}
);

上面例子中函数writeOneMillionTimes职责是将一百万行数据写入到写入流中,其中最主要的是在 19 行到 26 行的这段判断,当i大于 0,说明数据还未写完,但是当前的 buffer 已经写满,因此需要使用drain事件等待下一个 buffer ready 后继续写入。

Writeable Stream 还有另一个最常用的场景,使用 NodeJS 搭建一个 http 服务器:

const http = require('http');
const { createReadStream } = require('fs');

http
	.createServer(function(req, res) {
		const fileStream = createReadStream('./write_file.txt');

		res.writeHead(200, { 'Content-Type': 'text/plain' });
		fileStream.on('data', chunk => {
			res.write(chunk);
		});
		fileStream.on('end', () => {
			res.end();
		});
	})
	.listen(8080);

上面例子也是很简单,使用http模块创建了一个服务器,当一个请求过来的时候创建一个可读流,然后写入到 response 中,我们还可以使用pipe方法简化一下上面的代码:

http
	.createServer(function(req, res) {
		const fileStream = createReadStream('./write_file.txt');

		res.writeHead(200, { 'Content-Type': 'text/plain' });
		fileStream.pipe(res);
	})
	.listen(8080);

与文章开头 linux 的例子类似,可读流fileStream经过管道pipe流入写入流中res

也可以使用unpipe终止流之间的连接,注意unpipe调用时机,如果未等到可读流结束就调用unpipe,那么写入流res中的数据会不完整。

这里需要多介绍一下pipe方法,使用它可以使多个 stream 连接起来,首先源是Readable Stream,目标源是Writeable Stream,而Duplex StreamTransform Stream在任何位置都可,看代码:

// 方式1
readableStreamSrc
	.pipe(transformStream1)
	.pipe(transformStream2)
	.pipe(writeableStreamDest);

// 方式2
readableStreamSrc.pipe(transformStream1);
transformStream1.pipe(transformStream2);
transformStream2.pipe(writeableStreamDest);

readableStreamSrc 与 writeableStreamDest 都可以使用 Duplex Stream 代替

以上两种方式都是等价的,当然推荐使用第一种方式,更优雅一些。通常使用pipe之后我们就不需要事件监听的方式了,不过如果你需要更灵活的方式,那么事件也是个好选择。

至此 steram 大部分的东西都介绍完了,使用各种事件和函数可以实现复杂的应用和工具了。用一张图总结一下:

readable-stream and write-stream events-functions
readable-stream and write-stream events-functions

等会,corkuncork还未介绍,再来看一个例子吧:

writeableStream.cork();

writeableStream.write('one line\n');
wirteableStream.write('two line\n');

setTimeout(() => {
	writeableStream.uncork();
	// or
	// writeableStream.end();
}, 3000);

之前我们使用write()方法都是立即写入的,但是在这之前我们调用了cork,这个单词翻译过来就是木塞、软木,这不就是非常像红酒的塞子嘛,堵住口子不让酒流出去,因此就很容易理解了,这是使写入的数据暂时存在 buffer 中。使用的场景一般是前面写入的数据非常慢,比如网络不好的情况下,这时候我们想得到一个完整的 buffer 后再处理给下一个 pipe,而不是挤牙膏一样一点点的给。

有意思的是官方文档专门提示到uncork()方法调用的次数需要和cork()次数相对应,否则uncork无效,这需要特别注意,否则很容易写出 bug:

stream.cork();
stream.write('some ');
stream.cork();
stream.write('data ');
process.nextTick(() => {
	stream.uncork();
	// The data will not be flushed until uncork() is called a second time.
	stream.uncork();
});

实现自己的 Writeable Stream

为了实现一个可写入流,我们需要使用到stream库中的Writable类:

const { Writable } = require('stream');
const { createReadStream } = require('fs');

const readStream = createReadStream('./write_file.txt');

class MyWriteableStream extends Writable {
	write(chunk, cb) {
		console.log('-----------');
		console.log(chunk.toString());
		cb && cb();
		return true;
	}
}

const myWriteableStream = new MyWriteableStream();
readStream.pipe(myWriteableStream);

myWriteableStream.write('first line', () => {
	console.log('last line');
});

MyWriteableStream类继承了Writable类,并实现了wirte()方法,值得注意的是此方法有两种调用方式:

class Writable {
	write(chunk: any, cb?: (error: Error | null | undefined) => void): boolean;
	write(
		chunk: any,
		encoding: string,
		cb?: (error: Error | null | undefined) => void
	): boolean;
}
  • chunk:通常是 buffer,除非直接调用write()方法传递了其它值
  • encoding:编码
  • callback:处理完成 chunk 后需要调用的函数,如果发生错误,可以传递一个 error 对象

encoding在当前例子中可以忽略。上面代码输出如下:

-----------
first line
last line
-----------
hello
hello world

可以观察到pipe()方法没有传递第二个参数,下面再看一个例子,使用构造函数:

const outStream = new Writable({
	write(chunk, encoding, callback) {
		console.log(chunk.toString());
		callback();
	},
});

process.stdin.pipe(outStream);

process.stdin是一个 Readable Stream

执行这段代码后你在终端的输入都会等效的输出,与process.stdout类似,因此上面代码其实等于process.stdin.pipe(process.stdout)

实现自己的 Readable Stream

我们需要使用到Readable类:

const { Readable } = require('stream');

class MyReadableStream extends Readable {
	read(size) {
		return super.read(size);
	}
}

const myReadableStream = new MyReadableStream();
myReadableStream.push('first line\n');
myReadableStream.push('second line\n');
myReadableStream.push(null);

myReadableStream.pipe(process.stdout);

可以看到此 steam 没有实现,默认父类的read()方法足够使用,read()方法会被process.stdout读取,值得注意的是我们通过push()方法推入数据到可读流中(push()接收类型为 string、buffer、unit8Array 的参数),在最后的时候推入了null,告诉process.stdout已经没有更多的数据了,否则你会看到程序一直在等待。

以下是输出:

first line
second line

这个例子有点过于简单,下面再来一个:

class MyReadableStream extends Readable {
	num = 0;

	_read(size) {
		this.push(this.num.toString());
		if (this.num++ > 8) {
			this.push(null);
		}
	}
}

const myReadableStream = new MyReadableStream();
myReadableStream.pipe(process.stdout);

根据 NodeJS 官方文档,Readable 的子类必须要实现_read()方法,且只能内部调用,此内部方法职责是获取源数据push到读取队列中,直到手动push(null)或者push()方法返回了false

运行代码返回:

0123456789

上面代码可能不够优雅,可以使用构造函数重构:

const myReadableStream = new Readable({
	read(size) {
		this.push(this.num.toString());
		if (this.num++ > 8) {
			this.push(null);
		}
	},
});

myReadableStream.sum = 0;
myReadableStream.pipe(process.stdout);

限于篇幅,DuplexTransform类型下一篇再继续。

引用

  1. Node.js Streams: Everything you need to know
  2. Streams, Piping, and Their Error Handling in Node.js

个人随笔记录,内容不保证完全正确,若需要转载,请注明作者和出处.