Chapter 9 Advanced Asynchronous

2018-01-05  本文已影响0人  宫若石

现在我们了解到的设计模式能够应对很多情况,不过针对一些特定的问题,我们还有一些方案可以使⽤。就像做菜的菜谱⼀样,它提供了一个⼤概的步骤,我们还可以⾃己创造一些,在这个章节,我们主要提供以下情形的解决方案:

处理需要异步初始化模块的方案

我们在第⼆章提到过,require()和module.exports是同步工作的,这是同步API存在于核⼼模块和很多npm的库中的⼀个主要原因。这样的方案提供了了更方便的转换机制,这也是最初使用同步而不是异步方案开初始化模块的原型。
不幸的是,有时候同步API并不不好用,特别是在需要连接⽹网络进⾏初始化的时候,需要花费精力去配置参数、使⽤握⼿进⾏连接等等。这种场景存在于许多数据库驱动和类似消息队列这样的中间件里。

标准解决方案

举例,我们需要通过请求,连接⼀个叫db的远程数据库,我们通常有两种选择:

const db = require('aDb');  //  The async module
module.exports = function findAll(tyoe, callback) {
  if (db.connected) {
    runFild();
  } else {
    db.once('connected', runFild);
  }
  function runFild() {
    db.findAll(type, callback);
  }
};
//  in module app.js
const db = require('aDb');
const findAllFactory = require('./findAll');
db.on('connected', function(){
  const findAll = findAllFactory(db);
});
//  in module findAll.js
module.exports = db => {
  return function findAll(type, callback) {
    db.findAll(type, callback);
  }
};

显然第一种⽅方案就很不受欢迎,因为包含很多引⽤模板(boilerplate)
第⼆种⽅案有时候也不太好,在大型项目中使用依赖注入会很复杂,特别是需要手动去初始化一些模块的时候。不过使⽤用一些⽀持异步初始化的依赖注⼊容器会有些帮助。
下⾯的第三种⽅案,能够轻易的帮助我们从初始化依赖里分离出需要的模块

预初始化队列方案

方案的主要思想:把含未完成的初始化的模块操作保存下来,一旦初始化步骤完成,就执行行这些操作。下⾯是⼀个例⼦:

const asyncModule = module.exports;

asyncModule.initialized = false;
asyncModule.initialize = callback => {  //  一个延迟10s的初始化操作
  setTimeout(() => {
    asyncModule.initialized = true;
    callback();
  }, 10000);
};

asyncModule.tellMeSomething = callback => {
  process.nextTick(() => {
    if(!asyncModule.initialized) {
      return callback(
        new Error('I don\'t have anything to say right now') //如果还没初始化
      );
    }
    callback(null, 'Current time is: ' + new Date());
  });
};
const asyncModule = require('./asyncModule');

module.exports.say = (req, res) => {
  asyncModule.tellMeSomething((err, something) => {
    if(err) {
      res.writeHead(500);
      return res.end('Error:' + err.message);
    }
    res.writeHead(200);
    res.end('I say: ' + something);
  });
};

这个HTTP处理模块(handler)主要是触发刚刚的tellMeSomething()方法并将结果写入HTTP响应头。可以看到这⾥,我们require()这个异步的asyncModule时并没有检测它是不是已经初始化了,这就可能导致问题。
让我们⽤基本的http核⼼模块,创建⼀个HTTP server

const http = require('http');
const routes = require('./routes');
const asyncModule = require('./asyncModule.js');

asyncModule.initialize(() => {
  console.log('Async module initialized');
});

http.createServer((req, res) => {
  if (req.method === 'GET' && req.url === '/say') {
    return routes.say(req, res);
  }
  res.writeHead(404);
  res.end('Not found');
}).listen(8000, () => console.log('Started'));

这个app.js是程序的入⼝,它触发了AsyncModule的初始化、利用routes.say()创建了一个
HTTPServer。运⾏一下,正如想象,如果我们在运行server时,快一点打开http://localhost:8000/say (也就是调⽤了say()的时候,AsyncModule还没有异步初始化完成),我们会在浏览器上看到:

image.png
本例的显示取决于异步初始化的细节,如果慢⼀点等初始化结束,我们就能看到正确的结果了。我们应该尽量避免这种问题,因为这可能导致丢失信息、程序崩溃等更严重的问题。虽然初始化一般很快,我们⼀般不会去注意失败的请求,但是对于加载频繁和自动设置(autoscale)的云服务情况来说,这可能导致阻塞等更多问题。
⽤用预加载队列列去包装模块

