Stream.Readable可写流类
例: readable.read([size])
从内部缓冲拉取并返回数据。 如果没有可读的数据,则返回 null。
默认情况下, readable.read([size]) 返回的数据是 Buffer 对象,size 参数指定要读取的特定字节数。如果没有指定 size 参数,则返回内部缓冲中的所有数据。
const {Readable } = require('stream')
const fs = require('fs')
//创建一个可读流
// const reader = new Readable({
// highWaterMark: 3, //从底层资源读取数据并存储在内部缓冲区中的最大字节数。 默认值: 16384
//
// //实现read方法
// read(size) {
// //size <number> 要异步读取的字节数。
// console.log('一次读取%d个字节', size)
// }
// })
const reader = fs.createReadStream('foo.txt');
reader.on('close', () => {
console.log('触发了close事件')
})
//'end' 事件只有在数据被完全消费掉后才会触发。 要想触发该事件,可以将流转换到流动模式,或反复调用 stream.read() 直到数据被消费完。
reader.on('end', () => {
console.log('当流中没有数据可供消费')
})
//监听器回调将会传入一个 Error 对象。
reader.on('error', (err) => {
console.error(err)
})
//当调用 stream.pause() 并且 readsFlowing 不为 false 时,就会触发 'pause' 事件。
reader.on('pause', () => {
console.log('触发了暂停事件')
})
//当有数据可从流中读取时,就会触发 'readable' 事件。
//当到达流数据的尽头时, 'readable' 事件也会触发,但是在 'end' 事件之前触发。
//'readable' 事件表明流有新的动态:要么有新的数据,要么到达流的尽头。
//对于前者,stream.read() 会返回可用的数据。 对于后者,stream.read() 会返回 null
reader.on('readable', function(){
console.log('触发了readable事件')
//有数据可读取
let data;
//可以使用this.read()的前提时,所在的匿名函数非箭头函数,this就指向reader
while (data = this.read()) {
//data为buffer对象
console.log(data);
}
})
//当调用 stream.resume() 并且 readsFlowing 不为 true 时,将会触发 'resume' 事件。
reader.on('resume', () => {
console.log('触发了resume事件')
})
执行结果:

修改代码,给this.read(6),传参6,一次性读取六个字节
const {Readable } = require('stream')
const fs = require('fs')
//创建一个可读流
// const reader = new Readable({
// highWaterMark: 3, //从底层资源读取数据并存储在内部缓冲区中的最大字节数。 默认值: 16384
//
// //实现read方法
// read(size) {
// //size <number> 要异步读取的字节数。
// console.log('一次读取%d个字节', size)
// }
// })
const reader = fs.createReadStream('foo.txt');
reader.on('close', () => {
console.log('触发了close事件')
})
//'end' 事件只有在数据被完全消费掉后才会触发。 要想触发该事件,可以将流转换到流动模式,或反复调用 stream.read() 直到数据被消费完。
reader.on('end', () => {
console.log('当流中没有数据可供消费')
})
//监听器回调将会传入一个 Error 对象。
reader.on('error', (err) => {
console.error(err)
})
//当调用 stream.pause() 并且 readsFlowing 不为 false 时,就会触发 'pause' 事件。
reader.on('pause', () => {
console.log('触发了暂停事件')
})
//当有数据可从流中读取时,就会触发 'readable' 事件。
//当到达流数据的尽头时, 'readable' 事件也会触发,但是在 'end' 事件之前触发。
//'readable' 事件表明流有新的动态:要么有新的数据,要么到达流的尽头。
//对于前者,stream.read() 会返回可用的数据。 对于后者,stream.read() 会返回 null
reader.on('readable', function(){
console.log('触发了readable事件')
//有数据可读取
let data;
//可以使用this.read()的前提时,所在的匿名函数非箭头函数,this就指向reader
while (data = this.read(6)) {
//data为buffer对象
console.log(data);
}
})
//当调用 stream.resume() 并且 readsFlowing 不为 true 时,将会触发 'resume' 事件。
reader.on('resume', () => {
console.log('触发了resume事件')
})
执行结果:

修改代码:使用 readable.setEncoding() 指定字符编码,readbale.read(3)返回的不再是buffer对象,而是字符串,此时参数3代表的是一次性读取三个字符串
const {Readable } = require('stream')
const fs = require('fs')
//创建一个可读流
// const reader = new Readable({
// highWaterMark: 3, //从底层资源读取数据并存储在内部缓冲区中的最大字节数。 默认值: 16384
//
// //实现read方法
// read(size) {
// //size <number> 要异步读取的字节数。
// console.log('一次读取%d个字节', size)
// }
// })
const reader = fs.createReadStream('foo.txt');
reader.on('close', () => {
console.log('触发了close事件')
})
//'end' 事件只有在数据被完全消费掉后才会触发。 要想触发该事件,可以将流转换到流动模式,或反复调用 stream.read() 直到数据被消费完。
reader.on('end', () => {
console.log('当流中没有数据可供消费')
})
//监听器回调将会传入一个 Error 对象。
reader.on('error', (err) => {
console.error(err)
})
//当调用 stream.pause() 并且 readsFlowing 不为 false 时,就会触发 'pause' 事件。
reader.on('pause', () => {
console.log('触发了暂停事件')
})
//当有数据可从流中读取时,就会触发 'readable' 事件。
//当到达流数据的尽头时, 'readable' 事件也会触发,但是在 'end' 事件之前触发。
//'readable' 事件表明流有新的动态:要么有新的数据,要么到达流的尽头。
//对于前者,stream.read() 会返回可用的数据。 对于后者,stream.read() 会返回 null
reader.on('readable', function(){
console.log('触发了readable事件')
//有数据可读取
let data;
//可以使用this.read()的前提时,所在的匿名函数非箭头函数,this就指向reader
//设置了下面的语句后,this.read()返回的数据就不再是buffer对象,是字符串
//此时参数size代表的是一次性读取size个字符串
this.setEncoding('utf8')
while (data = this.read(3)) {
//data为字符串
console.log(data);
}
})
//当调用 stream.resume() 并且 readsFlowing 不为 true 时,将会触发 'resume' 事件。
reader.on('resume', () => {
console.log('触发了resume事件')
})
执行结果:

例2:给可读流绑定data事件读取数据,会先触发resume事件,再触发data事件,data事件读取数据默认读取的是buffer对象
const {Readable } = require('stream')
const fs = require('fs')
//创建一个可读流
// const reader = new Readable({
// highWaterMark: 3, //从底层资源读取数据并存储在内部缓冲区中的最大字节数。 默认值: 16384
//
// //实现read方法
// read(size) {
// //size <number> 要异步读取的字节数。
// console.log('一次读取%d个字节', size)
// }
// })
const reader = fs.createReadStream('foo.txt');
reader.on('close', () => {
console.log('触发了close事件')
})
//当调用 readable.pipe(), readable.resume() 或绑定监听器到 'data' 事件时,流会转换到流动模式。
// 当调用 readable.read() 且有数据块返回时,也会触发 'data' 事件。
reader.on('data', (chunk) => {
//如果使用 readable.setEncoding() 为流指定了默认的字符编码,则监听器回调传入的数据为字符串,否则传入的数据为 Buffer
console.log('监听data事件读取到的数据',chunk)
})
//'end' 事件只有在数据被完全消费掉后才会触发。 要想触发该事件,可以将流转换到流动模式,或反复调用 stream.read() 直到数据被消费完。
reader.on('end', () => {
console.log('当流中没有数据可供消费')
})
//监听器回调将会传入一个 Error 对象。
reader.on('error', (err) => {
console.error(err)
})
//当调用 stream.pause() 并且 readsFlowing 不为 false 时,就会触发 'pause' 事件。
reader.on('pause', () => {
console.log('触发了暂停事件')
})
//当调用 stream.resume() 并且 readsFlowing 不为 true 时,将会触发 'resume' 事件。
reader.on('resume', () => {
console.log('触发了resume事件')
})
执行结果:

修改代码:使用reader.setEncoding('utf8')指定字符编码,data事件读取到的数据不再是buffer,而是字符串
const {Readable } = require('stream')
const fs = require('fs')
//创建一个可读流
// const reader = new Readable({
// highWaterMark: 3, //从底层资源读取数据并存储在内部缓冲区中的最大字节数。 默认值: 16384
//
// //实现read方法
// read(size) {
// //size <number> 要异步读取的字节数。
// console.log('一次读取%d个字节', size)
// }
// })
const reader = fs.createReadStream('foo.txt');
//指定字符编码
reader.setEncoding('utf8')
reader.on('close', () => {
console.log('触发了close事件')
})
//当调用 readable.pipe(), readable.resume() 或绑定监听器到 'data' 事件时,流会转换到流动模式。
// 当调用 readable.read() 且有数据块返回时,也会触发 'data' 事件。
reader.on('data', (chunk) => {
//如果使用 readable.setEncoding() 为流指定了默认的字符编码,则监听器回调传入的数据为字符串,否则传入的数据为 Buffer
console.log('监听data事件读取到的数据',chunk)
})
//'end' 事件只有在数据被完全消费掉后才会触发。 要想触发该事件,可以将流转换到流动模式,或反复调用 stream.read() 直到数据被消费完。
reader.on('end', () => {
console.log('当流中没有数据可供消费')
})
//监听器回调将会传入一个 Error 对象。
reader.on('error', (err) => {
console.error(err)
})
//当调用 stream.pause() 并且 readsFlowing 不为 false 时,就会触发 'pause' 事件。
reader.on('pause', () => {
console.log('触发了暂停事件')
})
//当调用 stream.resume() 并且 readsFlowing 不为 true 时,将会触发 'resume' 事件。
reader.on('resume', () => {
console.log('触发了resume事件')
})
执行结果:

如果同时使用 'readable' 事件和 'data' 事件,则 'readable' 事件会优先控制流,
也就是说,当调用 stream.read() 时才会触发 'data' 事件。readableFlowing 属性会变成 false。
当移除 'readable' 事件时,如果存在 'data' 事件监听器,则流会开始流动,也就是说,无需调用 .resume() 也会触发 'data' 事件。
const {Readable } = require('stream')
const fs = require('fs')
//创建一个可读流
// const reader = new Readable({
// highWaterMark: 3, //从底层资源读取数据并存储在内部缓冲区中的最大字节数。 默认值: 16384
//
// //实现read方法
// read(size) {
// //size <number> 要异步读取的字节数。
// console.log('一次读取%d个字节', size)
// }
// })
const reader = fs.createReadStream('foo.txt');
//指定字符编码
reader.setEncoding('utf8')
reader.on('close', () => {
console.log('触发了close事件')
})
//当调用 readable.pipe(), readable.resume() 或绑定监听器到 'data' 事件时,流会转换到流动模式。
// 当调用 readable.read() 且有数据块返回时,也会触发 'data' 事件。
reader.on('data', (chunk) => {
//如果使用 readable.setEncoding() 为流指定了默认的字符编码,则监听器回调传入的数据为字符串,否则传入的数据为 Buffer
console.log('监听data事件读取到的数据',chunk)
})
//'end' 事件只有在数据被完全消费掉后才会触发。 要想触发该事件,可以将流转换到流动模式,或反复调用 stream.read() 直到数据被消费完。
reader.on('end', () => {
console.log('当流中没有数据可供消费')
})
//监听器回调将会传入一个 Error 对象。
reader.on('error', (err) => {
console.error(err)
})
//当调用 stream.pause() 并且 readsFlowing 不为 false 时,就会触发 'pause' 事件。
reader.on('pause', () => {
console.log('触发了暂停事件')
})
//当有数据可从流中读取时,就会触发 'readable' 事件。
//当到达流数据的尽头时, 'readable' 事件也会触发,但是在 'end' 事件之前触发。
//'readable' 事件表明流有新的动态:要么有新的数据,要么到达流的尽头。
//对于前者,stream.read() 会返回可用的数据。 对于后者,stream.read() 会返回 null
//如果同时使用 'readable' 事件和 'data' 事件,则 'readable' 事件会优先控制流,
// 也就是说,当调用 stream.read() 时才会触发 'data' 事件。readableFlowing 属性会变成 false。
// 当移除 'readable' 事件时,如果存在 'data' 事件监听器,则流会开始流动,也就是说,无需调用 .resume() 也会触发 'data' 事件。
reader.on('readable', function(){
console.log('触发了readable事件')
//有数据可读取
let data;
//可以使用this.read()的前提时,所在的匿名函数非箭头函数,this就指向reader
while (data = this.read(3)) {
//data为buffer对象
console.log('read方法返回的数据',data);
}
})
//当调用 stream.resume() 并且 readsFlowing 不为 true 时,将会触发 'resume' 事件。
reader.on('resume', () => {
console.log('触发了resume事件')
})
执行结果:

