Stream - 流
2020-06-29 本文已影响0人
littleyu
stream 释义
- stream 是水流,但默认没有水
- stream.write 可以让水流中有水(数据)
- 每次写的小数据叫做 chunk (块)
- 产生数据的一段叫做 source (源头)
- 得到数据的一段叫做 sink (水池)
实用栗子:
用 stream 和不用 stream 传输大文件时,node 占用内存区别很大
用 stream 时,读一个 150m 的文件,基本不会高于 30m
但是不用 stream,内存占用 100m+,要知道分配给nodeJs的内存有限,所以用 stream 就可以一点一点慢慢传
管道 释义
- 两个流可以用一个管道相连
- stream1 的末尾连接上 stream2 的开端
- 只要 stream1 有数据,就会流到 stream2
pipe 也可以通过事件实现
// stream1 有数据就塞给 stream2
stream1.on('data', chunk => {
stream2.write(chunk)
})
// stream1 停了,就停掉 stream2
stream1.on('end', () => {
stream2.end()
})
Stream 对象的原型链
- 自身属性(由fs.Readable.prototype 构造)
- 原型:stream.Readable.peototype
- 二级原型: stream.Stream.prototype
- 三级原型:events.EventEmitter.prototype
- 四级原型:Object.prototype
- Stream 对象都继承了 EventEmitter
支持的事件和方法
Stream 分类
- Readable 可读
- Writeable 可写
- Duplex 可读可写(双向)(读写一般不交叉)
- Transform 可读可写(变化)(自己写,自己读,webpack常用,比如把 scss 转换成 css,es6 编译成 es5)
手动实现一个 Readable 的流
const { Readable } = require('stream')
// 先推再读
// const inStream = new Readable()
// inStream.push('zch')
// inStream.push('233333333333333333')
// inStream.push(null) // no more data
// // inStream.pipe(process.stdout)
// // 等价于
// inStream.on('data', chunk => {
// console.log('push')
// console.log(chunk.toString())
// })
// 等读了再推
const inStream = new Readable({
read(size) {
const char = String.fromCharCode(this.charCode++)
console.log('push\n')
this.push(char)
if (this.charCode > 90) {
this.push(null)
}
}
})
inStream.charCode = 65
inStream.pipe(process.stdout)
手动实现一个 writable 的流
const { Writable } = require('stream')
const outStream = new Writable({
write(chunk, encoding, callback) {
console.log(chunk.toString())
callback()
}
})
process.stdin.pipe(outStream)
手动实现一个 duplex 的流
// 只需要把 writable 和 readable 结合起来,同事实现 read 和 write 方法即可
手动实现一个 transform 的流
const { Transform } = require('stream')
const upperTransform = new Transform({
transform (chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase())
callback()
}
})
process.stdin.pipe(upperTransform).pipe(process.stdout)
transform 的流的栗子(实现gzip):
const fs = require('fs')
const zlib = require('zlib')
const file = process.argv[2]
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream(file + '.gz'))
升级(加点进度条)
const fs = require('fs')
const zlib = require('zlib')
const file = process.argv[2]
fs.createReadStream(file)
.pipe(zlib.createGzip())
.on('data', () => console.log('.'))
.pipe(fs.createWriteStream(file + '.gz'))
.on('finish', () => console.log('Done'))
再次升级(升级为一个 transform 方法)
const fs = require('fs')
const zlib = require('zlib')
const file = process.argv[2]
const { Transform } = require('stream')
const transform = new Transform({
transform (chunk, encoding, callback) {
console.log('.')
callback(null, chunk)
}
})
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(transform) // 这里可以做进度条,webpack 就是这样的原理,vue-loader => scss-loader => css-loader => style-loader
.pipe(fs.createWriteStream(file + '.gz'))
.on('finish', () => console.log('Done'))
再次升级(我们还可以加密内容)
const fs = require('fs')
const zlib = require('zlib')
const file = process.argv[2]
const crypto = require('crypto')
const { Transform } = require('stream')
const transform = new Transform({
transform (chunk, encoding, callback) {
console.log('.')
callback(null, chunk)
}
})
fs.createReadStream(file)
.pipe(crypto.createCipher('aes192', '123456'))
.pipe(zlib.createGzip())
.pipe(transform) // 这里可以做进度条,webpack 就是这样的原理,vue-loader => scss-loader => css-loader => style-loader
.pipe(fs.createWriteStream(file + '.gz'))
.on('finish', () => console.log('Done'))