我们⽤队列去存放未初始化的操作,这有点像状态模式(State Pattern)。存在两种状态:⼀一种是把未初始化的操作排队,另一种是当初始化完成时,再把最后的方法委托给原模块(也就是AsyncModule)

const asyncModule = require('./asyncModule');

//The wrapper 给 activeState 设置分发操作
const asyncModuleWrapper = module.exports;
asyncModuleWrapper.initialized = false;
asyncModuleWrapper.initialize = function() {
  activeState.initialize.apply(activeState, arguments);
};

asyncModuleWrapper.tellMeSomething = function() {
  activeState.tellMeSomething.apply(activeState, arguments);
};

//Module没有初始化成功的时候的state
let pending = [];
let notInitializedState = {
  initialize: function(callback) {
    asyncModule.initialize(function() {\
      asyncModuleWrapper.initalized = true;
      activeState = initializedState; //  [1]更更新activeState的变量量

      pending.forEach(function(req) { //  [2]运⾏行行存储队列列的操作
        asyncModule[req.method].apply(null, req.args);
      });
      pending = [];

      callback(); //  [3]触发原来的回调
    });
  },

  tellMeSomething: function(callback) {
    return pending.push({
      method: 'tellMeSomething',
      args: arguments
    });
  }
};

//  The state to use when the module is initialized
let initializedState = asyncModule;

//  Set the initial state to the notInitializedState
let activeState = notInitializedState; // 设置更新activeState

当开始初始化的时候,我们提供了一个回调代理,它让我们的warpper知道什么时候已经初始化好了,应该进⾏接下来的操作了。
可⻅代码,如果没有初始化完成,tellMeSomething会放在pending队列里等完成初始化再调⽤。
而且,通过这个模式我们可以清晰的知道AsyncModule这个模块有没有被初始化完成,因为完成之后会切换状态为initializedState(倒数第二⾏代码)
运行一下:

image.png
通过队列我们可以把为可能含初始化模块的操作给挂起,这是⼀个更稳健的行为。
核⼼在于:如果要初始化异步模块,我们让需要的操作排队等待直到初始化完成。
在自然情况下
刚刚这种模式在很多数据库驱动和ORM库中经常使用,最著名的就是MongoDB的ORM库
Moogoose了,这样就不需要等待数据库的连接情况,因为这都⾃动被放入队列里,并且在初始化建立完成后执⾏,这种方案增强了了API的可用性。

处理批量异步缓存的方案

在高负载的应用中,缓存在web里里起到很重要的作用。无论是静态的文件资源还是数据库的查询结果。在这个章节我们了解高吞吐量下的异步操作缓存。

暂时没有缓存和批量量处理理的Server

这是一个管理理销售情况的情景,JSON数据格式:transactionId{ amount, item}

const level = require('level');
const sublevel = require('level-sublevel');

const db = sublevel(level('example-db', {valueEncoding: 'json'}));
const salesDb = db.sublevel('sales');

module.exports = function totalSales(item, callback) {
    console.log('totalSales() invoked');
    let sum = 0;
    salesDb.createValueStream() // [1] 从数据库流式输⼊入数据
    .on('data', data => {
      if(!item || data.item === item) { // [2] 如果同样商品,求和
        sum += data.amount;
      }
    })
    .on('end', () => {
      callback(null, sum); // [3]返回刚刚的求和结果
    });
};

这种方案显然表现不大好,我们没有用index去区分transaction,求和计算也没有⽤更⽅便的map或者reduce。接下来暴露接⼝给HTTP server:

const http = require('http');
const url = require('url');
const totalSales = require('./totalSales');
//const totalSales = require('./totalSalesBatch');
//const totalSales = require('./totalSalesCache');

http.createServer((req, res) => {
  const query = url.parse(req.url, true).query;
  totalSales(query.item, (err, sum) => {
    res.writeHead(200);
    res.end(`Total sales for item ${query.item} is ${sum}`);
  });
}).listen(8000, () => console.log('Started'));

接下来访问http://localhost:8000/?item=book,顺⼿造一点假数据更方便查看

批量处理异步请求

最基本的方案是使用相同的API处理⼀系列的调用,通过回调的方式:


image.png

