p-limit源码阅读
2023-07-27 本文已影响0人
拜仁的月饼
开篇
源码统共68行,硬要背也是可以背下来的,但更多时候要“知其然”。因此写一篇源码阅读。
正文
该库的用法: (源码摘自: 手写 p-limit,40 行代码实现并发控制--神光的编程秘籍)
import pLimit from 'p-limit';
const generator = pLimit(2);
const input = [
generator(() => fetchSomething('foo')),
generator(() => fetchSomething('bar')),
generator(() => doSomething())
];
const result = await Promise.all(input);
console.log(result);
上面代码意义是多个异步逻辑并行执行,最大并发数为2. 那么这一逻辑是怎样实现呢,这就要翻看源码了。
源码第一行引用了第三方队列实现yocto-queue
,在看源码中把它当一个数组就可以。
import Queue from 'yocto-queue';
快速翻看一遍源码,发现源码最终导出的只是一个pLimit
函数,参数concurrency
为number
类型,返回结果为一个generator
函数。那么,我们可以按图索骥,看一下generator
函数是如何实现的?
const generator = (fn, ...args) => new Promise(resolve => {
// 从字面理解上来看,该函数的意思是"入队"
enqueue(fn, resolve, args);
});
仅有一行,逻辑是返回一个Promise
,初始化时将要执行的函数、resolve
、参数都推到队列中去。继续按图索骥,看一下队列定义和入队都做了什么。
源码中除却前三行的错误处理逻辑外,马上就是队列定义和activeCount
定义。
// 队列定义, 从yocto-queue引入的队列类
const queue = new Queue();
let activeCount = 0;
接下来跳到enqueue
函数的实现:
// 直接copy源码
const enqueue = (fn, resolve, args) => {
// run函数执行结果入队。
// run函数传入fn, Promise.resov, fn的参数args,并绑定this为undefined
queue.enqueue(run.bind(undefined, fn, resolve, args));
(async () => {
// This function needs to wait until the next microtask before comparing
// `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
// when the run function is dequeued and called. The comparison in the if-statement
// needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
// 上述为英文注释,英文好的同学可以一读。下面是我的翻译
// 该IIFE (立即执行函数) 在activeCount与concurrency作比较之前须等待下一个微任务执行
// 因为activeCount在run函数出队和调用时是异步更新的
// 在if语句中 activeCount与concurrency作比较这一过程需要异步进行,以便获取最新的activeCount
await Promise.resolve();
if (activeCount < concurrency && queue.size > 0) {
queue.dequeue()();
}
})();
};
这段代码看的是不是感觉不知所云?即使是读了一遍英文+中文注释后也感觉到这样?即使如此,还是让我的抽丝剥茧一下,直接看代码。enqueue
函数只作了两件事:
- 将一个异步任务推到队列中
- 立即执行一个IIFE。
由此引出的两个问题是:
-
run
函数做了什么? - IIFE中做了什么?
首先回答第一个问题。还是去看run
函数源码。由于run
函数与next
函数密切相关,所以源码一并粘贴过来:
const next = () => {
// activecount减1
activeCount--;
// 如果还有任务,让任务出队并执行
if (queue.size > 0) {
queue.dequeue()();
}
};
const run = async (fn, resolve, args) => {
// activecount加1
activeCount++;
// 异步任务执行结果
const result = (async () => fn(...args))();
// resolve获取执行结果
resolve(result);
try {
await result;
} catch {}
// 当前任务执行完毕后下一个
next();
};
第二个问题,IIFE中的逻辑其实与next
函数大同小异, 都是只要activeCount
比concurrency
小,并且队列有任务,那么就让任务出队并执行。
以上是p-limit的核心实现。简单综述一下流程:
- 维护一个队列,队列任务数量上限是指定的参数;
- 待异步函数完毕后,立即让执行完的函数出队,然后下一个异步函数入队。但注意不要超过并发限制。