JavaScript工具箱

p-limit源码阅读

2023-07-27  本文已影响0人  拜仁的月饼

p-limit源码

开篇

源码统共68行,硬要背也是可以背下来的,但更多时候要“知其然”。因此写一篇源码阅读。

正文

该库的用法: (源码摘自: 手写 p-limit,40 行代码实现并发控制--神光的编程秘籍)

import pLimit from 'p-limit';

const generator = pLimit(2);
const input = [
    generator(() => fetchSomething('foo')),
    generator(() => fetchSomething('bar')),
    generator(() => doSomething())
];

const result = await Promise.all(input);
console.log(result);

上面代码意义是多个异步逻辑并行执行,最大并发数为2. 那么这一逻辑是怎样实现呢,这就要翻看源码了。

源码第一行引用了第三方队列实现yocto-queue,在看源码中把它当一个数组就可以。

import Queue from 'yocto-queue';

快速翻看一遍源码,发现源码最终导出的只是一个pLimit函数,参数concurrencynumber类型,返回结果为一个generator函数。那么,我们可以按图索骥,看一下generator函数是如何实现的?

const generator = (fn, ...args) => new Promise(resolve => {
    // 从字面理解上来看,该函数的意思是"入队"
    enqueue(fn, resolve, args);
});

仅有一行,逻辑是返回一个Promise,初始化时将要执行的函数、resolve、参数都推到队列中去。继续按图索骥,看一下队列定义和入队都做了什么。

源码中除却前三行的错误处理逻辑外,马上就是队列定义和activeCount定义。

// 队列定义, 从yocto-queue引入的队列类
const queue = new Queue();

let activeCount = 0;

接下来跳到enqueue函数的实现:

// 直接copy源码
const enqueue = (fn, resolve, args) => {
    // run函数执行结果入队。
    // run函数传入fn, Promise.resov, fn的参数args,并绑定this为undefined
    queue.enqueue(run.bind(undefined, fn, resolve, args));

    (async () => {
        // This function needs to wait until the next microtask before comparing
        // `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
        // when the run function is dequeued and called. The comparison in the if-statement
        // needs to happen asynchronously as well to get an up-to-date value for `activeCount`.

        // 上述为英文注释,英文好的同学可以一读。下面是我的翻译
        // 该IIFE (立即执行函数) 在activeCount与concurrency作比较之前须等待下一个微任务执行
        // 因为activeCount在run函数出队和调用时是异步更新的
        // 在if语句中 activeCount与concurrency作比较这一过程需要异步进行,以便获取最新的activeCount
        await Promise.resolve();

        if (activeCount < concurrency && queue.size > 0) {
            queue.dequeue()();
        }
    })();
};

这段代码看的是不是感觉不知所云?即使是读了一遍英文+中文注释后也感觉到这样?即使如此,还是让我的抽丝剥茧一下,直接看代码。enqueue函数只作了两件事:

  1. 将一个异步任务推到队列中
  2. 立即执行一个IIFE。

由此引出的两个问题是:

  1. run函数做了什么?
  2. IIFE中做了什么?

首先回答第一个问题。还是去看run函数源码。由于run函数与next函数密切相关,所以源码一并粘贴过来:

const next = () => {
    // activecount减1
    activeCount--;
    // 如果还有任务,让任务出队并执行
    if (queue.size > 0) {
        queue.dequeue()();
    }
};

const run = async (fn, resolve, args) => {
    // activecount加1
    activeCount++;
                
    // 异步任务执行结果
    const result = (async () => fn(...args))();
  
    // resolve获取执行结果
    resolve(result);

    try {
        await result;
    } catch {}
      
    // 当前任务执行完毕后下一个
    next();
};

第二个问题,IIFE中的逻辑其实与next函数大同小异, 都是只要activeCountconcurrency小,并且队列有任务,那么就让任务出队并执行。

以上是p-limit的核心实现。简单综述一下流程:

  1. 维护一个队列,队列任务数量上限是指定的参数;
  2. 待异步函数完毕后,立即让执行完的函数出队,然后下一个异步函数入队。但注意不要超过并发限制。
上一篇下一篇

猜你喜欢

热点阅读