Stream.Readable可写流类

2021-02-25  本文已影响0人  静昕妈妈芦培培

例: readable.read([size])
从内部缓冲拉取并返回数据。 如果没有可读的数据,则返回 null。
默认情况下, readable.read([size]) 返回的数据是 Buffer 对象,size 参数指定要读取的特定字节数。如果没有指定 size 参数,则返回内部缓冲中的所有数据。

const {Readable } = require('stream')
const fs = require('fs')

//创建一个可读流
// const reader = new Readable({
//     highWaterMark: 3, //从底层资源读取数据并存储在内部缓冲区中的最大字节数。 默认值: 16384
//
//     //实现read方法
//     read(size) {
//         //size <number> 要异步读取的字节数。
//         console.log('一次读取%d个字节', size)
//     }
// })

const reader = fs.createReadStream('foo.txt');



reader.on('close', () => {
    console.log('触发了close事件')
})


//'end' 事件只有在数据被完全消费掉后才会触发。 要想触发该事件,可以将流转换到流动模式,或反复调用 stream.read() 直到数据被消费完。
reader.on('end', () => {
    console.log('当流中没有数据可供消费')
})

//监听器回调将会传入一个 Error 对象。
reader.on('error', (err) => {
    console.error(err)
})

//当调用 stream.pause() 并且 readsFlowing 不为 false 时,就会触发 'pause' 事件。
reader.on('pause', () => {
    console.log('触发了暂停事件')
})

//当有数据可从流中读取时,就会触发 'readable' 事件。
//当到达流数据的尽头时, 'readable' 事件也会触发,但是在 'end' 事件之前触发。
//'readable' 事件表明流有新的动态:要么有新的数据,要么到达流的尽头。
//对于前者,stream.read() 会返回可用的数据。 对于后者,stream.read() 会返回 null
reader.on('readable', function(){
    console.log('触发了readable事件')
    //有数据可读取
    let data;


    //可以使用this.read()的前提时,所在的匿名函数非箭头函数,this就指向reader
    while (data = this.read()) {
        //data为buffer对象
        console.log(data);
    }
})


//当调用 stream.resume() 并且 readsFlowing 不为 true 时,将会触发 'resume' 事件。
reader.on('resume', () => {
    console.log('触发了resume事件')
})

执行结果:


image.png

修改代码,给this.read(6),传参6,一次性读取六个字节

const {Readable } = require('stream')
const fs = require('fs')

//创建一个可读流
// const reader = new Readable({
//     highWaterMark: 3, //从底层资源读取数据并存储在内部缓冲区中的最大字节数。 默认值: 16384
//
//     //实现read方法
//     read(size) {
//         //size <number> 要异步读取的字节数。
//         console.log('一次读取%d个字节', size)
//     }
// })

const reader = fs.createReadStream('foo.txt');



reader.on('close', () => {
    console.log('触发了close事件')
})


//'end' 事件只有在数据被完全消费掉后才会触发。 要想触发该事件,可以将流转换到流动模式,或反复调用 stream.read() 直到数据被消费完。
reader.on('end', () => {
    console.log('当流中没有数据可供消费')
})

//监听器回调将会传入一个 Error 对象。
reader.on('error', (err) => {
    console.error(err)
})

//当调用 stream.pause() 并且 readsFlowing 不为 false 时,就会触发 'pause' 事件。
reader.on('pause', () => {
    console.log('触发了暂停事件')
})

//当有数据可从流中读取时,就会触发 'readable' 事件。
//当到达流数据的尽头时, 'readable' 事件也会触发,但是在 'end' 事件之前触发。
//'readable' 事件表明流有新的动态:要么有新的数据,要么到达流的尽头。
//对于前者,stream.read() 会返回可用的数据。 对于后者,stream.read() 会返回 null
reader.on('readable', function(){
    console.log('触发了readable事件')
    //有数据可读取
    let data;


    //可以使用this.read()的前提时,所在的匿名函数非箭头函数,this就指向reader
    while (data = this.read(6)) {
        //data为buffer对象
        console.log(data);
    }
})


//当调用 stream.resume() 并且 readsFlowing 不为 true 时,将会触发 'resume' 事件。
reader.on('resume', () => {
    console.log('触发了resume事件')
})

执行结果:


image.png

修改代码:使用 readable.setEncoding() 指定字符编码,readbale.read(3)返回的不再是buffer对象,而是字符串,此时参数3代表的是一次性读取三个字符串

const {Readable } = require('stream')
const fs = require('fs')