图示两个客户端(可能是两个不同的对象或者两个不同的web请求)触发了相同输入的的异步操作,在这第一个图中,它们单独发起⾃己的异步操作执⾏完毕后通过回调返回。第⼆张图针对这种相同输⼊的异步操作进⾏了批量处理,当操作完成时,通知两个客户端,这种方式优化了应⽤的加载,避免了了使⽤复杂的缓存机制,也避免了这种机制带来的内存管理和失效问题。


image.png

在这个web server使用批量处理请求

主要采⽤队列的方案,把回调加⼊队列,等异步操作完成之后⼀块儿触发

const totalSales = require('./totalSales');
const queues = {};

module.exports = function totalSalesBatch(item, callback) {
  if(queues[item]) { // [1] 队列列存在,请求正在执⾏行行,回调加⼊入队列列
    console.log('Batching operation');
    return queues[item].push(callback);
  }
  queues[item] = [callback]; // [2]我们需建⽴立新请求因为队列列有内容了了
  totalSales(item, (err, res) => {
    const queue = queues[item]; // [3]获得操作结果后⼀一个个触发回调
    queues[item] = null;
    queue.forEach(cb => cb(err, res));
  });
};
缓存异步请求

刚刚的批量处理方案在着快速API和少量批处理请求的情况下会有点问题。在这种情况下,采⽤缓存模式是比较好的方案。
缓存模式的核心思想是:请求完成后我们就把结果保存在缓存里(可以是一个变量、⼀个数据库入口等),下一次就不请求了,直接从缓存里取出
缓存模式很常见,但真正厉害的是把缓存模式和批处理模式结合起来使用:


image.png

由图可知,没有缓存时,和批处理模式是差不多的,不过当请求完成后,结果会被放入缓存里,方便下次直接从缓存里提取。

在这个web server使用缓存请求

const totalSales = require('./totalSales');
const queues = {};
const cache = {};

module.exports = function totalSalesBatch(item, callback) {
    const cached = cache[item];
    if (cached) { //如果缓存,⽤用回调返回,注意⽤用的nextTick哦
      console.log('Cache hit');
      return process.nextTick(callback.bind(null, null, cached));
    }
    if (queues[item]) {
      console.log('Batching operation');
      return queues[item].push(callback);
    }
    queues[item] = [callback];
    totalSales(item, (err, res) => {
    if (!err) {
      cache[item] = res;
        setTimeout(() => {
        delete cache[item];
      }, 30 * 1000); //30 seconds expiry 设置了了超时时间
    }
    const queue = queues[item];
    queues[item] = null;
    queue.forEach(cb => cb(err, res));
  });
};

缓存函数调⽤结果是实现记忆的⼀种方式,在npm中你可以找到很多方便用于异步存储记忆的⼯具,比如memoizee

缓存机制的具体实现

在实际应用中,我们需要更高级的存储和释放机制,有以下的原因:

使用Promise进行批处理和缓存

在这种情况下,使用Promise有以下几种优势:

// [1]引⼊入promisification模块,使返回Promise⽽而不不是回调
const pify = require('pify');
const totalSales = pify(require('./totalSales'));
const cache = {};

module.exports = function totalSalesPromises(item) {
  if (cache[item]) { // [2]检测缓存情况
    return cache[item];
  }
  cache[item] = totalSales(item) // [3]新建Promise
    .then(res => {
      // [4]resolve这个Promise之后,设计缓存清理理
      setTimeout(() => {delete cache[item]}, 30 * 1000);
      //30 seconds expiry
      return res;
    })
    .catch(err => { // [5]Promise被reject,删除缓存抛出错误
      delete cache[item];
      throw err;
    });
    return cache[item]; // [6]返回缓存结果
};

可见使用Promise之后代码变得优雅简洁,并且同时使用了批处理和缓存

处理计算密集型(CPU-bound)任务的方案

在第⼆章我们知道,通过触发异步操作我们让堆栈回到事件循环,从而可以自由处理其它的请求。
但是如果运⾏一个同步请求时间很⻓(比如计算密集型任务),它可能久久不返回给事件循环,因为它在重度利用CPU而不是在频繁使⽤I/O操作

解决子集求和问题(subset sum problem)

子集求和问题,即求一个集合的非空子集满足和为0
最简单的解决方案是把⼦集进行排列组合,这有2^n的复杂度,这就很计算复杂了

const EventEmitter = require('events').EventEmitter;
//继承EventEmitter,每次实现匹配就就触发⼀一下事件
class SubsetSum extends EventEmitter {
  constructor(sum, set) {
    super();
    this.sum = sum;
    this.set = set;
    this.totalSubsets = 0;
  }
  //实现可能的组合,注意它是同步的
  _combine(set, subset) {
    for(let i = 0; i < set.length; i++) {
      let newSubset = subset.concat(set[i]);
      this._combine(set.slice(i + 1), newSubset);
      this._processSubset(newSubset);
    }
  }
  //⼀一旦实现组合之后,确定是否匹配,匹配就emit⼀一个match事件
  _processSubset(subset) {
    console.log('Subset', ++this.totalSubsets, subset);
    const res = subset.reduce((prev, item) => (prev + item), 0);
    if(res == this.sum) {
      this.emit('match', subset);
    }
  }

  start() {
    this._combine(this.set, []); //触发同步的排列列组合
    this.emit('end'); //所有排列列组合结果计算之后触发end
  }
}
module.exports = SubsetSum;
const http = require('http');
const SubsetSum = require('./subsetSum');
//const SubsetSum = require('./subsetSumDefer');
//const SubsetSum = require('./subsetSumFork');

http.createServer((req, res) => {
  const url = require('url').parse(req.url, true);
  if(url.pathname === '/subsetSum') {
    const data = JSON.parse(url.query.data);
    res.writeHead(200);
    const subsetSum = new SubsetSum(url.query.sum, data);
    subsetSum.on('match', match => {
      res.write('Match: ' + JSON.stringify(match) + '\n');
    });
    subsetSum.on('end', () => res.end());
    subsetSum.start();
  } else {
    res.writeHead(200);
    res.end('I\m alive!\n');
  }
}).listen(8000, () => console.log('Started'));

通过事件机制,我们知道,当算法执行完毕时,也就是要等⼀会儿,会⾃自动返回结果:


image.png

可以看到,在算法返回结果时请求⼀直被挂起,也就是暂时都不会显示I'm alive,Node.js的单线程就这样被一个⻓的同步计算操作阻塞。

使⽤用setImmediate实现交错

通常来说,计算密集型(CPU-bound)的算法都是由一系列步骤组成,可能是递归调⽤、循环或者是和其他的组合。所以⼀个简单的解决⽅案就是让每一个(或者一定数量)步骤执行完成之后把执行权交给事件循环。
核心思想是:通过setImmediate实现异步任务和密集计算的交错执⾏

采⽤交错执⾏的⼦集求和问题

class SubsetSumDefer extends EventEmitter {
  constructor(sum, set) {
    super();
    this.sum = sum;
    this.set = set;
    this.totalSubsets = 0;
  }
  //实现交错执行的核心函数
  _combineInterleaved(set, subset) {
    this.runningCombine++; //需要新的参数来计数
    setImmediate(() => {
      this._combine(set, subset);
      if(--this.runningCombine === 0) {
        this.emit('end');
      }
    });
  }

  _combine(set, subset) {
    for(let i = 0; i < set.length; i++) {
      let newSubset = subset.concat(set[i]);
      this._combineInterleaved(set.slice(i + 1), newSubset); //替换为step
      this._processSubset(newSubset);
    }
  }

  _processSubset(subset) {
    console.log('Subset', ++this.totalSubsets, subset);
    const res = subset.reduce((prev, item) => prev + item, 0);
    if(res == this.sum) {
      this.emit('match', subset);
    }
  }

  start() {
    this.runningCombine = 0; //计数参数确认计算完毕
    this._combineInterleaved(this.set, []);
  }
}

交错执行模式的思考

尽管刚刚的模式解决了阻塞的问题,但是它并不不是一个很好的模式,因为延迟同步操作会带来事件开销,这可能导致⼤大的影响。特别是当我们想尽快响应用户时,不不希望等待太久。我们可以设置step的数量,但这并不能从根本上解决问题。
不过也不能因为它有时间开销就完全放弃这种模式,实际上,如果同步操作顺利,偶尔执行行异步的操作,这种利用setImmediate的模式还是可以被认可的
注意process.nextTick() 不适用与这种交错的模式,正如第一章讲到,它在I/O之前执⾏操作,频繁的调用可能导致I/O饥饿。

使⽤多进程

另⼀个防止事件循环阻塞的方案是使用⼦进程,从而不把昂贵的计算密集型任务运行在主进程上,这样做有以下好处:

把子集求和任务交付其他进程处理

核⼼思想是创建⼀个子进程去处理同步任务,让时间循环自由处理⽹络请求,下面提出可行的⽅案:

  1. 创建processPool.js 模块来建立进程池。因为开新进程要花时间,所以就让他们在进程池⾥里运行来等待请求,这样可以节约时间和CPU。另外限制一下进程的数量,可以防止Dos攻击
