技术栈

2019-04-04——Java并发包 Fork/Join框架

2019-04-07  本文已影响0人  烟雨乱平生
ForkJoinTask.png
该类是ForkJoinPool执行的任务,要使用ForkJoin框架,必须创建ForkJoinTask任务。该类提供了将任务分割成子任务的方法fork、等待子任务完成的方法join,通常情况下,我们将一个大的任务fork成两个子任务,再通过join等待子任务完成。ForkJoinTask是一个抽象类,它有两个子类:RecursiveTaskRecursiveActionCountedCompleter,这三个类也是抽象类。

在实际运用中,我们一般都会继承 RecursiveTask 、RecursiveAction 或 CountedCompleter 来实现我们的业务需求,而不会直接继承 ForkJoinTask 类。

ForkJoinTask 实现了 Future 接口,说明它也是一个可取消的异步运算任务,实际上ForkJoinTask 是 Future 的轻量级实现,主要用在纯粹是计算的函数式任务或者操作完全独立的对象计算任务。fork 是主运行方法,用于异步执行;而 join 方法在任务结果计算完毕之后才会运行,用来合并或返回计算结果。

核心参数
/** 任务运行状态 */
volatile int status; // 任务运行状态(小于0是完成状态)
static final int DONE_MASK   = 0xf0000000;  // 任务完成状态标志位
static final int NORMAL      = 0xf0000000;  // must be negative
static final int CANCELLED   = 0xc0000000;  // must be < NORMAL
static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED
static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16 等待信号
static final int SMASK       = 0x0000ffff;  //  低位掩码

任务的状态有四种:

状态 说明
NORMAL 0xf0000000 表示任务“正常”完成的状态
CANCELLED 0xc0000000 表示任务“取消”完成的状态
EXCEPTIONAL 0x80000000 表示任务“异常”完成的状态
SIGNAL 0x00010000 信号,有其他任务依赖当前任务,任务结束前,通知其他任务join当前任务的结果。

未完成状态包括初始状态0和SIGNAL

还有两个掩码

标识 说明
DONE_MASK 0xf0000000 任务完成状态标志位
SMASK 0x0000ffff 低位掩码,也是最大索引位(非任务状态)

分割子任务fork

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        /*如果当前线程是ForkJoinWorkerThread,直接把任务push到自己所拥有的队列的top位置。*/
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        /*如果是非工作线程,就是一个提交到Pool的过程。*/
        ForkJoinPool.common.externalPush(this);
    return this;
}

任务执行

final int doExec() {
    int s; boolean completed;
    /*如果任务已经完成了,直接返回任务状态*/
    if ((s = status) >= 0) {
        try {
            /*exec返回任务是否正常完成,exec是一个抽象方法,为了扩展,留给子类实现*/
            completed = exec();
        } catch (Throwable rex) {
            /*如果任务执行过程中出现异常,则设置为异常完成*/
            return setExceptionalCompletion(rex);
        }
        if (completed)
            /*如果任务正常执行完成,把任务状态设置为正常完成*/
            s = setCompletion(NORMAL);
    }
    return s;
}

异常处理

任务执行过程抛出异常时,调用者可以获取该异常,ForkJoinTask并没有直接将异常的任务保存起来,而是保存了异常任务的弱引用,在合适的时候,GC将会回收该异常任务,被回收对象对应的弱引用将会保存在弱引用队列中。

    private static final ExceptionNode[] exceptionTable;
    private static final ReentrantLock exceptionTableLock;
    private static final ReferenceQueue<Object> exceptionTableRefQueue;

    static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> {
        final Throwable ex;
        ExceptionNode next;
        final long thrower;  // use id not ref to avoid weak cycles
        final int hashCode;  // store task hashCode before weak ref disappears
        ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {
            super(task, exceptionTableRefQueue);
            this.ex = ex;
            this.next = next;
            this.thrower = Thread.currentThread().getId();
            this.hashCode = System.identityHashCode(task);
        }
    }

任务完成

public void complete(V value) {
    try {
        /*设置结果*/
        setRawResult(value);
    } catch (Throwable rex) {
        setExceptionalCompletion(rex);
        return;
    }
    /*设置正常完成*/
    setCompletion(NORMAL);
}
private int setExceptionalCompletion(Throwable ex) {
    /*记录异常完成*/
    int s = recordExceptionalCompletion(ex);
    if ((s & DONE_MASK) == EXCEPTIONAL)
        /*内部传播异常*/
        internalPropagateException(ex);
    return s;
}