//创建一个可读流
// const reader = new Readable({
//     highWaterMark: 3, //从底层资源读取数据并存储在内部缓冲区中的最大字节数。 默认值: 16384
//
//     //实现read方法
//     read(size) {
//         //size <number> 要异步读取的字节数。
//         console.log('一次读取%d个字节', size)
//     }
// })

const reader = fs.createReadStream('foo.txt');



reader.on('close', () => {
    console.log('触发了close事件')
})


//'end' 事件只有在数据被完全消费掉后才会触发。 要想触发该事件,可以将流转换到流动模式,或反复调用 stream.read() 直到数据被消费完。
reader.on('end', () => {
    console.log('当流中没有数据可供消费')
})

//监听器回调将会传入一个 Error 对象。
reader.on('error', (err) => {
    console.error(err)
})

//当调用 stream.pause() 并且 readsFlowing 不为 false 时,就会触发 'pause' 事件。
reader.on('pause', () => {
    console.log('触发了暂停事件')
})

//当有数据可从流中读取时,就会触发 'readable' 事件。
//当到达流数据的尽头时, 'readable' 事件也会触发,但是在 'end' 事件之前触发。
//'readable' 事件表明流有新的动态:要么有新的数据,要么到达流的尽头。
//对于前者,stream.read() 会返回可用的数据。 对于后者,stream.read() 会返回 null
reader.on('readable', function(){
    console.log('触发了readable事件')
    //有数据可读取
    let data;


    //可以使用this.read()的前提时,所在的匿名函数非箭头函数,this就指向reader
    //设置了下面的语句后,this.read()返回的数据就不再是buffer对象,是字符串
    //此时参数size代表的是一次性读取size个字符串
    this.setEncoding('utf8')
    while (data = this.read(3)) {
        //data为字符串
        console.log(data);
    }
})


//当调用 stream.resume() 并且 readsFlowing 不为 true 时,将会触发 'resume' 事件。
reader.on('resume', () => {
    console.log('触发了resume事件')
})

执行结果:


image.png

例2:给可读流绑定data事件读取数据,会先触发resume事件,再触发data事件,data事件读取数据默认读取的是buffer对象

const {Readable } = require('stream')
const fs = require('fs')

//创建一个可读流
// const reader = new Readable({
//     highWaterMark: 3, //从底层资源读取数据并存储在内部缓冲区中的最大字节数。 默认值: 16384
//
//     //实现read方法
//     read(size) {
//         //size <number> 要异步读取的字节数。
//         console.log('一次读取%d个字节', size)
//     }
// })

const reader = fs.createReadStream('foo.txt');



reader.on('close', () => {
    console.log('触发了close事件')
})

//当调用 readable.pipe(), readable.resume() 或绑定监听器到 'data' 事件时,流会转换到流动模式。
// 当调用 readable.read() 且有数据块返回时,也会触发 'data' 事件。
reader.on('data', (chunk) => {
    //如果使用 readable.setEncoding() 为流指定了默认的字符编码,则监听器回调传入的数据为字符串,否则传入的数据为 Buffer
    console.log('监听data事件读取到的数据',chunk)
})


//'end' 事件只有在数据被完全消费掉后才会触发。 要想触发该事件,可以将流转换到流动模式,或反复调用 stream.read() 直到数据被消费完。
reader.on('end', () => {
    console.log('当流中没有数据可供消费')
})

//监听器回调将会传入一个 Error 对象。
reader.on('error', (err) => {
    console.error(err)
})

//当调用 stream.pause() 并且 readsFlowing 不为 false 时,就会触发 'pause' 事件。
reader.on('pause', () => {
    console.log('触发了暂停事件')
})


//当调用 stream.resume() 并且 readsFlowing 不为 true 时,将会触发 'resume' 事件。
reader.on('resume', () => {
    console.log('触发了resume事件')
})

执行结果:


image.png

修改代码:使用reader.setEncoding('utf8')指定字符编码,data事件读取到的数据不再是buffer,而是字符串

const {Readable } = require('stream')
const fs = require('fs')

//创建一个可读流
// const reader = new Readable({
//     highWaterMark: 3, //从底层资源读取数据并存储在内部缓冲区中的最大字节数。 默认值: 16384
//
//     //实现read方法
//     read(size) {
//         //size <number> 要异步读取的字节数。
//         console.log('一次读取%d个字节', size)
//     }
// })

const reader = fs.createReadStream('foo.txt');

//指定字符编码
reader.setEncoding('utf8')

reader.on('close', () => {
    console.log('触发了close事件')
})

