node 使用 async/await 手写错误重试与并发数量控
很多人都曾经听说过,async / await 是 node 异步的终极解决方案,这句话确实没错,但是仅仅掌握最基本的 async 语法并不能让我们应付所有的异步需求,这次我们就来聊一下异步编程中两个非常常见的功能:错误重试 与 并发数量控制。
本文包含 async / await、Promise、迭代器、Set、闭包等知识点,绝对够你一次看个爽。
错误重试
在 node 开发中,很多异步请求都是有可能报错的,而错误重试则是解决这个问题最简单的方法,下面直接上代码:
/**
* 错误重试包装器
*
* @param {async function} asyncFunc 要包装的异步函数
* @param {number} defaultRetryTime 默认的重试次数
* @param {number} retryInterval 重试间隔时常
* @returns 会自动进行错误重试的异步函数
*/
const retryWarpper = function (asyncFunc, defaultRetryTime = 3, retryInterval = 1000) {
// 内部重试计数器
let retryTime = defaultRetryTime
const retryCallback = async function(...args) {
try {
return await asyncFunc(...args)
}
catch (e) {
if (retryTime <= 0) throw e
console.log(`${args} 查询失败,将在 ${retryInterval} 毫秒后重试,剩余重试次数 ${retryTime}`)
retryTime -= 1
await new Promise(reslove => setTimeout(reslove, retryInterval))
return await retryCallback(...args)
}
}
return retryCallback
}
通过这个 warpper 函数就可以使任意异步函数包装上错误重试功能,大概二十行,思路也非常简单,这个包装器里通过返回一个和原异步函数入参返回值保持一致的函数来闭包保存剩余重试次数 retryTime
,在执行异步方法时通过 try / catch 来检查有没有报错,一旦报错则减少剩余重试次数并尾递归调用自身来达到重试的目的。
这里需要注意的一点是,哪怕内部的异步调用报错并重试了,对于外部调用者来说,这个异步 promsie 的状态依旧是 pending,直到内部调用成功 resolve 或者达到重试上限后变为 rejected。之所以会这样,是因为外部调用者看到的是由 async 函数包装过的 promise(即 retryCallback
的返回值),而不是实际异步方法(asyncFunc
参数)返回的 promise。
下面上代码测试一下:
/** mock 接口,100% 返回调用失败 */
const fetchData = function () {
return new Promise((reslove, reject) => setTimeout(reject, 1000))
}
// 包装上错误重试
const fetchDataWithRetry = retryWarpper(fetchData)
const run = async function () {
const resp = await fetchDataWithRetry()
console.log('获取完成!', resp)
}
run()
可以看到,由于我们的 mock 函数一直报错并失败,所以将会一直重试,直到重试达到上限后报错退出。
然后再来测试下是否能正确返回:
/** mock 接口,10% 几率调用成功 */
const fetchData = function () {
return new Promise((reslove, reject) => {
setTimeout(() => Math.random() < 0.1 ? reslove(true) : reject(false), 1000)
})
}
// 包装上错误重试,注意这里指定了无限次重试
const fetchDataWithRetry = retryWarpper(fetchData, Infinity)
const run = async function () {
const resp = await fetchDataWithRetry()
console.log('获取完成!', resp)
}
run()
在下图里可以看到,在指定了无限次重试后,我们的接口会一直重试直到返回成功(当然不太推荐在实际业务里这么搞):
并发数量控制
虽然我们可以通过 Promise.all 来实现同时运行多个异步任务,但是有个问题,一旦 new 出来一个 promise 之后,这个任务就已经开始了。如果我们想从一个网站爬取一千张图片该怎么办呢,同时并发一千个 promise?这么做于人于己都不太好,这就引出了我们接下来要了解的并发数量控制,通过控制同时执行的异步任务数量,从而在减轻服务器压力的情况下最大程度的利用 node 的异步能力:
下面上代码:
/**
* 控制并发数量
*
* @param {any[]} collection 待执行的任务数组
* @param {number} limit 最大并发数量
* @param {async function} asyncCallback 要执行的异步回调
*/
const concurrent = async function (collection, limit, asyncCallback) {
const taskIterator = collection.entries();
const pool = new Set();
do {
const { done, value: [index, task] = [] } = taskIterator.next();
if (done) {
await Promise.allSettled(pool);
break;
};
const promise = asyncCallback(task).finally(() => pool.delete(promise))
// 同时并发的数量已经到了上限,等待其中的任务完成
if (pool.add(promise).size >= limit) {
await Promise.race(pool);
}
} while (true)
}
这个函数的入参可能有些难以理解,但是很简单,你可以将其想象成 lodash 的 map 函数,第一个参数 collection
是要迭代的数组,第二个参数就是我们要控制的最大并发数量,第三个参数就是每次迭代所要执行的回调,和 map 的区别就在于这个回调是异步的。
这么实现的原因就是刚才我们提到的 promise 一旦实例化之后任务就开始执行了,所以我们不能传入一个 promise 数组然后对其进行控制,这样是没有意义的。因此,我们可以通过迭代器的形式,由函数内部控制该何时开始新的异步调用。
接下来我们来看一下内部实现,主要有以下几个要点:
-
由 do / while 实现整体流程,由于 await 可以“暂停”同步代码的执行,所以我们可以通过在 do / while 内部 await 的方法暂停循环,并且使用 await Promise.race 的形式等待线程池中是否有任务完成,一旦有任务完成,我们就可以进行下一个 while 循环取出新的任务执行并加入线程池。
-
使用迭代器来弹出下一个任务:由于每个 while 循环都会执行一个新任务,所以我们要记录当前已经执行到了哪一个任务,最简单的方式就是新建一个 index 变量并保存当前执行到的任务索引,取出后再给这个变量 += 1。但是我们也可以通过
array.entries
方法生成该数组的迭代器,这样只需要执行 next 方法就可以取出下一个任务,并且还可以通过返回的done
属性来识别是否已经取完了所有的任务。 -
使用 Set 来作为线程池:一开始新建的 Set 对象
pool
可以说是整个函数的核心,可以看到 while 循环的中间部分,我们执行异步回调来获取该任务的 promise,并在其结束时将自身从池子中移除,随后我们将这个 promise 加入到池子中,并根据池子的当前存放数量来判断是否超过了设定的并发上限。这里需要注意的是,我们直接给 Promise.race 传入了这个 Set,这其实是没有问题的,我们可以在 MDN - Promise.race 中看到他的入参是一个可迭代对象而不仅仅是个数组。而 Set - JavaScript | MDN (mozilla.org) 告诉我们其实现了
@@iterator
方法,所以 Set 实例其实就是一个可迭代对象,可以直接传递给Promise.race
。 -
在
done
时使用Promise.allSettled
进行特判:这里主要有两个疑问,为什么不继续使用Promise.race
了?因为 race 是传入的 promise 里只要有一个结束了,那 race 返回的 promise 就会结束。而当迭代结束时,即所有的任务都已经执行并放到任务池里了,那么我们接下来的任务就是等待 所有 剩下的任务完成,自然就不能再用 race 了。另一个疑问是为什么不用
Promise.all
?答案是:all 会等待所有任务执行是没错,但是他是等待所有任务 成功,一旦其中有一个 promise 状态变为 rejected 了,那 Promise.all 就会直接失败并退出。而比较少见的 MDN - Promise.allSettled() 则可以很好的契合这个需求。
光分析可能会有些干巴巴的,下面我们来结合例子实践一下:计算从 0-9 的 +1 结果,首先请出我们的测试素材:
/**
* 异步任务,接受一个数字,并在随机时间后计算数字 + 1
*/
const plusNum = function (num) {
console.log('正在获取', num, '的计算结果')
const calcTime = Math.ceil(Math.random() * 4000)
return new Promise(resolve => {
setTimeout(() => {
console.log(`${num} + 1 的计算结果为 ${num + 1},用时 ${calcTime}ms`)
resolve()
}, calcTime);
});
}
/**
* 生成一个从 0 到 9 的步进数组
*/
const numArray = Array.from({ length: 10 }).map((_, index) => index)
注意其中的 mock 函数 plusNum
的异步时间是随机的(最大 4 秒),因为如果这个时间是固定的话就会产生同时开始同时结束的情况,不方便我们检查。现在我们加入并发数量控制,如下:
const run = async function () {
await concurrent(numArray, 3, plusNum)
console.log('任务执行完毕')
}
run()
很简单对吧,传入要迭代的数组,指定并发数量,最后传入要执行的异步回调,下面我们执行一下:
可以看到,由于我们设置的并发上限为 3,所以一开始直接启动了前三个任务,随后每完成一个任务就会取出新的任务并执行,最后等待所有任务都结束后,并发结束。
收集并发结果并结合错误重试
实际上上面这个例子并不完整,为什么呢?因为我们并没有收集到异步任务的执行结果并返回给调用者,就想我们更喜欢用 map 而不是 forEach 一样,接下来我们就实现这个功能,并顺便加上上面实现的错误重试(毕竟这兄弟俩基本都是同时出现的)。
老规矩先上代码再介绍:
const concurrent = async function (collection, limit, asyncCallback) {
const taskIterator = collection.entries();
const pool = new Set();
// 存放异步任务的执行结果或错误
const finalResult = [];
const finalError = [];
do {
const { done, value: [index, task] = [] } = taskIterator.next();
if (done) {
await Promise.allSettled(pool);
break;
};
// 完成或失败时将结果存起来
// 注意这里包装上了错误重试功能
const promise = retryWarpper(asyncCallback)(task)
.then(data => finalResult[index] = data)
.catch(error => finalError[index] = { error, task })
.finally(() => pool.delete(promise))
if (pool.add(promise).size >= limit) {
await Promise.race(pool);
}
} while (true)
// 将结果和异常一起返回出去
return [finalError, Array.from(finalResult)];
}
大致就是新建两个数组来存放最终的结果和异常,在异步回调执行之前先包装上我们的错误重试,然后在用 .then 和 .catch 来捕获并存放执行结果。注意这里要在 while 循环里对每次异步回调执行都执行一下错误重试的包装。
如果我们在一开始就包装了的话,在 while 循环里调用的所有异步回调都将会访问到同一个闭包中的错误重试计时器,就导致了所有的异步任务总共只有那么多重试次数,而非每个异步任务享有自己单独的重试次数。
这里还有一个小问题,有的同学可能会问:在 while 里执行异步回调的时候这么写行不行:
const promise = retryWarpper(asyncCallback)(task);
// 先加入任务池,再绑定回调
pool.add(promise);
promise.then(data => finalResult[index] = data)
.catch(error => finalError[index] = { error, task })
.finally(() => pool.delete(promise));
答案是不行的,虽然看起来只是流程上变了一下,但是两者是有本质上的区别的,问题在于 Promise 的 then / catch / finally 都会返回一个 新的 Promise。也就是说,一开始我们存放到任务池里的 promise 是 finally 返回的,所以在最后的 Promise.all 的等待时,他会等待这个 promise 绑定的 then / catch / finally 回调都执行完之后才结束。
而如果我们先将异步任务返回的 promise 加入任务池,然后再绑定回调,会发生什么呢:
- 迭代器返回 done,开始 await Promise.all
- 异步任务完成,Promise.all 结束退出
- while 循环结束,返回结果数组
是不是漏了什么,是的,then / catch 回调表示:
由于我们 Promise.all 等待的是异步任务的 promise 而非 finally 回调返回 promise,所以 Promise.all 并不会等你把结果塞到 finalResult / finalError 里,然后就会导致我们最后的返回数组里少了某一个或某几个任务的结果。
除此之外,还有个小知识点就是最后返回的时候将 finalResult
重建了一遍,而 finalError
却没有执行操作,这个的原因在于我们异步回调执行完存放结果的时候是直接使用 index 插入到数组里的,所以没有被插入的位置是空的(empty 而不是 undefind),这两者的区别在于 undefined 会被数组迭代器迭代到,而 empty 不会。
所以我们就可以通过这种方式,让我们对结果进行后续处理的时候得以发现有那些数据没有取到,而且对 error 结果进行遍历的时候也不需要再剔除掉结果为空的情况了。
ok,到这里本文的所有分享就结束了,下面放一下包含错误重试和并发数量控制的完整例子,你可以自己亲手试一下:
/**
* 计算入参数字 +1 的结果,50% 几率成功,50% 几率失败
* @param {number} num 要计算的数字
*/
const plusNum = async function (num) {
console.log('正在获取', num, '的计算结果')
return new Promise((resolve, reject) => {
setTimeout(() => {
if (Math.random() < 0.5) resolve(num + 1);
else reject(`无法计算 ${num} + 1`);
}, Math.floor(Math.random() * 5000));
});
}
/**
* 0-10 的步进数组
*/
const numArray = Array.from({ length: 10 }).map((_, index) => index)
/**
* 错误重试包装器
*
* @param {async function} asyncFunc 要包装的异步函数
* @param {number} defaultRetryTime 默认的重试次数
* @param {number} retryInterval 重试间隔时常
* @returns 会自动进行错误重试的异步函数
*/
const retryWarpper = function (asyncFunc, defaultRetryTime = 3, retryInterval = 1000) {
let retryTime = defaultRetryTime
const retryCallback = async function(...args) {
try {
return await asyncFunc(...args)
}
catch (e) {
if (retryTime <= 0) throw e
console.log(`${args} 查询失败,将在 ${retryInterval} 毫秒后重试,剩余重试次数 ${retryTime}`)
retryTime -= 1
await new Promise(reslove => setTimeout(reslove, retryInterval))
return await retryCallback(...args)
}
}
return retryCallback
}
/**
* 控制并发数量
*
* @param {any[]} collection 待执行的任务数组
* @param {number} limit 最大并发数量
* @param {async function} asyncCallback 要执行的异步回调
*/
const concurrent = async function (collection, limit, asyncCallback) {
// 用于在 while 循环中取出任务的迭代器
const taskIterator = collection.entries();
// 任务池
const pool = new Set();
// 最终返回的结果数组
const finalResult = [];
// 最终返回的异常数组
const finalError = [];
do {
const { done, value: [index, task] = [] } = taskIterator.next();
// 任务都已执行,等待最后的剩下的任务执行完毕
if (done) {
await Promise.allSettled(pool);
break;
};
// 将结果存入结果数组,并从任务池中移除自己
const promise = retryWarpper(asyncCallback)(task)
.then(data => finalResult[index] = data)
.catch(error => finalError[index] = { error, task })
.finally(() => pool.delete(promise))
// 达到上限后就等待某个任务完成
if (pool.add(promise).size >= limit) {
await Promise.race(pool);
}
} while (true)
return [finalError, Array.from(finalResult)];
}
const run = async function() {
const [errors, results] = await concurrent(numArray, 3, plusNum);
console.log('所有报错', errors)
console.log('所有结果', results)
}
run()
写在最后
其实实现类似功能的包在 npm 上已经有很多了,例如 es6-promise-pool - npm 和 async.mapLimit - github.io,你可以发现我上面例子里并发数量控制的入参和 async.mapLimit 是一致的,其实本来也是在研究 async 的时候产生的这个想法。这两个例子基本上就是对 Promise 和 async / await 的综合运用,认真思考一下还是对 js 异步编程的理解非常有帮助的,特别是 promise 回调链和 async 作为 promise 的语法糖究竟甜在哪里。
上面仅仅是我对这两个需求的基本实现,才疏学浅也没有考虑兼容性之类的问题,如果发现了什么问题欢迎评论区里指正。