EventStream

2020-04-06  本文已影响0人  胖子罗

背景:使用EventStream读取大文件,获取文件总行数

//使用event stream
const fs = require('fs')
const path = require('path')
const es = require('event-stream')
let total = 0
fs.createReadStream(path.join(__dirname, './test.json'))
.pipe(es.split()) //defaults to lines.
.pipe(
  es.map(function (line, cb) {
    //do something with the line
   console.log(line)
   total++
  })
  )
  .pipe(
    es.wait(function(){//相当于监听前一个 stream 的 end 事件
      console.log('total',total)
    })
  )

问题:es.wait并没有监听到end事件,始终无法输出total。

尝试排查

思路一:是否和es.wait函数有关?

//
// wait. callback when 'end' is emitted, with all chunks appended as string.
//

es.wait = function (callback) {
  var arr = []
  return es.through(function (data) { arr.push(data) },
    function () {
      var body = Buffer.isBuffer(arr[0]) ? Buffer.concat(arr)
        : arr.join('')
      this.emit('data', body)
      this.emit('end')
      if(callback) callback(null, body)
    })
}

es.wait这个函数返回的是through对象(也是一个stream),传了2个function参数进去,第二个function实际会回调触发 this.emit('end'),所以继续排查这个function是否被调用

function through (write, end, opts) {
  write = write || function (data) { this.queue(data) }
  end = end || function () { this.queue(null) }

  var ended = false, destroyed = false, buffer = [], _ended = false
  var stream = new Stream()
  stream.readable = stream.writable = true
  stream.paused = false

//  stream.autoPause   = !(opts && opts.autoPause   === false)
  stream.autoDestroy = !(opts && opts.autoDestroy === false)

  stream.write = function (data) {
    write.call(this, data)
    return !stream.paused
  }

through构造函数的上面这个end参数就是之前es.wait的第二个参数,看看它什么时候触发调用,找到相关代码:

stream.on('end', function () {
    //end()
    stream.readable = false
    if(!stream.writable && stream.autoDestroy)
      process.nextTick(function () {
        stream.destroy()
      })
  })

  function _end () {
    stream.writable = false
    end.call(stream)//只有这个地方涉及end的触发
    if(!stream.readable && stream.autoDestroy)
      stream.destroy()
  }

  stream.end = function (data) {
    if(ended) return
    ended = true
    if(arguments.length) stream.write(data)
    _end() // will emit or queue
    return stream
  }

在_end函数断点debug发现执行end.call(stream)并没有跳回es.wait的第二个回调函数,显然这个地方出现异常,联想我们这次用的pipe和stream,那可能是上层的stream调用异常了。

思路二:是否和上一个pipe的 es.map有关?
es.map对应的实际是mapStream类,找到这个流中和end相关代码:

function end (data) {
    //if end was called with args, write it, 
    ended = true //write will emit 'end' if ended is true
    stream.writable = false
    if(data !== undefined) {
      return queueData(data, inputs)
    } else if (inputs == outputs) { //wait for processing //满足这个条件才可以
      stream.readable = false, stream.emit('end'), stream.destroy() 
    }
  }

  stream.end = function (data) {
    if(ended) return
    end(data)
  }

只有满足inputs === ouputs才可以触发end事件,仔细跟踪代码,outputs在queueData函数被递增修改,queueData -> next -> stream.write,跟踪到:

// Wrap the mapper function by calling its callback with the order number of
  // the item in the stream.
  function wrappedMapper (input, number, callback) {
    return mapper.call(null, input, function(err, data){
      callback(err, data, number)
    })
  }

  stream.write = function (data) {
    if(ended) throw new Error('map stream is not writable')
    inNext = false
    inputs ++

    try {
      //catch sync errors and handle them like async errors
      var written = wrappedMapper(data, inputs, next)//next在这里被调用
      paused = (written === false)
      return !paused
    } catch (err) {
      //if the callback has been called syncronously, and the error
      //has occured in an listener, throw it again.
      if(inNext)
        throw err
      next(err)
      return !paused
    }
  }

仔细看wrappedMapper这个函数,next实际就是它的callback参数,这个callback的调用依赖于mapper.call的第三个参数,mapper又是什么呢?

//map stream 导出代码
module.exports = function (mapper, opts) {

  var stream = new Stream()
    , inputs = 0
    , outputs = 0
    , ended = false
    , paused = false
    , destroyed = false
    , lastWritten = 0
    , inNext = false

  opts = opts || {};
  var errorEventName = opts.failures ? 'failure' : 'error';
//使用map stream的代码
  es.map(function (line, cb) {
    //do something with the line
   console.log(line)
   total++
  })

显然mapper就是这个函数:

function (line, cb) {
    //do something with the line
   console.log(line)
   total++
  })

mapper的第三个参数就是cb了,cb没有调用,那当然导致后面一连串函数没有调用,最终影响end事件的触发,那我们加上cb调用看看:

//使用event stream
const fs = require('fs')
const path = require('path')
const es = require('event-stream')
let total = 0
fs.createReadStream(path.join(__dirname, './test.json'))
.pipe(es.split()) //defaults to lines.
.pipe(
  es.map(function (line, cb) {
    //do something with the line
    //console.log(line)
   total++
   cb(null,line)//这个回调很重要
  })

  )
  .pipe(
    es.wait(function(){//相当于监听前一个 stream 的 end 事件
      console.log('total',total)
    })
  )

结果能输出total了:


image.png

总结

使用stream和pipe搭配使用的时候,需要注意前后stream的关联影响。

上一篇 下一篇

猜你喜欢

热点阅读