//当调用 readable.pipe(), readable.resume() 或绑定监听器到 'data' 事件时,流会转换到流动模式。
// 当调用 readable.read() 且有数据块返回时,也会触发 'data' 事件。
reader.on('data', (chunk) => {
    //如果使用 readable.setEncoding() 为流指定了默认的字符编码,则监听器回调传入的数据为字符串,否则传入的数据为 Buffer
    console.log('监听data事件读取到的数据',chunk)
})


//'end' 事件只有在数据被完全消费掉后才会触发。 要想触发该事件,可以将流转换到流动模式,或反复调用 stream.read() 直到数据被消费完。
reader.on('end', () => {
    console.log('当流中没有数据可供消费')
})

//监听器回调将会传入一个 Error 对象。
reader.on('error', (err) => {
    console.error(err)
})

//当调用 stream.pause() 并且 readsFlowing 不为 false 时,就会触发 'pause' 事件。
reader.on('pause', () => {
    console.log('触发了暂停事件')
})


//当调用 stream.resume() 并且 readsFlowing 不为 true 时,将会触发 'resume' 事件。
reader.on('resume', () => {
    console.log('触发了resume事件')
})

执行结果:


image.png

如果同时使用 'readable' 事件和 'data' 事件,则 'readable' 事件会优先控制流,
也就是说,当调用 stream.read() 时才会触发 'data' 事件。readableFlowing 属性会变成 false。
当移除 'readable' 事件时,如果存在 'data' 事件监听器,则流会开始流动,也就是说,无需调用 .resume() 也会触发 'data' 事件。

const {Readable } = require('stream')
const fs = require('fs')

//创建一个可读流
// const reader = new Readable({
//     highWaterMark: 3, //从底层资源读取数据并存储在内部缓冲区中的最大字节数。 默认值: 16384
//
//     //实现read方法
//     read(size) {
//         //size <number> 要异步读取的字节数。
//         console.log('一次读取%d个字节', size)
//     }
// })

const reader = fs.createReadStream('foo.txt');

//指定字符编码
reader.setEncoding('utf8')

reader.on('close', () => {
    console.log('触发了close事件')
})



//当调用 readable.pipe(), readable.resume() 或绑定监听器到 'data' 事件时,流会转换到流动模式。
// 当调用 readable.read() 且有数据块返回时,也会触发 'data' 事件。
reader.on('data', (chunk) => {
    //如果使用 readable.setEncoding() 为流指定了默认的字符编码,则监听器回调传入的数据为字符串,否则传入的数据为 Buffer
    console.log('监听data事件读取到的数据',chunk)
})


//'end' 事件只有在数据被完全消费掉后才会触发。 要想触发该事件,可以将流转换到流动模式,或反复调用 stream.read() 直到数据被消费完。
reader.on('end', () => {
    console.log('当流中没有数据可供消费')
})

//监听器回调将会传入一个 Error 对象。
reader.on('error', (err) => {
    console.error(err)
})

//当调用 stream.pause() 并且 readsFlowing 不为 false 时,就会触发 'pause' 事件。
reader.on('pause', () => {
    console.log('触发了暂停事件')
})

//当有数据可从流中读取时,就会触发 'readable' 事件。
//当到达流数据的尽头时, 'readable' 事件也会触发,但是在 'end' 事件之前触发。
//'readable' 事件表明流有新的动态:要么有新的数据,要么到达流的尽头。
//对于前者,stream.read() 会返回可用的数据。 对于后者,stream.read() 会返回 null
//如果同时使用 'readable' 事件和 'data' 事件,则 'readable' 事件会优先控制流,
// 也就是说,当调用 stream.read() 时才会触发 'data' 事件。readableFlowing 属性会变成 false。
// 当移除 'readable' 事件时,如果存在 'data' 事件监听器,则流会开始流动,也就是说,无需调用 .resume() 也会触发 'data' 事件。
reader.on('readable', function(){
    console.log('触发了readable事件')
    //有数据可读取
    let data;

    //可以使用this.read()的前提时,所在的匿名函数非箭头函数,this就指向reader
    while (data = this.read(3)) {

        //data为buffer对象
        console.log('read方法返回的数据',data);
    }
})


//当调用 stream.resume() 并且 readsFlowing 不为 true 时,将会触发 'resume' 事件。
reader.on('resume', () => {
    console.log('触发了resume事件')
})

执行结果:


image.png

例3:
当可读流处在暂停模式时,使用 readable.push() 添加的数据
可以在触发 'readable' 事件时通过调用 readable.read() 读取。
当可读流处于流动模式时,使用 readable.push() 添加的数据可以通过触发 'data' 事件读取。

