收藏一些收藏

ForkjoinPool -3

2022-03-17  本文已影响0人  程序员札记

任务分割与等待:fork和join

fork和join是ForkJoinTask的方法,也是整个框架的设计灵魂:fork把任务切分为小任务,join则等待任务结果。

fork

ForkJoinTask fork的实现异常简单:

    public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
         // 放到当前线程的workqueue
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

common 是一个静态的ForkjoinPool,不可关闭。即使你不new 一个ForkjoinPool ,直接调用fork ,此任务就直接交给common 了。 可以这么理解说,ForkjoinPool 有一个default , common 的构造在Forkjoinpool的static 代码段中。

    static {
        // initialize field offsets for CAS etc
        try {
...

        common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() { return makeCommonPool(); }});
        int par = common.config & SMASK; // report 1 even if threads disabled
        commonParallelism = par > 0 ? par : 1;
    }


join

    public final V join() {
        int s;
   
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }

doJoin 是一个等待任务执行结束的方法,当任务执行完毕的时候就会返回。这里虽然等待,确不一定返回。

    private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            externalAwaitDone();
    }

fork方法逻辑:

doJoin&join方法逻辑:

state 状态

    volatile int status; // accessed directly by pool and workers
    static final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits
    static final int NORMAL      = 0xf0000000;  // must be negative
    static final int CANCELLED   = 0xc0000000;  // must be < NORMAL
    static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED
    static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16
    static final int SMASK       = 0x0000ffff;  // short bits for tags

所以status <0 代表任务已经完成,取消或者异常,总之就是任务结束了。
上面这段代码的意思就是当前任务是否结束,没有就看是不是ForkJoinWorkerThread,

tryUnpush

工作线程在合并结果时,如果这个任务被fork到了栈顶/队尾,那么执行该任务返回即可。但如果不在栈顶,有可能是被其他fork出的任务压下去了或者其他线程被窃取了,那么则会进入awaitJoin()方法。
externalAwaitDone 基本上是个wait notify 方法


    private int externalAwaitDone() {
        int s = ((this instanceof CountedCompleter) ? // try helping
                 ForkJoinPool.common.externalHelpComplete(
                     (CountedCompleter<?>)this, 0) :
                 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
        if (s >= 0 && (s = status) >= 0) {
            boolean interrupted = false;
            do {
                if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                    synchronized (this) {
                        if (status >= 0) {
                            try {
                                wait(0L);
                            } catch (InterruptedException ie) {
                                interrupted = true;
                            }
                        }
                        else
                            notifyAll();
                    }
                }
            } while ((s = status) >= 0);
            if (interrupted)
                Thread.currentThread().interrupt();
        }
        return s;
    }

awaitJoin

awaitJoin就相当复杂,要帮助当然执行worker 尽快偷到一个任务来执行。

    final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
        int s = 0;
        if (task != null && w != null) {
         //把currentJoin 仅仅在这个方法内使用,currentJoin赋给 prevJoin 说明task 不能是循环依赖的,必须是个有向无环图。
        //即使有循环依赖,在下面的程序里也有办法解决,就是设置超时时间。所以wait 和internalWait 设置了超时时间。
            ForkJoinTask<?> prevJoin = w.currentJoin;
            U.putOrderedObject(w, QCURRENTJOIN, task);
            CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
                (CountedCompleter<?>)task : null;
            for (;;) {
                // 如果task 完成
                if ((s = task.status) < 0)
                    break;
                if (cc != null)
                    helpComplete(w, cc, 0);
                //如果当前没有任务,现在task 未完成,则task 被窃取了
                else if (w.base == w.top || w.tryRemoveAndExec(task))
                //帮助stealer执行任务,毕竟当前自身队列已经执行完了,所以帮助stealer 执行任务
                // 以尽快返回
                    helpStealer(w, task);
                if ((s = task.status) < 0)
                    break;
                long ms, ns;
                if (deadline == 0L)
                    ms = 0L;
                else if ((ns = deadline - System.nanoTime()) <= 0L)
                    break;
                else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                    ms = 1L;
               //尝试增加一个线程作为补偿,因为当前线程准备进入等待,能到达这里表示任务完成,并且队列里还有任务
                if (tryCompensate(w)) {
                    task.internalWait(ms);
                    U.getAndAddLong(this, CTL, AC_UNIT);
                }
            }
            U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
        }
        return s;
    }

