Nodejs Stream 初识

2019-10-03  本文已影响0人  黑曼巴yk

linux文件

  1. 为了区别不同文件类型,会有一个type来进行区别
    • 普通文件:包含任意数据
    • 目录: 相关一组文件的索引
    • 套接字Socket:和另一台机器上的进程通信的类型
  2. 重定向
    每个进程都有自己的描述符表(fd)。

标准IO会使用流(stream)行松打开文件,所谓的流(stream)实际上是文件描述度(file descriptor)和缓冲区(buffer)在内存中的抽象

Nodejs中的Stream

Stream模块在nodejs中只是处理流数据的抽象接口,Steam模块提供了基础的API,使用者可以根据这些API构建流接口的对象

Stream流类型
Stream 事件
image.png

可读流 Readable Stream

创建一个可读流
const fs = require('fs');
const rs = fs.createReadStream('./index.js', {
  highWaterMark: 1024 // 缓存大小,每次on Data的大小,默认是16kb
});
let onDataCount = 0;
rs.on('data', chunk => {
  console.log(chunk.toString('utf-8'));
  console.log(onDataCount++);
})
rs.on('end', () => {
  console.log('end');
})
可读流中的flowing/paused模式

数据传输的时候需要考虑到数据的消费者的消费速度才能保证高利用率,而发送数据端就是生产者,接收数据端处理数据是消费者
消费者示例: Http请求处理,文件写入处理,数据库处理等

const fs = require('fs');
const rs = fs.createReadStream('./index.js' ,{
  hightWaterMark: 1024
});
let onCount = 0;
rs.on('data', data => {
  console.log(data.toString('utf-8'));
  console.log(onCount ++);
  rs.pause();
  console.log('on data => ', rs.isPaused())
  setTime(() => rs.resume(), 3000)
})
rs.on('end', () => {
  console.log('end');
})
Stream 背压问题

一个Stream数据生产的速度远大于Stream消费的速度,就会造成数据的堆积。比如不停增长的日志文件(100条/1s)作为生产者,有个服务处理日志文件(100条/3s)作为消费者,如果没有pause/flowing模式就会撑爆内存造成浪费,

readable事件精确控制可读流

readable区别于data事件,readable回调使用rs.read(size)方法好像从桶中再用瓢一点点取水一样,可以更加精确的控制流数据的读取。但此事件回调无法使用paused/flowing模式

rs.on('readable', () => {
  let dataChunk = rs.read(10);
  if(dataChunk) {
    console.log(dataChunk.toString())
  }
})

可写流 Writeable Stream

创建一个可写流
const fs = require('fs')
const ws = fs.createWriteStream('./out.txt', {
    highWaterMark: 10
})

for(let i = 0; i < 1000; i++) {
    const drained = ws.write('hahaha\n')
    console.log(drained);
}

drained为true,缓存池highWaterMark还没有满,可以继续写入。

实现和使用Stream的各种接口

可写流 Writeable
  1. writeble使用
const { Writable } = require('stream');
const outStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});
process.stdin.pipe(outStream);

write方法有三个参数

  1. 自定义writeble实现
const { Writable } = require('stream');
const fs = require('fs');
class MyWriteable extends Writable {
    constructor(options) {
        super(options)
    }
    _write(chunk, encoding, callback) {
        // console.log(chunk.toString())
        fs.writeFileSync('./out.txt', chunk.toString() + '\n', {flag: 'a+'});
        setTimeout(() => callback(null), 100);
    }
}
const ws = new MyWriteable();
for(let i =0; i< 100; i++) {
    ws.write('hahaha');
}
可读流Readable
  1. readable使用
const { Readable } = require('stream');

const inStream = new Readable({
    read(size) {
        this.push(String.fromCharCode(this.currentCharCode++));
        if(this.currentCharCode > 90) {
            this.push(null);
        }
    }
})
inStream.currentCharCode = 0;
inStream.pipe(process.stdout);
  1. 自定义Readable实现
'use strict';

const { Readable } = require('stream');

class MyReadable extends Readable {
    constructor(options) {
      super(options)
      this.count = 65
    }
  
    _read() {
        this.count++
        this.push(String.fromCharCode(this.count++));
        if (this.count > 80) {
          return this.push(null) // push null 就结束了
        }
    }
}
  
let rs = new MyReadable(); 
rs.on('data', function(data) {  // 当 rs 注册 onData 时就开始调用 _reade()
    console.log(data.toString());
});
pipe原理和实现

pipe可以有不同的目的地,pipe就是解决了背压的问题,实现方式就是在可读流上注册一个onData事件,达到阈值后进行pause

  1. 使用pipe
const fs = require('fs');

const rs = fs.createReadStream('./package.json');
const ws = fs.createWriteStream('./out.txt');
rs.pipe(ws);
  1. 实现pipe
    Node源码中就是在可写流上注册onDataonDrain可以根据可写流的阈值和释放进行pause/flowing切换
pipe(ws) {
  this.on('data', chunk => {
    let drained = ws.write(chunk);
    if(!drained) {
      this.pause();
    }
  });
  ws.on('drain', () => {
    this.resume();
  })
}
上一篇 下一篇

猜你喜欢

热点阅读