const {Readable } = require('stream')

//创建一个可读流
const reader = new Readable({
    highWaterMark: 3, //从底层资源读取数据并存储在内部缓冲区中的最大字节数。 默认值: 16384
    //实现read方法
    //一旦 readable._read() 方法被调用,将不会再次调用它,直到更多数据通过 readable.push() 方法被推送
    read(size) {
        //size <number> 要异步读取的字节数。
        console.log('一次读取%d个字符串', size)
    },
    destroy(err, callback) {
        console.log(err)
        callback()
    }
})

//当可读流处在暂停模式时,使用 readable.push() 添加的数据
// 可以在触发 'readable' 事件时通过调用 readable.read() 读取。
//当可读流处于流动模式时,使用 readable.push() 添加的数据可以通过触发 'data' 事件读取。
reader.push('你好啊李银河,你那里下雪了么')


//指定字符编码
reader.setEncoding('utf8')

reader.on('close', () => {
    console.log('触发了close事件')
})


//'end' 事件只有在数据被完全消费掉后才会触发。 要想触发该事件,可以将流转换到流动模式,或反复调用 stream.read() 直到数据被消费完。
reader.on('end', () => {
    console.log('当流中没有数据可供消费')
})

//监听器回调将会传入一个 Error 对象。
reader.on('error', (err) => {
    console.error(err)
})

//当调用 stream.pause() 并且 readsFlowing 不为 false 时,就会触发 'pause' 事件。
reader.on('pause', () => {
    console.log('触发了暂停事件')
})

//当有数据可从流中读取时,就会触发 'readable' 事件。
//当到达流数据的尽头时, 'readable' 事件也会触发,但是在 'end' 事件之前触发。
//'readable' 事件表明流有新的动态:要么有新的数据,要么到达流的尽头。
//对于前者,stream.read() 会返回可用的数据。 对于后者,stream.read() 会返回 null
//如果同时使用 'readable' 事件和 'data' 事件,则 'readable' 事件会优先控制流,
// 也就是说,当调用 stream.read() 时才会触发 'data' 事件。readableFlowing 属性会变成 false。
// 当移除 'readable' 事件时,如果存在 'data' 事件监听器,则流会开始流动,也就是说,无需调用 .resume() 也会触发 'data' 事件。
reader.on('readable', function(){
    console.log('触发了readable事件')
    //有数据可读取
    let data;

    //readable.read()
    //从内部缓冲拉取并返回数据。 如果没有可读的数据,则返回 null。
    // 默认情况下, readable.read() 返回的数据是 Buffer 对象,除非使用 readable.setEncoding() 指定字符编码或流处于对象模式。
    //size 参数指定要读取的特定字节数。如果没有指定 size 参数,则返回内部缓冲中的所有数据。
    //可以使用this.read()的前提时,所在的匿名函数非箭头函数,this就指向reader
    while (data = this.read(3)) {

        //data为buffer对象
        console.log('read方法返回的数据',data);
    }
})


//当调用 stream.resume() 并且 readsFlowing 不为 true 时,将会触发 'resume' 事件。
reader.on('resume', () => {
    console.log('触发了resume事件')
})

执行结果:


image.png

修改代码:去掉指定字符编码

const reader = new Readable({
    highWaterMark: 3, //从底层资源读取数据并存储在内部缓冲区中的最大字节数。 默认值: 16384
    //实现read方法
    //一旦 readable._read() 方法被调用,将不会再次调用它,直到更多数据通过 readable.push() 方法被推送
    read(size) {
        //size <number> 要异步读取的字节数。
        //当没有使用 readable.setEncoding() 方法设置 encoding 属性,size的单位为字节
        //如果使用 readable.setEncoding('utf8') 方法设置 encoding 属性,size的单位为个
        console.log('一次读取%d个字节', size)
    },
    destroy(err, callback) {
        console.log(err)
        callback()
    }
})

//当可读流处在暂停模式时,使用 readable.push() 添加的数据
// 可以在触发 'readable' 事件时通过调用 readable.read() 读取。
//当可读流处于流动模式时,使用 readable.push() 添加的数据可以通过触发 'data' 事件读取。
reader.push('你好啊李银河,你那里下雪了么')

reader.on('close', () => {
    console.log('触发了close事件')
})


//'end' 事件只有在数据被完全消费掉后才会触发。 要想触发该事件,可以将流转换到流动模式,或反复调用 stream.read() 直到数据被消费完。
reader.on('end', () => {
    console.log('当流中没有数据可供消费')
})