awaitJoin方法的总体逻辑还算简单,如下:

tryRemoveAndExec

tryRemoveAndExec从队列中取出task执行,或者已经取消的task


        final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] a; int m, s, b, n;
            if ((a = array) != null && (m = a.length - 1) >= 0 &&
                task != null) {
                while ((n = (s = top) - (b = base)) > 0) {
                    for (ForkJoinTask<?> t;;) {      // traverse from s to b
                       //从top的第一位置,开始遍历
                        long j = ((--s & m) << ASHIFT) + ABASE;
                     //获取到的top 为空, 并且s+1 = top, 则当然队列为空,否则非空,如果队列非空,说明所有的元素都遍历了
                        if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
                            return s + 1 == top;     // shorter than expected
                        else if (t == task) {
                            boolean removed = false;
                            if (s + 1 == top) {      // pop
                                if (U.compareAndSwapObject(a, j, task, null)) {
                                    U.putOrderedInt(this, QTOP, s);
                                    removed = true;
                                }
                            }
//如果到这里,说明t 在base 和base-2 之间, base=b 表示没有任何task被偷
//如果可能被偷,则很坑就是t, +1 判断
                            else if (base == b)      // replace with proxy
                                removed = U.compareAndSwapObject(
                                    a, j, task, new EmptyTask());
                            if (removed)
                                task.doExec();
                            break;
                        }
                        else if (t.status < 0 && s + 1 == top) {
                            if (U.compareAndSwapObject(a, j, t, null))
                                U.putOrderedInt(this, QTOP, s);
                            break;                  // was cancelled
                        }
                        if (--n == 0)
                            return false;
                    }
                    if (task.status < 0)
                        return false;
                }
            }
            return true;
        }


tryRemoveAndExec方法的逻辑,如下:
判断队列中是否有任务:

tryRemoveAndExec方法比较简单,该方法主要作用是遍历当前线程的WorkQueue,在队列中查找要join合并的任务执行。而在执行过程中,如果队列为空或者任务在栈顶但cas失败以及遍历完整个队列都没找到要join的任务,这三种情况代表任务被偷了,对于前两种情况下,会进入helpStealer帮助窃取者执行任务,而对于最后一种被窃取任务的情况,则会直接退出阻塞(个人猜测:可能是因为遍历完整个队列会导致一段时间的开销,被窃取走的任务很有可能在这段时间内已经执行完了或快执行完了。所以与其去帮助窃取者执行任务,还不如阻塞等待一会儿)。

helpStealer

在看Forkjoinpool 的helpStealer方法