例3:
当可读流处在暂停模式时,使用 readable.push() 添加的数据
可以在触发 'readable' 事件时通过调用 readable.read() 读取。
当可读流处于流动模式时,使用 readable.push() 添加的数据可以通过触发 'data' 事件读取。
const {Readable } = require('stream')
//创建一个可读流
const reader = new Readable({
highWaterMark: 3, //从底层资源读取数据并存储在内部缓冲区中的最大字节数。 默认值: 16384
//实现read方法
//一旦 readable._read() 方法被调用,将不会再次调用它,直到更多数据通过 readable.push() 方法被推送
read(size) {
//size <number> 要异步读取的字节数。
console.log('一次读取%d个字符串', size)
},
destroy(err, callback) {
console.log(err)
callback()
}
})
//当可读流处在暂停模式时,使用 readable.push() 添加的数据
// 可以在触发 'readable' 事件时通过调用 readable.read() 读取。
//当可读流处于流动模式时,使用 readable.push() 添加的数据可以通过触发 'data' 事件读取。
reader.push('你好啊李银河,你那里下雪了么')
//指定字符编码
reader.setEncoding('utf8')
reader.on('close', () => {
console.log('触发了close事件')
})
//'end' 事件只有在数据被完全消费掉后才会触发。 要想触发该事件,可以将流转换到流动模式,或反复调用 stream.read() 直到数据被消费完。
reader.on('end', () => {
console.log('当流中没有数据可供消费')
})
//监听器回调将会传入一个 Error 对象。
reader.on('error', (err) => {
console.error(err)
})
//当调用 stream.pause() 并且 readsFlowing 不为 false 时,就会触发 'pause' 事件。
reader.on('pause', () => {
console.log('触发了暂停事件')
})
//当有数据可从流中读取时,就会触发 'readable' 事件。
//当到达流数据的尽头时, 'readable' 事件也会触发,但是在 'end' 事件之前触发。
//'readable' 事件表明流有新的动态:要么有新的数据,要么到达流的尽头。
//对于前者,stream.read() 会返回可用的数据。 对于后者,stream.read() 会返回 null
//如果同时使用 'readable' 事件和 'data' 事件,则 'readable' 事件会优先控制流,
// 也就是说,当调用 stream.read() 时才会触发 'data' 事件。readableFlowing 属性会变成 false。
// 当移除 'readable' 事件时,如果存在 'data' 事件监听器,则流会开始流动,也就是说,无需调用 .resume() 也会触发 'data' 事件。
reader.on('readable', function(){
console.log('触发了readable事件')
//有数据可读取
let data;
//readable.read()
//从内部缓冲拉取并返回数据。 如果没有可读的数据,则返回 null。
// 默认情况下, readable.read() 返回的数据是 Buffer 对象,除非使用 readable.setEncoding() 指定字符编码或流处于对象模式。
//size 参数指定要读取的特定字节数。如果没有指定 size 参数,则返回内部缓冲中的所有数据。
//可以使用this.read()的前提时,所在的匿名函数非箭头函数,this就指向reader
while (data = this.read(3)) {
//data为buffer对象
console.log('read方法返回的数据',data);
}
})
//当调用 stream.resume() 并且 readsFlowing 不为 true 时,将会触发 'resume' 事件。
reader.on('resume', () => {
console.log('触发了resume事件')
})
执行结果:

修改代码:去掉指定字符编码
const reader = new Readable({
highWaterMark: 3, //从底层资源读取数据并存储在内部缓冲区中的最大字节数。 默认值: 16384
//实现read方法
//一旦 readable._read() 方法被调用,将不会再次调用它,直到更多数据通过 readable.push() 方法被推送
read(size) {
//size <number> 要异步读取的字节数。
//当没有使用 readable.setEncoding() 方法设置 encoding 属性,size的单位为字节
//如果使用 readable.setEncoding('utf8') 方法设置 encoding 属性,size的单位为个
console.log('一次读取%d个字节', size)
},
destroy(err, callback) {
console.log(err)
callback()
}
})
//当可读流处在暂停模式时,使用 readable.push() 添加的数据
// 可以在触发 'readable' 事件时通过调用 readable.read() 读取。
//当可读流处于流动模式时,使用 readable.push() 添加的数据可以通过触发 'data' 事件读取。
reader.push('你好啊李银河,你那里下雪了么')
reader.on('close', () => {
console.log('触发了close事件')
})
//'end' 事件只有在数据被完全消费掉后才会触发。 要想触发该事件,可以将流转换到流动模式,或反复调用 stream.read() 直到数据被消费完。
reader.on('end', () => {
console.log('当流中没有数据可供消费')
})
//监听器回调将会传入一个 Error 对象。
reader.on('error', (err) => {
console.error(err)
})
//当调用 stream.pause() 并且 readsFlowing 不为 false 时,就会触发 'pause' 事件。
reader.on('pause', () => {
console.log('触发了暂停事件')
})
//当有数据可从流中读取时,就会触发 'readable' 事件。
//当到达流数据的尽头时, 'readable' 事件也会触发,但是在 'end' 事件之前触发。
//'readable' 事件表明流有新的动态:要么有新的数据,要么到达流的尽头。
//对于前者,stream.read() 会返回可用的数据。 对于后者,stream.read() 会返回 null
//如果同时使用 'readable' 事件和 'data' 事件,则 'readable' 事件会优先控制流,
// 也就是说,当调用 stream.read() 时才会触发 'data' 事件。readableFlowing 属性会变成 false。
// 当移除 'readable' 事件时,如果存在 'data' 事件监听器,则流会开始流动,也就是说,无需调用 .resume() 也会触发 'data' 事件。
reader.on('readable', function(){
console.log('触发了readable事件')
//有数据可读取
let data;
//readable.read()
//从内部缓冲拉取并返回数据。 如果没有可读的数据,则返回 null。
// 默认情况下, readable.read() 返回的数据是 Buffer 对象,除非使用 readable.setEncoding() 指定字符编码或流处于对象模式。
//size 参数指定要读取的特定字节数。如果没有指定 size 参数,则返回内部缓冲中的所有数据。
//可以使用this.read()的前提时,所在的匿名函数非箭头函数,this就指向reader
while (data = this.read(3)) {
//data为buffer对象
console.log('read方法返回的数据',data);
}
})
//当调用 stream.resume() 并且 readsFlowing 不为 true 时,将会触发 'resume' 事件。
reader.on('resume', () => {
console.log('触发了resume事件')
})
执行结果:

例:当可读流处于流动模式时,使用 readable.push() 添加的数据可以通过触发 'data' 事件读取。
const {Readable } = require('stream')
//创建一个可读流
const reader = new Readable({
highWaterMark: 3, //从底层资源读取数据并存储在内部缓冲区中的最大字节数。 默认值: 16384
//实现read方法
//一旦 readable._read() 方法被调用,将不会再次调用它,直到更多数据通过 readable.push() 方法被推送
read(size) {
//size <number> 要异步读取的字节数。
//当没有使用 readable.setEncoding() 方法设置 encoding 属性,size的单位为字节
//如果使用 readable.setEncoding('utf8') 方法设置 encoding 属性,size的单位为个
console.log('一次读取%d个字节', size)
},
destroy(err, callback) {
console.log(err)
callback()
}
})
//当可读流处在暂停模式时,使用 readable.push() 添加的数据
// 可以在触发 'readable' 事件时通过调用 readable.read() 读取。
//当可读流处于流动模式时,使用 readable.push() 添加的数据可以通过触发 'data' 事件读取。
reader.push('你好啊李银河,你那里下雪了么')
reader.on('close', () => {
console.log('触发了close事件')
})
//当调用 readable.pipe(), readable.resume() 或绑定监听器到 'data' 事件时,流会转换到流动模式。
// 当调用 readable.read() 且有数据块返回时,也会触发 'data' 事件。
reader.on('data', (chunk) => {
//如果使用 readable.setEncoding() 为流指定了默认的字符编码,则监听器回调传入的数据为字符串,否则传入的数据为 Buffer
console.log('监听data事件读取到的数据',chunk)
})
//'end' 事件只有在数据被完全消费掉后才会触发。 要想触发该事件,可以将流转换到流动模式,或反复调用 stream.read() 直到数据被消费完。
reader.on('end', () => {
console.log('当流中没有数据可供消费')
})
//监听器回调将会传入一个 Error 对象。
reader.on('error', (err) => {
console.error(err)
})
//当调用 stream.pause() 并且 readsFlowing 不为 false 时,就会触发 'pause' 事件。
reader.on('pause', () => {
console.log('触发了暂停事件')
})
//当调用 stream.resume() 并且 readsFlowing 不为 true 时,将会触发 'resume' 事件。
reader.on('resume', () => {
console.log('触发了resume事件')
})
执行结果:
