@IT·互联网程序员

【Node】让服务调用更简单优雅地高效并发

2017-05-14  本文已影响216人  蛋先生DX
高效并发

使用Node时,我们经常需要在Node端调用其它远程服务,比如SOA,比如MongoDB等,假设我们是这样封装远程服务调用的:

function getDataFromRemoteServer(param) {
  // 以下模拟远程服务执行
  return new Promise(resolve => {
    console.log('call getDataFromRemoteServer');
    setTimeout(() => {
      resolve(`Receive param: ${param}`);
    }, 1000);
  })
}

我们模拟下并发请求:

getDataFromRemoteServer('daniel').then(data => console.log(data));
getDataFromRemoteServer('daniel').then(data => console.log(data));
getDataFromRemoteServer('daniel').then(data => console.log(data));

结果打印:

call getDataFromRemoteServer
call getDataFromRemoteServer
call getDataFromRemoteServer
Receive param: daniel
Receive param: daniel
Receive param: daniel

3次调用返回相同的结果,所以有两次是浪费的(浪费网络资源,服务器的计算资源等),我们应该发挥节约美德,拒绝浪费。But How???

利用Javascript单线程特点,我们可以利用状态锁来限制后续方法的调用

我们引入状态锁来改造下原来的方法:

var status = 'ready';

function getDataFromRemoteServer(param) {
  if (status === 'ready') {
    status = 'pending'; 
      // 以下模拟远程服务执行
      return new Promise(resolve => {
        console.log('call getDataFromRemoteServer');
        setTimeout(() => {
          resolve(`Receive param: ${param}`);
        }, 1000);
      }).then(data => {
        status = 'ready';
        return data;
      })
  }
}

getDataFromRemoteServer('daniel').then(data => console.log(data));
getDataFromRemoteServer('daniel').then(data => console.log(data));
getDataFromRemoteServer('daniel').then(data => console.log(data));

打印信息(你能看出我悄悄隐藏了错误信息吗?)

call getDataFromRemoteServer
Receive param: daniel

现在只打印了一个调用信息,但结果也只有一个,其它2个调用呢?
不好意思,其它2个报错了,因为除了第一次调用是正常返回后,其它请求都没有返回,所以也就没有.then的方法
为了避免报错,每一次返回都需要返回promise,我们引入事件机制,对代码再修改一下:

var EventEmitter = require('events');
var proxy = new EventEmitter();

var status = 'ready';

function getDataFromRemoteServer(param) {
  if (status === 'ready') {
      status = 'pending'; 
      // 以下模拟远程服务执行
      return new Promise(resolve => {
        console.log('call getDataFromRemoteServer');
        setTimeout(() => {
          resolve(`Receive param: ${param}`);
        }, 1000);
      }).then(data => {
        proxy.emit('finish', data);
        status = 'ready';
        return data;
      })
  } else {
    return new Promise(resolve => {
        proxy.on('finish', data => {
            resolve(data);
        })
    });
  }
}

getDataFromRemoteServer('daniel').then(data => console.log(data));
getDataFromRemoteServer('daniel').then(data => console.log(data));
getDataFromRemoteServer('daniel').then(data => console.log(data));

打印如下:

call getDataFromRemoteServer
Receive param: daniel
Receive param: daniel
Receive param: daniel

Yeah!结果显示只调用了一次远程服务,而3次调用都正常返回了正确的结果。

别太得意,有潜在的问题,你看得出吗?

我们试下这样调用:

getDataFromRemoteServer('daniel').then(data => console.log(data));
getDataFromRemoteServer('daniel').then(data => console.log(data));
getDataFromRemoteServer('daniel').then(data => console.log(data));
// 在三次并发并且结果返回后再调用一次
setTimeout(() => {
    getDataFromRemoteServer('daniel').then(data => console.log(data));
}, 2000)

打印如下:

call getDataFromRemoteServer
==> on finish event
==> on finish event
Receive param: daniel
Receive param: daniel
Receive param: daniel
call getDataFromRemoteServer
==> on finish event
==> on finish event
Receive param: daniel

是不是又看出浪费问题了。因为我们前3次并发,有2次监听了事件,但执行完后并没有移除事件,所以有两次并不需要的事件监听被触发,随着并发请求越来越多,监听队列也越积越多,这是浪费,也是隐患

所以我们是不是每次都要手动移除事件监听就可以?
当然,可以,不过我们可以用更优雅更简单的方式,就是用once
我们把上面的on换成once,再见证一下奇迹吧

==> on finish event
==> on finish event
Receive param: daniel
Receive param: daniel
Receive param: daniel
call getDataFromRemoteServer
Receive param: daniel

好了,开香槟庆祝一下吧。

喂喂,还没完呢,难道每一个远程方法的调用,我们都要这样麻烦地处理吗?而且statusproxy在这里好像是"全局"的吧,其它方法昨办?难道一个方法用一个名称:"iStatus", "jStatus", .....

我们是聪明的程序猿,怎么可以干这种stupid的事呢。
前人种树,后人乘凉。我们现在就来“种树”,至于后人乘不乘凉,反正我们是要乘凉的
所以接下来,我们来把代码封装一下,让每一个远程调用都可以简单优雅地高效并发

const EventEmitter = require('events');
const crypto = require('crypto');


// 已封装 并发
function concurrent(promiseFn) {
  return function (...params) {
    var key = crypto.createHash('md5').update(JSON.stringify(params)).digest("hex"); // 用参数作为键, 因为每个参数的调用结果返回都不一样, 所以应当不同的请求
    var keyObj = promiseFn[key] = promiseFn[key] || {status: 'ready', proxy: new EventEmitter()};

    // 加状态锁和once事件机制, 避免并发访问重复调用
    if (keyObj.status === 'ready') {
      keyObj.status = 'pending';
      promiseFn.apply(null, params).then(result => {
        keyObj.proxy.emit('finish', result);
        delete keyObj.status;
      }).catch(err => {
        keyObj.proxy.emit('error', err);
      })
    }

    return new Promise((resolve, reject) => {
      keyObj.proxy.once('finish', result => {
        delete keyObj.proxy;
        delete promiseFn[key];
        resolve(result);
      });
      keyObj.proxy.once('error', err => {
        delete keyObj.proxy;
        delete promiseFn[key];
        reject(err);
      });
    });
  }
}

使用起来灰常方便,让我们还原远程服务最初的样子,演示如下:

function getDataFromRemoteServer(param) {
  // 以下模拟远程服务执行
  return new Promise(resolve => {
    console.log('call getDataFromRemoteServer');
    setTimeout(() => {
      resolve(`Receive param: ${param}`);
    }, 1000);
  })
}

var $getDataFromRemoteServer = concurrent(getDataFromRemoteServer);
$getDataFromRemoteServer('daniel').then(data => console.log(data));
$getDataFromRemoteServer('daniel').then(data => console.log(data));
$getDataFromRemoteServer('daniel').then(data => console.log(data));
// 在三次并发并且结果返回后再调用一次
setTimeout(() => {
    $getDataFromRemoteServer('daniel').then(data => console.log(data));
}, 1500)

至此该文告一段落了。
这里声明下应用场景:如果并发访问远程服务,相同的参数一定返回相同的结果,那么就可以放心使用(如通过provinceId去取所有的cities)

--EOF--

上一篇下一篇

猜你喜欢

热点阅读