private void helpStealer(WorkQueue w, ForkJoinTask<?> task) { // 帮助偷取者(偷取自己任务的线程)执行任务
        WorkQueue[] ws = workQueues;
        int oldSum = 0, checkSum, m;
        if (ws != null && (m = ws.length - 1) >= 0 && w != null && task != null) {
            do { // restart point
                checkSum = 0; // for stability check
                ForkJoinTask<?> subtask;
                WorkQueue j = w, v; // v是子任务的偷取者
                descent: for (subtask = task; subtask.status >= 0;) { // 从目标任务开始,记录当前Join的任务,也包括偷取者当前Join的任务,递归帮助
                    for (int h = j.hint | 1, k = 0, i;; k += 2) { // 每次跳2个,遍历奇数位索引
                        if (k > m) // 如果遍历一遍没有找到偷取者,跳出循环
                            break descent;
                        if ((v = ws[i = (h + k) & m]) != null) {
                            if (v.currentSteal == subtask) { // 定位到偷取者,更新hint为偷取者索引,方便下次定位
                                j.hint = i;
                                break;
                            }
                            checkSum += v.base; // 若没有定位到,则累加工作队列的base值,继续遍历
                        }
                    }
                    for (;;) { // 帮助偷取者执行任务
                        ForkJoinTask<?>[] a; // 偷取者的任务数组
                        int b;
                        checkSum += (b = v.base); // 累加偷取者的base值
                        ForkJoinTask<?> next = v.currentJoin; // 记录偷取者Join的任务
                        if (subtask.status < 0 || j.currentJoin != subtask || v.currentSteal != subtask) // subtask结束,或者数据不一致了(j.currentJoin != subtask || v.currentSteal != subtask)
                            break descent; // 跳出外层循环重来
                        if (b - v.top >= 0 || (a = v.array) == null) { // 偷取者的任务列表为空
                            if ((subtask = next) == null) // 偷取者的Join任务为空,跳出外层循环
                                break descent;
                            j = v; // 否则,j取v的值(j指向被偷者,v指向偷取者),且subtask指向next Join任务
                            break; // 继续遍历,寻找偷取者的偷取者(递归)
                        }
                        int i = (((a.length - 1) & b) << ASHIFT) + ABASE; // 偷取者的base内存地址
                        ForkJoinTask<?> t = ((ForkJoinTask<?>) U.getObjectVolatile(a, i)); // 获取base位置任务
                        if (v.base == b) {
                            if (t == null) // 任务为空,跳出外层循环(可能被别的线程拿走了)
                                break descent;
                            if (U.compareAndSwapObject(a, i, t, null)) { // poll(从base位置取出任务)
                                v.base = b + 1; // 更新base的值
                                ForkJoinTask<?> ps = w.currentSteal; // 记录调用者之前偷取的任务
                                int top = w.top; // 记录调用者的top值
                                do {
                                    U.putOrderedObject(w, QCURRENTSTEAL, t); // 更新currentSteal为刚刚偷取到的任务
                                    t.doExec(); // 指向任务
                                } while (task.status >= 0 && w.top != top && (t = w.pop()) != null); // 如果任务未结束,且自己任务队列不为空,优先处理自己队列里的任务
                                U.putOrderedObject(w, QCURRENTSTEAL, ps); // 把之前偷取的任务设置回currentSteal
                                if (w.base != w.top) // 自己队列来新任务了,直接返回
                                    return;
                            }
                        }
                    }
                }
            } while (task.status >= 0 && oldSum != (oldSum = checkSum)); // Join的任务未结束,且任务在流动中,继续帮助执行
        }
    }

}
该方法是ForkJoin框架实现“工作窃取思想”的核心体现。它与scan扫描方法完成了整个框架“工作窃取”实现。在scan方法之后的runTask方法中,会对currentSteal赋值,而helpStealer方法就是依赖于该成员与currentJoin成员形成的一条窃取链,实现了帮助窃取者执行任务。总而言之,helpStealer方法的核心思想是帮助执行,帮助窃取者执行它的任务,但它不仅仅只会帮助窃取者执行,还会基于currentSteal与currentJoin成员形成的窃取链帮助窃取者的窃取者执行、帮助窃取者的窃取者的窃取者执行、帮助窃取者.....的窃取者执行任务。上个例子理解,如下:

tryCompensate补偿活跃线程方法

再来看看为线程池补偿活跃线程的tryCompensate方法:

// ForkJoinPool类 → tryCompensate()方法
private boolean tryCompensate(WorkQueue w) {
    boolean canBlock;
    WorkQueue[] ws; long c; int m, pc, sp;
    // 如果线程池已经停止,处于terminate状态,不能阻塞,也不需要阻塞
    if (w == null || w.qlock < 0 ||           // caller terminating
        (ws = workQueues) == null || (m = ws.length - 1) <= 0 ||
        (pc = config & SMASK) == 0)           // parallelism disabled
        canBlock = false;
    // 如果ctl的低32位中有挂起的空闲线程,那么尝试唤醒它,成功则阻塞自己
    // 唤醒后在一定程度上也许会执行到自己被偷的任务fork出的子任务
    // tryRelease第二个参数为0,当唤醒成功后,代表当前线程将被阻塞,
    // 新的空闲线程被唤醒,所以没必要先减少活跃线程数,然后再加上
    else if ((sp = (int)(c = ctl)) != 0)      // release idle worker
        canBlock = tryRelease(c, ws[sp & m], 0L);
    // 如果没有空闲线程,就要创建新的线程
    // 这里会导致线程池中的线程数,在一段时间内会超过创建时指定的并行数
    else {
        // 获取池中的活跃线程数
        int ac = (int)(c >> AC_SHIFT) + pc;
        // 获取池中的总线程数
        int tc = (short)(c >> TC_SHIFT) + pc;
        int nbusy = 0;       // validate saturation
        for (int i = 0; i <= m; ++i) {  // two passes of odd indices
            WorkQueue v;
            // 找奇数位置的队列,循环m次就是执行了两遍。
            // 为什么执行两遍呢?主要是为了判断稳定性,有可能第二遍
            //  的时候,正在执行任务的活跃线程会变少
            if ((v = ws[((i << 1) | 1) & m]) != null) {
                // 检查工作线程是否正在处理任务,
                // 如果不在处理任务表示空闲,可以获取其他任务执行
                if ((v.scanState & SCANNING) != 0)
                    break;
                ++nbusy;
            }
        }
        // 如果线程池状态不稳定,那么则不能挂起当前线程
        // 如果nbusy!=tc*2 说明还存在空闲或者还在扫描任务的工作线程
        // 如果ctl!=c 代表ctl发生了改变,有可能线程执行完任务后,
        // 没有扫描到新的任务被失活,这种情况下先不挂起,先自旋一段时间
        if (nbusy != (tc << 1) || ctl != c)
            canBlock = false;         // unstable or stale
        
        // tc:池内总线程数  pc:并行数 ac:池内活跃线程数
        // tc>=pc 代表此时线程数已经够多了,当然并不代表不会创建新线程
        // ac>1 代表除了自己外还有其他活跃线程
        // w.isEmpty() 当前工作线程队列为空,其中没有任务需要执行
        // 如果满足如上三个条件,那么则可以直接阻塞,不需要补偿
        else if (tc >= pc && ac > 1 && w.isEmpty()) {
            long nc = ((AC_MASK & (c - AC_UNIT)) |
                       (~AC_MASK & c));      // uncompensated
            //cas ctl
            canBlock = U.compareAndSwapLong(this, CTL, c, nc);
        }
        // 这是对于commonPool 公共线程池的特殊处理
        // 如果总线程数超出MAX_CAP则会抛出异常
        else if (tc >= MAX_CAP ||
                 (this == common && tc >= pc + commonMaxSpares))
            throw new RejectedExecutionException(
                "Thread limit exceeded replacing blocked worker");
        else {                                // similar to tryAddWorker
            boolean add = false; int rs;      // CAS within lock
            // 准备创建新的工作线程(这里只加总线程数,不加活跃线程数)
            //      因为当前工作线程将在创建补偿线程成功之后阻塞
            // 但是这里会导致总线程数超出并行数
            long nc = ((AC_MASK & c) |
                       (TC_MASK & (c + TC_UNIT)));
            // 线程池没有停止的情况下才允许创建新的工作线程
            if (((rs = lockRunState()) & STOP) == 0)
                add = U.compareAndSwapLong(this, CTL, c, nc);
            unlockRunState(rs, rs & ~RSLOCK);
            // 创建新的工作线程
            canBlock = add && createWorker(); // throws on exception
        }
    }
    return canBlock;
}

该方法内的逻辑也算比较简单:

externalAwaitDone方法

前面分析doJoin逻辑提到过:如果是外部线程调用join方法时,会调用externalAwaitDone方法,接着再来看看这个方法:

// ForkJoinPool类 → externalAwaitDone()方法
private int externalAwaitDone() {
    // 如果任务是CountedCompleter类型,尝试使用common池去外部帮助执行,
    // 执行完成后并将完成任务状态返回
    int s = ((this instanceof CountedCompleter) ? // try helping
             ForkJoinPool.common.externalHelpComplete(
                 (CountedCompleter<?>)this, 0) :
                 // 当前task不是CountedCompleter,尝试从栈顶获取到当前
                 // join的任务交给common池执行,如果不在栈顶,s变为0
             ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
    // 如果s>=0,那代表任务是未结束的状态,需要阻塞
    if (s >= 0 && (s = status) >= 0) {
        boolean interrupted = false;
        do {
            // 先设置SIGNAL信号标记,通知其他线程当前需要被唤醒
            if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                // 通过synchronized.wait()挂起线程
                synchronized (this) {
                    if (status >= 0) { // 双重检测
                        try {
                            wait(0L);   // 挂起线程
                        } catch (InterruptedException ie) {
                            interrupted = true;
                        }
                    }
                    else
                        // 如果发现已完成,则唤醒所有等待线程
                        notifyAll();
                }
            }
        // task未完成会一直循环
        } while ((s = status) >= 0);
        // 响应中断操作
        if (interrupted)
            Thread.currentThread().interrupt();
    }
    // 执行完成后返回执行状态
    return s;
}

externalAwaitDone方法最简单,如果任务在栈顶,那么直接弹出执行,如果不在则挂起当前线程,直至任务执行结束,其他线程唤醒

任务拆分合并原理总结

任务的fork操作比较简单,只需要将拆分好的任务push进入自己的工作队列即可。而对于任务结果的合并:join操作,实现就略显复杂了,大体思想是首先在自己队列中找需要join的任务,如果找到了则执行它并合并结果。如果没找到就是被偷了,需要去找窃取者线程,并且在join任务执行结束之前,会根据窃取链一直帮助窃取者执行任务,如果窃取链断了但是join任务还未执行完,那么挂起当前工作线程,不过在挂起之前会根据情况来决定是否为线程池补偿一条活跃线程代替自己工作,防止整个线程池所有的线程都阻塞,产生线程池“假死”状态。当然,如果是外部线程执行的join操作,如果要被join的任务还未执行完的情况下,那么则需要把这任务交给commonPool公共池来处理

ForkJoin中任务取消实现原理

任务取消的cancel方法是实现于Future接口的,逻辑比较简单,源码如下:

// ForkJoinTask类 → cancel()方法
public boolean cancel(boolean mayInterruptIfRunning) {
    // 尝试将任务状态修改为CANCELLED,成功返回true,失败返回false
    return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
}

// ForkJoinTask类 → setCompletion()方法
private int setCompletion(int completion) {
    // 开启自旋(死循环)
    for (int s;;) {
        // 如果任务已经完成,则直接返回执行后的状态
        if ((s = status) < 0)
            return s;
        // 如果还未完成则尝试通过cas机制修改状态为入参:completion状态
        if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {
            if ((s >>> 16) != 0)
                synchronized (this) { notifyAll(); }
            return completion;
        }
    }
}

取消任务的逻辑比较简单,任务取消只能发生在任务还未被执行的情况下,如果任务已经完成则会直接返回执行状态。如果任务还未执行,则会尝试使用自旋+CAS机制修改任务状态为CANCELLED状态,成功则代表任务取消成功。

POOL 终止

