node之stream - 流

2018-11-06  本文已影响0人  imakan

流(stream)是Nodejs中出处理流式数据的抽象接口。stream模块提供了一些API,用于构建实现了流接口的对象。
Node.js提供了多种流对象。例如,HTTP服务器的请求和process.stdout都是流的实例。
流可以是可读的,可写的,或者是可读可写的。所有的流都是EventEmitter的实例
stream模块可以通过以下方式使用

const stream = require('stream')

尽管理解流的工作方式很重要,但是stream模块本身主要用于开发者创建新类型的流实例,对于以消费对象为主的开发者,极少需要直接使用stream模块

流的类型

对象模式

node创建的流都是运作在字符串和Buffer(或者uint8Array)上,当然,流的实现也可以使用其他的类型的JavaScript值(除了null)。这些流会以“对象模式”进行操作。当创建流时,可以使用objectMode 选项把流实例切换到对象模式。将已存在的流切换到对象模式是不安全的

用于消费流的API

几乎所有的Nodejs应用都在某种程度上使用了流。
比如:

const http = require('http');

const server = http.createServer((req, res) => {
  // req 是一个 http.IncomingMessage 实例,它是可读流。
  // res 是一个 http.ServerResponse 实例,它是可写流。

  let body = '';
  // 接收数据为 utf8 字符串,
  // 如果没有设置字符编码,则会接收到 Buffer 对象。
  req.setEncoding('utf8');

  // 如果添加了监听器,则可读流会触发 'data' 事件。
  req.on('data', (chunk) => {
    body += chunk;
  });

  // 'end' 事件表明整个请求体已被接收。 
  req.on('end', () => {
    try {
      const data = JSON.parse(body);
      // 响应信息给用户。
      res.write(typeof data);
      res.end();
    } catch (er) {
      // json 解析失败。
      res.statusCode = 400;
      return res.end(`错误: ${er.message}`);
    }
  });
});

server.listen(1337);

// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// 错误: Unexpected token o in JSON at position 1

可写流(比如例子中的 res)会暴露了一些方法,比如 write()end() 用于写入数据到流。
当数据可以从流读取时,可读流会使用 EventEmitter API 来通知应用程序。 从流读取数据的方式有很多种。
可写流可读流都通过多种方式使用 EventEmitter API 来通讯流的当前状态。
Duplex 流和 Transform 流都是可写又可读的。
对于只需写入数据到流或从流消费数据的应用程序,并不需要直接实现流的接口,通常也不需要调用 require('stream')。

可写流

可写流是对数据要被写入的目的地的一种抽象。

可写流的例子包括:

上面的一些例子事实上是实现了可写流接口的 Duplex 流。

所有可写流都实现了 stream.Writable 类定义的接口。

可读流

可读流的例子包括:

所有可读流都实现了 stream.Readable 类定义的接口。

可写流的例子

下面的例子是一个简单的可写流的实现

const {Writable} = require('stream')
class MyWritable extends Writable {
    constructor (options) {
      super(options);
      // todo
    }
  _write(chunk,encoding,callback){
    if(chunk.toString().index('a') >= 0){
         callback(new Error('无效的数据块'))
    }else {
      callback();
    }
  }
}

在可写流中解码buffer

const { Writable } = require('stream');
const { StringDecoder } = require('string_decoder');

class StringWritable extends Writable {
  constructor(options) {
    super(options);
    this._decoder = new StringDecoder(options && options.defaultEncoding);
    this.data = '';
  }
  _write(chunk, encoding, callback) {
    if (encoding === 'buffer') {
      chunk = this._decoder.write(chunk);
    }
    this.data += chunk;
    callback();
  }
  _final(callback) {
    this.data += this._decoder.end();
    callback();
  }
}

const euro = [[0xE2, 0x82], [0xAC]].map(Buffer.from);
const w = new StringWritable();

w.write('货币: ');
w.write(euro[0]);
w.end(euro[1]);

console.log(w.data); // 货币: €

可读流的例子

class Counter extends stream.Readable {
  constructor(opt){
    super(opt)
    this._max = 1000000
    this._index = 1
  }
  _read(){
    const i = this._index++
    if(i > this._max){
      this.push(null)
    }else{
      const str = String(i)
      const buf = Buffer.from(str,'ascii')
      this.push(buf)
    }
  }
}

双工流的例子

下面是一个双工流的例子,封装了一个可读可写的底层资源对象。

const { Duplex } = require('stream');
const kSource = Symbol('source');

class MyDuplex extends Duplex {
  constructor(source, options) {
    super(options);
    this[kSource] = source;
  }

  _write(chunk, encoding, callback) {
    // 底层资源只处理字符串。
    if (Buffer.isBuffer(chunk))
      chunk = chunk.toString();
    this[kSource].writeSomeData(chunk);
    callback();
  }

  _read(size) {
    this[kSource].fetchSomeData(size, (data, encoding) => {
      this.push(Buffer.from(data, encoding));
    });
  }
}

在下面的例子中,创建了一个变换流(双工流的一种),对象模式的可写端接收 JavaScript 数值,并在可读端转换为十六进制字符串。

const { Transform } = require('stream');

// 转换流也是双工流。
const myTransform = new Transform({
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    // 强制把 chunk 转换成数值。
    chunk |= 0;

    // 将 chunk 转换成十六进制。
    const data = chunk.toString(16);

    // 推送数据到可读队列。
    callback(null, '0'.repeat(data.length % 2) + data);
  }
});

myTransform.setEncoding('ascii');
myTransform.on('data', (chunk) => console.log(chunk));

myTransform.write(1);
// 打印: 01
myTransform.write(10);
// 打印: 0a
myTransform.write(100);
// 打印: 64

上一篇下一篇

猜你喜欢

热点阅读