node之stream - 流
流(stream)是Nodejs中出处理流式数据的抽象接口。stream模块提供了一些API,用于构建实现了流接口的对象。
Node.js提供了多种流对象。例如,HTTP服务器的请求和process.stdout都是流的实例。
流可以是可读的,可写的,或者是可读可写的。所有的流都是EventEmitter
的实例
stream
模块可以通过以下方式使用
const stream = require('stream')
尽管理解流的工作方式很重要,但是stream模块本身主要用于开发者创建新类型的流实例,对于以消费对象为主的开发者,极少需要直接使用stream模块
流的类型
- Writable 可写流
- Readable 可读流
- Duplex 双公流,可读又可写流
- Transform 在读写过程中可以修改或者转换数据的Duplex
对象模式
node创建的流都是运作在字符串和Buffer(或者uint8Array)上,当然,流的实现也可以使用其他的类型的JavaScript值(除了null)。这些流会以“对象模式”进行操作。当创建流时,可以使用objectMode 选项把流实例切换到对象模式。将已存在的流切换到对象模式是不安全的
用于消费流的API
几乎所有的Nodejs应用都在某种程度上使用了流。
比如:
const http = require('http');
const server = http.createServer((req, res) => {
// req 是一个 http.IncomingMessage 实例,它是可读流。
// res 是一个 http.ServerResponse 实例,它是可写流。
let body = '';
// 接收数据为 utf8 字符串,
// 如果没有设置字符编码,则会接收到 Buffer 对象。
req.setEncoding('utf8');
// 如果添加了监听器,则可读流会触发 'data' 事件。
req.on('data', (chunk) => {
body += chunk;
});
// 'end' 事件表明整个请求体已被接收。
req.on('end', () => {
try {
const data = JSON.parse(body);
// 响应信息给用户。
res.write(typeof data);
res.end();
} catch (er) {
// json 解析失败。
res.statusCode = 400;
return res.end(`错误: ${er.message}`);
}
});
});
server.listen(1337);
// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// 错误: Unexpected token o in JSON at position 1
可写流(比如例子中的 res
)会暴露了一些方法,比如 write()
和 end()
用于写入数据到流。
当数据可以从流读取时,可读流会使用 EventEmitter
API 来通知应用程序。 从流读取数据的方式有很多种。
可写流和可读流都通过多种方式使用 EventEmitter
API 来通讯流的当前状态。
Duplex
流和 Transform
流都是可写又可读的。
对于只需写入数据到流或从流消费数据的应用程序,并不需要直接实现流的接口,通常也不需要调用 require('stream')。
可写流
可写流是对数据要被写入的目的地的一种抽象。
可写流的例子包括:
- 客户端的 HTTP 请求
- 服务器的 HTTP 响应
- fs 的写入流
- zlib 流
- crypto 流
- TCP socket
- 子进程 stdin
-
process.stdout
、process.stderr
上面的一些例子事实上是实现了可写流接口的 Duplex
流。
所有可写流都实现了 stream.Writable
类定义的接口。
可读流
可读流的例子包括:
所有可读流都实现了 stream.Readable
类定义的接口。
可写流的例子
下面的例子是一个简单的可写流的实现
const {Writable} = require('stream')
class MyWritable extends Writable {
constructor (options) {
super(options);
// todo
}
_write(chunk,encoding,callback){
if(chunk.toString().index('a') >= 0){
callback(new Error('无效的数据块'))
}else {
callback();
}
}
}
在可写流中解码buffer
const { Writable } = require('stream');
const { StringDecoder } = require('string_decoder');
class StringWritable extends Writable {
constructor(options) {
super(options);
this._decoder = new StringDecoder(options && options.defaultEncoding);
this.data = '';
}
_write(chunk, encoding, callback) {
if (encoding === 'buffer') {
chunk = this._decoder.write(chunk);
}
this.data += chunk;
callback();
}
_final(callback) {
this.data += this._decoder.end();
callback();
}
}
const euro = [[0xE2, 0x82], [0xAC]].map(Buffer.from);
const w = new StringWritable();
w.write('货币: ');
w.write(euro[0]);
w.end(euro[1]);
console.log(w.data); // 货币: €
可读流的例子
class Counter extends stream.Readable {
constructor(opt){
super(opt)
this._max = 1000000
this._index = 1
}
_read(){
const i = this._index++
if(i > this._max){
this.push(null)
}else{
const str = String(i)
const buf = Buffer.from(str,'ascii')
this.push(buf)
}
}
}
双工流的例子
下面是一个双工流的例子,封装了一个可读可写的底层资源对象。
const { Duplex } = require('stream');
const kSource = Symbol('source');
class MyDuplex extends Duplex {
constructor(source, options) {
super(options);
this[kSource] = source;
}
_write(chunk, encoding, callback) {
// 底层资源只处理字符串。
if (Buffer.isBuffer(chunk))
chunk = chunk.toString();
this[kSource].writeSomeData(chunk);
callback();
}
_read(size) {
this[kSource].fetchSomeData(size, (data, encoding) => {
this.push(Buffer.from(data, encoding));
});
}
}
在下面的例子中,创建了一个变换流(双工流的一种),对象模式的可写端接收 JavaScript 数值,并在可读端转换为十六进制字符串。
const { Transform } = require('stream');
// 转换流也是双工流。
const myTransform = new Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
// 强制把 chunk 转换成数值。
chunk |= 0;
// 将 chunk 转换成十六进制。
const data = chunk.toString(16);
// 推送数据到可读队列。
callback(null, '0'.repeat(data.length % 2) + data);
}
});
myTransform.setEncoding('ascii');
myTransform.on('data', (chunk) => console.log(chunk));
myTransform.write(1);
// 打印: 01
myTransform.write(10);
// 打印: 0a
myTransform.write(100);
// 打印: 64