final int recordExceptionalCompletion(Throwable ex) {
    int s;
    /*获取任务状态,如果任务状态已经完成了,直接返回任务状态*/
    if ((s = status) >= 0) {
        /*
         * System.identityHashCode和Object.hashCode返回的值一样,
         * 都是根据对象在内存中的地址计算出来的哈希码
         * */
        int h = System.identityHashCode(this);
        /*操作异常任务表之前先获取锁*/
        final ReentrantLock lock = exceptionTableLock;
        lock.lock();
        try {
            /*
             * 删除已经被回收对象对应的弱引用,该方法会遍历exceptionTableRefQueue,
             * 并删除exceptionTable中对应的弱引用
             * */
            expungeStaleExceptions();
            /*
             * 将执行过程抛出异常的任务弱引用保存到exceptionTable,这里其实是将
             * exceptionTable当作哈希表使用,i就是保存的位置
             * */
            ExceptionNode[] t = exceptionTable;
            int i = h & (t.length - 1);
            /*
            * 遍历哈希表索引i处的链表,如果遍历过程中发现已经存在该任务,
            * 跳出循环,否则遍历到链表末尾时,创建新的ExceptionNode,
            * 并将该节点放到链表的头部
            * */
            for (ExceptionNode e = t[i]; ; e = e.next) {
                if (e == null) {
                    t[i] = new ExceptionNode(this, ex, t[i]);
                    break;
                }
                if (e.get() == this) // already present
                    break;
            }
        } finally {
            lock.unlock();
        }
        /*设置任务的完成状态为EXCEPTIONAL*/
        s = setCompletion(EXCEPTIONAL);
    }
    return s;
}
/*设置任务完成状态,completion可选项为:NORMAL、CANCELLED、EXCEPTIONAL*/
private int setCompletion(int completion) {
    for (int s;;) {
        /*状态值为负数,说明任务已经完成*/
        if ((s = status) < 0)
            return s;
        if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {
            /*如果原状态为SIGNAL,通知其他在该任务上等待的线程join该任务的结果*/
            if ((s >>> 16) != 0)
                synchronized (this) { notifyAll(); }
            return completion;
        }
    }
}

合并结果join

public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        /*如果执行doJoin方法后不是正常完成,就报告异常信息*/
        reportException(s);
    /*如果执行doJoin方法后正常完成就返结果*/
    return getRawResult();
}

private void reportException(int s) {
    if (s == CANCELLED)
        throw new CancellationException();
    if (s == EXCEPTIONAL)
        rethrow(getThrowableException());
}

private int doJoin() {
    int s;
    Thread t;
    ForkJoinWorkerThread wt;
    ForkJoinPool.WorkQueue w;
    /*
     * 如果当前任务状态已经完成,直接返回任务状态;
     * 否则的话,判断当前线程是不是ForkJoinWorkerThread线程,如果不是执行externalAwaitDone方法
     * 如果是的话,判断如果需要join的任务刚刚好是当前线程所拥有的队列的top位置,
     * 这意味着当前工作线程下一个就将执行到它,则执行它。
     * 否则的话调用awaitJoin方法
     * */
    return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
                    (w = (wt = (ForkJoinWorkerThread)t).workQueue).
                            tryUnpush(this) && (s = doExec()) < 0 ? s :
                            wt.pool.awaitJoin(w, this, 0L) :
                    externalAwaitDone();
}

join的执行流程

image.png

externalAwaitDone的执行首先会设置任务的status为signal状态,这样该任务执行结束之后会调用notifyAll来唤醒自己;其次,阻塞自己,直到任务执行完成后把自己唤醒。

获取结果

public final V invoke() {
    int s;
    /*执行doInvoke,如果结果不是正常完成,则执行reportException*/
    if ((s = doInvoke() & DONE_MASK) != NORMAL)
        reportException(s);
    /*否则的话执行getRawResult获取结果*/
    return getRawResult();
}

private int doInvoke() {
    int s; Thread t; ForkJoinWorkerThread wt;
    /*
     * 如果当前任务执行后状态完成了,则返回执行后的状态
     * 否则的话,判断当前线程是否为ForkJoinWorkerThread线程
     * 如果是的话执行awaitJoin
     * 否则的话执行externalAwaitDone
     * */
    return (s = doExec()) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
                    (wt = (ForkJoinWorkerThread)t).pool.
                            awaitJoin(wt.workQueue, this, 0L) :
                    externalAwaitDone();
}

ForkJoinTaskjoin()invoke()方法都可以用来获取任务的执行结果(另外还有get方法也是调用了doJoin来获取任务结果,但是会响应运行时异常),它们对外部提交任务的执行方式一致,都是通过externalAwaitDone方法等待执行结果。不同的是invoke()方法会直接执行当前任务;而join()方法则是在当前任务在队列 top 位时(通过tryUnpush方法判断)才能执行,如果当前任务不在 top 位或者任务执行失败调用ForkJoinPool.awaitJoin方法帮助执行或阻塞当前 join 任务。(所以在官方文档中建议了我们对ForkJoinTask任务的调用顺序,一对 fork-join操作一般按照如下顺序调用:a.fork(); b.fork(); b.join(); a.join();。因为任务 b 是后面进入队列,也就是说它是在栈顶的(top 位),在它fork()之后直接调用join()就可以直接执行而不会调用ForkJoinPool.awaitJoin方法去等待。)


上一篇下一篇

猜你喜欢

热点阅读