//监听器回调将会传入一个 Error 对象。
reader.on('error', (err) => {
    console.error(err)
})

//当调用 stream.pause() 并且 readsFlowing 不为 false 时,就会触发 'pause' 事件。
reader.on('pause', () => {
    console.log('触发了暂停事件')
})

//当有数据可从流中读取时,就会触发 'readable' 事件。
//当到达流数据的尽头时, 'readable' 事件也会触发,但是在 'end' 事件之前触发。
//'readable' 事件表明流有新的动态:要么有新的数据,要么到达流的尽头。
//对于前者,stream.read() 会返回可用的数据。 对于后者,stream.read() 会返回 null
//如果同时使用 'readable' 事件和 'data' 事件,则 'readable' 事件会优先控制流,
// 也就是说,当调用 stream.read() 时才会触发 'data' 事件。readableFlowing 属性会变成 false。
// 当移除 'readable' 事件时,如果存在 'data' 事件监听器,则流会开始流动,也就是说,无需调用 .resume() 也会触发 'data' 事件。
reader.on('readable', function(){
    console.log('触发了readable事件')
    //有数据可读取
    let data;

    //readable.read()
    //从内部缓冲拉取并返回数据。 如果没有可读的数据,则返回 null。
    // 默认情况下, readable.read() 返回的数据是 Buffer 对象,除非使用 readable.setEncoding() 指定字符编码或流处于对象模式。
    //size 参数指定要读取的特定字节数。如果没有指定 size 参数,则返回内部缓冲中的所有数据。
    //可以使用this.read()的前提时,所在的匿名函数非箭头函数,this就指向reader
    while (data = this.read(3)) {

        //data为buffer对象
        console.log('read方法返回的数据',data);
    }
})


//当调用 stream.resume() 并且 readsFlowing 不为 true 时,将会触发 'resume' 事件。
reader.on('resume', () => {
    console.log('触发了resume事件')
})

执行结果:


image.png

例:当可读流处于流动模式时,使用 readable.push() 添加的数据可以通过触发 'data' 事件读取。

const {Readable } = require('stream')

//创建一个可读流
const reader = new Readable({
    highWaterMark: 3, //从底层资源读取数据并存储在内部缓冲区中的最大字节数。 默认值: 16384
    //实现read方法
    //一旦 readable._read() 方法被调用,将不会再次调用它,直到更多数据通过 readable.push() 方法被推送
    read(size) {
        //size <number> 要异步读取的字节数。
        //当没有使用 readable.setEncoding() 方法设置 encoding 属性,size的单位为字节
        //如果使用 readable.setEncoding('utf8') 方法设置 encoding 属性,size的单位为个
        console.log('一次读取%d个字节', size)
    },
    destroy(err, callback) {
        console.log(err)
        callback()
    }
})

//当可读流处在暂停模式时,使用 readable.push() 添加的数据
// 可以在触发 'readable' 事件时通过调用 readable.read() 读取。
//当可读流处于流动模式时,使用 readable.push() 添加的数据可以通过触发 'data' 事件读取。
reader.push('你好啊李银河,你那里下雪了么')

reader.on('close', () => {
    console.log('触发了close事件')
})


//当调用 readable.pipe(), readable.resume() 或绑定监听器到 'data' 事件时,流会转换到流动模式。
// 当调用 readable.read() 且有数据块返回时,也会触发 'data' 事件。
reader.on('data', (chunk) => {
    //如果使用 readable.setEncoding() 为流指定了默认的字符编码,则监听器回调传入的数据为字符串,否则传入的数据为 Buffer
    console.log('监听data事件读取到的数据',chunk)
})


//'end' 事件只有在数据被完全消费掉后才会触发。 要想触发该事件,可以将流转换到流动模式,或反复调用 stream.read() 直到数据被消费完。
reader.on('end', () => {
    console.log('当流中没有数据可供消费')
})

//监听器回调将会传入一个 Error 对象。
reader.on('error', (err) => {
    console.error(err)
})

//当调用 stream.pause() 并且 readsFlowing 不为 false 时,就会触发 'pause' 事件。
reader.on('pause', () => {
    console.log('触发了暂停事件')
})



//当调用 stream.resume() 并且 readsFlowing 不为 true 时,将会触发 'resume' 事件。
reader.on('resume', () => {
    console.log('触发了resume事件')
})

执行结果:


image.png
上一篇 下一篇

猜你喜欢

热点阅读