程序员前端开发那些事我是程序员;您好程先生;叫我序员就好了

Node.js流模式编程详解

2016-04-22  本文已影响3129人  朱耀锋

本文是Node.js设计模式的笔记, 代码都是来自 <Node.js Design Patterns> by Mario Casciaro.

流的重要性

一般我们处理数据有两种模式, 一种是buffer模式, 一种是stream模式, buffer模式就是取完数据一次性操作, stream模式就是边取数据边操作.
举个例子, 如果打开一个2G的文件, 用buffer模式就是先分配2G的内存, 把文件全部读出来, 然后开始操作内存, 而用流模式的方法就是边读数据, 边开始处理.

从这里看出stream模式无论是在空间和时间上都优于buffer模式:
在空间上, 内存只会占用当前需要处理的一块数据区域的大小, 而不是整个文件.
在时间上, 因为不需要全部的数据就可以开始处理, 时间就相当于节约了, 从串行变成了并行操作(这里的并行不是多线程的并行, 而是生产者和消费者并行).

还有一个好处就是链式调用, 也就是可组合操作, 大大增加了代码的可重用性.
比如下面这个代码(中间的pipe可以很方便的增删):

fs.createReadStream(file)
     .pipe(zlib.createGzip())
     //.pipe(crypto.createCipher('aes192', 'secret'))
     .pipe(req)
     .on('finish', function() {
       console.log('File succesfully sent');
     });

开始编码

nodejs里面的stream一般分四种, 其中转换流是一种特殊的读写流.

另外, nodejs里面的流有两种模式, 二进制模式和对象模式.

输入流(stream.Readable)

先看一下怎么使用输入流, 这里一般有两种方法, 一个是非流动模式, 一个是流动模式.
非流动模式就是直接调用read()方法, 被动模式就是监听data事件.
下面直接看代码:

// 非流动模式
process.stdin
    .on('readable', function() {
        // 有数据到了, 拼命读, 直到读完.
        var chunk;
        console.log('New data available');
        while((chunk = process.stdin.read()) !== null) {
            console.log(
            'Chunk read: (' + chunk.length + ') "' + chunk.toString() + '"'
            );
        }})
    .on('end', function() {
       process.stdout.write('End of stream');
    });

接下来看流动模式怎么玩

// 流动模式
process.stdin
     .on('data', function(chunk) {
       console.log('New data available');
       console.log(
         'Chunk read: (' + chunk.length + ')" ' +
         chunk.toString() + '"'
       );
     })
     .on('end', function() {
       process.stdout.write('End of stream');
     });

同样实现一个输入流也很简单, 主要是

  1. 继承Readable类
  2. 实现_read(size)接口(一般带下划线的表示内部函数, 调用者不要直接调用, 相当于C++里面的protect方法, 只是javascript里面没有对方法做区分, 只能是命名上面区分一下了).
    下面看示例代码:
// randomStream.js
var stream = require('stream');
var util = require('util');
var chance = require('chance').Chance();

function RandomStream(options) {
    // option支持3个参数
    // encoding String 用于转换Buffer到String的编码类型(默认null)
    // objMode Boolean 用户指定是否是对象模式(默认false)
    // highWaterMark Number 最高水位(可读的最大数据量), 默认是16K
    stream.Readable.call(this, options);
}
util.inherits(RandomStream, stream.Readable);
RandomStream.prototype._read = function(size) {
    // 这是一个随机产生数据的流, 5%的概率输出null, 也就是流停止.
    var chunk = chance.string();
    console.log('Pushing chunk of size:' + chunk.length);
    this.push(chunk, 'utf8');
    if(chance.bool({likelihood: 5})) {
       this.push(null);
    }
}
module.exports = RandomStream;

好, 接下来是如何使用:

// generateRandom.js
var RandomStream = require('./randomStream');
var randomStream = new RandomStream();
randomStream.on('readable', function() {
    var chunk;
    while((chunk = randomStream.read()) !== null) {
        console.log("Chunk received: " + chunk.toString());
    }
});

输出流(stream.Writable)

先是怎么用:

// 写数据
writable.write(chunk, [encoding], [callback]);
// 结束流
writable.end([chunk], [encoding], [callback])

回压(back-pressure)