private boolean tryTerminate(boolean now, boolean enable) {
        int rs;
        if (this == common) // 公共线程池,不能shutdown
            return false;
        if ((rs = runState) >= 0) {
            if (!enable)
                return false;
            rs = lockRunState(); // 进入SHUTDOWN阶段
            unlockRunState(rs, (rs & ~RSLOCK) | SHUTDOWN);
        }

        if ((rs & STOP) == 0) { // 准备进入STOP阶段
            if (!now) { // 必要的检查
                for (long oldSum = 0L;;) {
                    WorkQueue[] ws;
                    WorkQueue w;
                    int m, b;
                    long c;
                    long checkSum = ctl;
                    if ((int) (checkSum >> AC_SHIFT) + (config & SMASK) > 0)
                        return false; // 如果有活动的工作线程,还不能停止
                    if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
                        break;
                    for (int i = 0; i <= m; ++i) {
                        if ((w = ws[i]) != null) {
                            if ((b = w.base) != w.top || w.scanState >= 0 || w.currentSteal != null) {
                                tryRelease(c = ctl, ws[m & (int) c], AC_UNIT);
                                return false; // 有任务在执行,还不能停止
                            }
                            checkSum += b; // 累加base值
                            if ((i & 1) == 0)
                                w.qlock = -1; // 偶数索引工作队列,qlock = -1, 拦截从外部提交的任务
                        }
                    }
                    if (oldSum == (oldSum = checkSum)) // 稳定了,退出
                        break;
                }
            }
            if ((runState & STOP) == 0) { // 如果now等于true,立刻进入STOP结点
                rs = lockRunState();
                unlockRunState(rs, (rs & ~RSLOCK) | STOP);
            }
        }

        int pass = 0;
        for (long oldSum = 0L;;) {
            WorkQueue[] ws;
            WorkQueue w;
            ForkJoinWorkerThread wt;
            int m;
            long checkSum = ctl;
            if ((short) (checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 || (ws = workQueues) == null
                    || (m = ws.length - 1) <= 0) { // 可以终止了
                if ((runState & TERMINATED) == 0) {
                    rs = lockRunState();
                    unlockRunState(rs, (rs & ~RSLOCK) | TERMINATED); // 进入TERMINATED阶段
                    synchronized (this) {
                        notifyAll(); // 唤醒所有线程
                    }
                }
                break; // 跳出
            }
            for (int i = 0; i <= m; ++i) {
                if ((w = ws[i]) != null) {
                    checkSum += w.base;
                    w.qlock = -1; // 设置不可用
                    if (pass > 0) {
                        w.cancelAll(); // 取消所有的任务
                        if (pass > 1 && (wt = w.owner) != null) {
                            if (!wt.isInterrupted()) {
                                try {
                                    wt.interrupt(); // 中断线程
                                } catch (Throwable ignore) {
                                }
                            }
                            if (w.scanState < 0)
                                U.unpark(wt); // 唤醒线程
                        }
                    }
                }
            }
            if (checkSum != oldSum) { // 不稳定,重来
                oldSum = checkSum;
                pass = 0;
            } else if (pass > 3 && pass > m) // 退出
                break;
            else if (++pass > 1) { // 出队
                long c;
                int j = 0, sp;
                while (j++ <= m && (sp = (int) (c = ctl)) != 0)
                    tryRelease(c, ws[sp & m], AC_UNIT);
            }
        }
        return true;
    }

SHUTDOWN(!common) -> STOP(无任务执行) -> TERMINATED(over)
线程池关闭的实现逻辑也比较简单,首先会将线程池标记为SHUTDOWN状态,然后根据情况进行下一步处理,如果线程池中没啥活跃线程了,同时任务也不多了,将状态改为STOP状态,在STOP状态中会处理四件事:

fork-join

1.有一个大的任务Task(8), 最终被分解成8个小任务Task(1)

image

2.将Task(8)加入到任务队列里面(偶数索引,图中未显示),线程A偷取到Task(8),fork了2个Task(4),push到任务队列里面

image

3.pop出Task(4), fork出2个Task(2),push到任务队列里面

image

4. pop出Task(2), fork出2个Task(1), push到任务队列里面

image

5.pop出任务Task(1),此刻已经达到最小粒度,开始执行该任务;与此同时,线程B从底部(base)位置steal走了Task(4)

image

6.线程B拿到Task(4)之后,fork出了2个Task(2),push到任务队列里面

image

7.线程A执行完自己的任务后,由于Task(4).join(),索性定位到偷走自己任务的线程B所在的工作队列,帮助其执行任务,整体加快任务进度,帮助的方式也是steal

image

以上是最简单的一种fork.join方式。

最后的总结:

上一篇下一篇

猜你喜欢

热点阅读