const fork = require('child_process').fork; //创建⼦子进程
class ProcessPool {
  constructor(file, poolMax) {
    this.file = file;
    this.poolMax = poolMax;
    this.pool = []; //pool是⼀系列将被使⽤用的运⾏的进程
    this.active = []; //active包含现在正在使用的进程
    this.waiting = []; //因为缺少进程资源 等待的回调队列
  }

  acquire(callback) {
  let worker;
  if(this.pool.length > 0) { // [1] 可⽤用的进程就把它变为active
    worker = this.pool.pop();
    this.active.push(worker);
    return process.nextTick(callback.bind(null, null, worker));
  }
  if(this.active.length >= this.poolMax) {
    // [2]缺少进程资源 等待
    return this.waiting.push(callback);
  }
  worker = fork(this.file); // [3]如果进程数还没到最⼤大就再开⼀一个进程
    this.active.push(worker);
    process.nextTick(callback.bind(null, null, worker));
  }
  
  release(worker) {
    if(this.waiting.length > 0) { // [1]如果请求队列列⾥里里有,则赋给worker
      const waitingCallback = this.waiting.shift();
      waitingCallback(null, worker);
    }
    this.active = this.active.filter(w => worker !== w);
    // [2]完成worker之后,放回池中
    this.pool.push(worker);
  }
}

module.exports = ProcessPool;

进程⼀直在运行不停止,想要减少常驻内存的使用、增强代码的健壮性,可以:

  1. 接下来的subsetSumFork.js把⼦集求和的任务分发到子进程。它的作用是和⼦进程交流然后传输结果
const EventEmitter = require('events').EventEmitter;
const ProcessPool = require('./processPool');
//初始化进程池,并且设置最⼤可用的进程数为2
const workers = new ProcessPool(__dirname + '/subsetSumWorker.js', 2);

class SubsetSumFork extends EventEmitter {
  constructor(sum, set) { //接收sum和set的EventEmitter
    super();
    this.sum = sum;
    this.set = set;
  }

  start() { //触发算法运⾏行行
    workers.acquire((err, worker) => { // [1]从进程池获取⼀一个⼦子进程
    worker.send({sum: this.sum, set: this.set}); //进程通信的通道

    const onMessage = msg => {
      // [3]监听任务完成 删掉onmessage然后放回进程池
      if (msg.event === 'end') {
        worker.removeListener('message', onMessage);
        workers.release(worker);
      }
      this.emit(msg.event, msg.data); // [4]⽆无缝传递信息
    };
      worker.on('message', onMessage); // [2]监听信息
    });
  }
}

module.exports = SubsetSumFork;

send()⽅法在子进程也是可用的,这也被cluster模块用来实现多线程分发HTTP server

  1. 最后我们的subsetSumWorker.js 需要⼀一个worker(子进程)来执⾏子集求和的算法,并且把它的算法结果传给⽗进程。
const SubsetSum = require('./subsetSum');

process.on('message', msg => {
  // [1]从⽗进程监测信息,⼀旦有信息我们就新建实例,然后说明匹配
  const subsetSum = new SubsetSum(msg.sum, msg.set);

  subsetSum.on('match', data => {
    // [2]⽤一个对象封装匹配结果传给父进程
    process.send({event: 'match', data: data});
  });
  
  subsetSum.on('end', data => {
    process.send({event: 'end', data: data});
  });
  
  subsetSum.start();
});

可以看到,我们重⽤了原来的subsetSum(同步版本),但是这次由于我们单独开了进程,所以不⽤担心事件循环被阻塞。
综上,可以看到,应用程序的一部分可以交付外部进程去处理的。
不过当子进程不是Node.js程序时,进程间通信的通道可能就不可⽤了。我们可以⾃己通过流式输入输出协议设计接口,可以参考child_process的实现

多进程模式的思考

我们可以并发的开两个⾃己求和任务,如果开3个会挂起⼀个直到其中一个任务完成,这是因为我们之前设置了进程的最大数。
可⻅多进程模式比交错执⾏模式更高效有力,不过,因为单一设备对资源有硬性限制,它并不不⽅便扩展。所以我们可以通过多个设备去实现分发加载任务。这也是我们接下来会提到的分布式架构模式。

上一篇下一篇

猜你喜欢

热点阅读