这里涉及一个概念, 回压(back-pressure), 意思就是当生产者速度大于消费者的时候, 输出流的水位会不断上升, 当到达设定的最高水位时候, 就会写入失败, 这时候也就是产生了back-pressure, 那如何处理呢, 此时输入流在水位降低到零点的时候会有一个drain事件发送, 只要监听这个事件, 在事件发生的时候就可以继续向流写入数据了.
直接看代码:

var chance = require('chance').Chance();
require('http').createServer(function (req, res) {
    res.writeHead(200, {'Content-Type': 'text/plain'});
    function generateMore() {             //[1]
        while(chance.bool({likelihood: 95})) {
            var shouldContinue = res.write(
                chance.string({length: (16 * 1024) – 1})
            );
            if(!shouldContinue) {             //[3]
                console.log('Backpressure');
                return res.once('drain', generateMore);
            }
        }
        res.end('\nThe end...\n', function() {
            console.log('All data was sent');
        });
    }
    generateMore();
}).listen(8080, function () {
  console.log('Listening');
});

同样, 实现一个输出流也很简单, 只要继承Writable类, 实现_write()接口即可.
示例代码:

// toFileStream.js
var stream = require('stream');
var fs = require('fs');
var util = require('util');
var path = require('path');
var mkdirp = require('mkdirp');

function ToFileStream() {
    // 这次我们用对象模式
    stream.Writable.call(this, {objectMode: true});
};
util.inherits(ToFileStream, stream.Writable);
ToFileStream.prototype._write = function(chunk, encoding, callback) {
    var self = this;
    mkdirp(path.dirname(chunk.path), function(err) {
        if(err) {
            return callback(err);
        }
        fs.writeFile(chunk.path, chunk.content, callback);
    });
}
module.exports = ToFileStream;

下面是调用的代码

var ToFileStream = require('./toFileStream');
var tfs = new ToFileStream();
tfs.write({path: "file1.txt", content: "Hello"});
tfs.write({path: "file2.txt", content: "Node.js"});
tfs.write({path: "file3.txt", content: "Streams"});
tfs.end(function() {
    console.log("All files created");
});

读写流(stream.Duplex)

就是把输入流和输出流的接口都实现了.
注意:
此时option参数是同时传给了内部的Readable和Writeable, 如果要使用不同的选项, 就要分开配置,
像这样:
this._writableState.objectMode
this._readableState.objectMode
同时, Duplex又多了一个选项allowHalfOpen, 这个选项的意思是, 当其中一个流关闭的时候, 另外一条流是否也同时关闭, 默认是true, 也就是不同时关闭.

转换流(stream.Transform)

对于读写流来说, 要实现的是 _read() 和 _write() 接口, 而转换流要实现的是 _transform() 和 _flush()接口.
区别是什么, 转换流一般在transform的过程中把读写都做了, 也就是在处理输入的时候, 直接就输出了. 最后在输入结束的时候_flush() 会被调用, 就可以把剩余的内部数据一并输出了.

示例代码:

// 这代码写的很漂亮, 解决的问题是在流中操作替换操作
// 其中替换的部分可以仔细看一下, stream和buffer一个很大的区别就是stream会被切割
// 导致要替换的数据也有可能被切割, 这个例子就提供了一种解决方法, 
// 这个在后续实践中肯定也会遇到的.
var stream = require('stream');
var util = require('util');
function ReplaceStream(searchString, replaceString) {
    stream.Transform.call(this, {decodeStrings: false});
    this.searchString = searchString;
    this.replaceString = replaceString;
    this.tailPiece = '';
}
util.inherits(ReplaceStream, stream.Transform);
ReplaceStream.prototype._transform = function(chunk, encoding, callback) {
    var pieces = (this.tailPiece + chunk).split(this.searchString);
    var lastPiece = pieces[pieces.length - 1];
    var tailPieceLen = this.searchString.length - 1;
    this.tailPiece = lastPiece.slice(-tailPieceLen);
    pieces[pieces.length - 1] = lastPiece.slice(0, -tailPieceLen);
    this.push(pieces.join(this.replaceString));       //[3]
    callback();
}
ReplaceStream.prototype._flush = function(callback) {
    this.push(this.tailPiece);
    callback();
}
module.exports = ReplaceStream;

几个流操作相关的有用包

程序员就是懒, 有这几个包就可以少写一些代码了.

上一篇下一篇

猜你喜欢

热点阅读