node.js中的流

2018-02-26  本文已影响0人  柳贤_e8c5

node.js中的流是一种数据传输手段,流是有顺序的。流不关心整体流程,只管取出数据,获取数据后的操作

流有四种基本的类型

Readable - 可读的流 (例如 fs.createReadStream()). 

Writable - 可写的流 (例如 fs.createWriteStream()).

Duplex - 可读写的流 (例如 net.Socket). 

Transform - 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate()).

可读流有两种模式

flowing 流动模式

paused 暂停模式

流动模式:flowing 没有缓存区。读一点数据,发射一点数据。当数据全部读完了触发一个end事件。例如:pipe(),resume()方法不走缓存.

data事件,当你一旦开始监听data事件的时候,流就可以读文件的内容并且发射data 。默认请况下,当你监听data事件之后,会不停的读数据,然后触发data事件,触发完data事件后再次读数据。

let rs=fs.createReadStream('./11.txt',{

    highWaterMark:3

});

rs.setEncoding('utf8');rs.on('data',function(data){

    //data获取到的是个buffer,要想获取字符需要设置编码

    console.log(data);

});

rs.on('end',function(){

    console.log('文件读完了');

});

pipe是可读流 的方法

ReadStream.prototype.pipe = function (dest) {

    this.on('data', (data)=>{

        let flag = dest.write(data);//写入数据,返回true,说明缓存区没满还可以继续写。返回

        false暂停一下。监听drain事件,等到触发drain事件说明数据消化完了,再继续读取数据

        if(!flag){

            this.pause();

        }

    });

    dest.on('drain', ()=>{

        this.resume();

    });

    this.on('end', ()=>{

        dest.end();

    });

}

ReadStream.prototype.pause = function(){

    this.flowing = false;

}

ReadStream.prototype.resume = function(){

    this.flowing = true;

    this.read();

}

dest 数据写入目标

可以在单个可读流上绑定多个可写流。

const r = fs.createReadStream('file.txt');

const z = zlib.createGzip();

const w = fs.createWriteStream('file.txt.gz');

r.pipe(z).pipe(w);

暂停模式:paused (初始化模式) 内部设置一个缓存区,缓存区默认大小64kb.实际大小以highWaterMark的值为准。当你监听 readable事件的时候,会进入暂停模式。读取highWaterMark的值放入缓存区,触发readable事件。

let fs = require('fs');

let rs = fs.createReadStream('./1.txt',{

    highWaterMark:3

});

rs.on('readable',()=>{

  onsole.log(rs._readableState.length);//3

  let ch = rs.read(1);

  //当你读了一个字节后,发现只剩下2个字节,不够highWaterMark,会再次读取highWaterMark个字节并填到

缓存区内

      console.log(rs._readableState.length);//2

      let ch = rs.read(3);

      setTimeout(()=>{

          console.log(rs._readableState.length);//5

      },200)

});

自定义流

let {Writable,Readable,Duplex,Transform} = require('stream');

自定义可读流

为了实现可读流,引用Readable接口并用它构造新对象。

    我们可以直接把供使用的数据push出去。

    当push一个null对象就意味着我们想发出信号——这个流没有更多数据了。

var stream = require('stream');

var util = require('util');

util.inherits(Counter, stream.Readable);

function Counter(options) {

    stream.Readable.call(this, options);

    this._index = 0;

}

Counter.prototype._read = function() {

    if(this._index++<3){

        this.push(this._index+'');

    }else{

        this.push(null);

    }

};

var counter = new Counter();

counter.on('data', function(data){

    console.log("读到数据: " + data.toString());//no maybe

});

counter.on('end', function(data){

    console.log("读完了");

});

自定义可写流

为了实现可写流,我们需要使用流模块中的Writable构造函数。 我们只需给Writable构造函数传递一些选项并创建一个对象。唯一需要的选项是write函数,该函数揭露数据块要往哪里写。

1.chunk通常是一个buffer,除非我们配置不同的流。

2.encoding是在特定情况下需要的参数,通常我们可以忽略它。

3.callback是在完成处理数据块后需要调用的函数。这是写数据成功与否的标志。若要发出故障信号,请用错误对象调用回调函数

var stream=require('stream');

var util=require('util');

util.inherits(Writer,stream.Writable);

letstock=[];

function Writer(opt) {

stream.Writable.call(this,opt);

}

Writer.prototype._write=function(chunk,encoding,callback) {

setTimeout(()=>{

stock.push(chunk.toString('utf8'));

console.log("增加: "+chunk);

callback();

},500)

};

var w=newWriter();

for(vari=1;i<=5;i++){

w.write("项目:"+i,'utf8');

}

w.end("结束写入",function(){

console.log(stock);

});

双工流

双工流(可读可写流)是可读流和可写流的实现。例如:net.Socket

let {Duplex} = require('stream');

let index = 0;

let s = Duplex({

    read(){

        if(index++<3)

          this.push('a');

          else

      this.push(null); 

    },

    write(chunk,encoding,cb){

      console.log(chunk.toString().toUpperCase());

      cb();

    }

});

//process.stdin 标准输入流

//proces.stdout标准输出流

process.stdin.pipe(s).pipe(process.stdout);

Transform转换流

转换流是实现数据转换的,(可读流可写流)只能实现一种。

let {Transform} = require('stream');

let t = Transform({

    transform(chunk,encoding,cb){

        this.push(chunk.toString().toUpperCase());

        cb();

    }

});

process.stdin.pipe(t).pipe(process.stdout);

上一篇 下一篇

猜你喜欢

热点阅读