并发编程

Fork/Join框架运行原理

2019-08-05  本文已影响47人  xiaolyuh

Fork/Join框架的入门可以参考Fork/Join框架。Fork/Join框架的核心类有两个一个是ForkJoinPool,它主要负责执行任务;一个是ForkJoinTask,主要负责任务的拆分和结果的合并;

ForkJoinPool

它和ThreadPoolExecutor一样也是一个线程池的实现,并且同样实现了Executor和ExecutorServiceie接口,类图如下:


ForkJoinPool.png

核心内部类 WorkQueue

static final class WorkQueue {

    // 队列初始容量
    static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

    // 队列最大容量
    static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

    volatile int qlock;        // 1: locked, < 0: terminate; else 0
    // 下一次出队的索引位
    volatile int base;         // index of next slot for poll
    // 下一次入队索引位
    int top;                   // index of next slot for push
    // 存放任务的容器
    ForkJoinTask<?>[] array;   // the elements (initially unallocated)
    final ForkJoinPool pool;   // the containing pool (may be null)
    // 执行当前队列任务的线程
    final ForkJoinWorkerThread owner; // owning thread or null if shared
    volatile Thread parker;    // == owner during call to park; else null

    WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
        this.pool = pool;
        this.owner = owner;
        // Place indices in the center of array (that is not yet allocated)
        base = top = INITIAL_QUEUE_CAPACITY >>> 1;
    }

    // 入队
    final void push(ForkJoinTask<?> task) {
        ForkJoinTask<?>[] a; ForkJoinPool p;
        int b = base, s = top, n;
        if ((a = array) != null) {    // ignore if queue removed
            int m = a.length - 1;     // fenced write for task visibility
            U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
            U.putOrderedInt(this, QTOP, s + 1);
            if ((n = s - b) <= 1) {
                if ((p = pool) != null)
                    p.signalWork(p.workQueues, this);
            }
            else if (n >= m)
                growArray();
        }
    }

    // 初始化扩展扩容ForkJoinTask<?>[]
    final ForkJoinTask<?>[] growArray() {
        ForkJoinTask<?>[] oldA = array;
        int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
        if (size > MAXIMUM_QUEUE_CAPACITY)
            throw new RejectedExecutionException("Queue capacity exceeded");
        int oldMask, t, b;
        ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
        if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
            (t = top) - (b = base) > 0) {
            int mask = size - 1;
            do { // emulate poll from old array, push to new array
                ForkJoinTask<?> x;
                int oldj = ((b & oldMask) << ASHIFT) + ABASE;
                int j    = ((b &    mask) << ASHIFT) + ABASE;
                x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);
                if (x != null &&
                    U.compareAndSwapObject(oldA, oldj, x, null))
                    U.putObjectVolatile(a, j, x);
            } while (++b != t);
        }
        return a;
    }


    // 出队
    final ForkJoinTask<?> poll() {
        ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
        while ((b = base) - top < 0 && (a = array) != null) {
            int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
            t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
            if (base == b) {
                if (t != null) {
                    if (U.compareAndSwapObject(a, j, t, null)) {
                        base = b + 1;
                        return t;
                    }
                }
                else if (b + 1 == top) // now empty
                    break;
            }
        }
        return null;
    }
}

WorkQueue主要作用是接受外部提交的任务,并且支持工作窃取。

核心属性

// Instance fields
// 线程池控制位
volatile long ctl;                   // main pool control
// 线程池状态
volatile int runState;               // lockable status
// 工作队列数组
volatile WorkQueue[] workQueues;     // main registry

构造函数

private ForkJoinPool(int parallelism,
                     ForkJoinWorkerThreadFactory factory,
                     UncaughtExceptionHandler handler,
                     int mode,
                     String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

核心方法

execute()

public void execute(ForkJoinTask<?> task) {
    if (task == null)
        throw new NullPointerException();
    externalPush(task);
}

只是判断了一下任务是否为null,然后就调用了externalPush方法。

externalPush()

final void externalPush(ForkJoinTask<?> task) {
    // ws工作队列数组;q:存放当前任务的工作队列;m:工作队列数组最后一个索引位
    WorkQueue[] ws; WorkQueue q; int m;
    // 获取一个随机数
    int r = ThreadLocalRandom.getProbe();
    int rs = runState;
    if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
        // 找到存放当前任务的工作队列q
        (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
        // 获取q队列上的锁
        U.compareAndSwapInt(q, QLOCK, 0, 1)) {
        // a:q队列中存放任务的数组;am是数组长度;n:数组中的任务数;q:下一个入队的索引位
        ForkJoinTask<?>[] a; int am, n, s;
        if ((a = q.array) != null &&
            // 判断队列是否满了
            (am = a.length - 1) > (n = (s = q.top) - q.base)) {
            // 计算存放任务的索引位
            int j = ((am & s) << ASHIFT) + ABASE;
            // 存放任务
            U.putOrderedObject(a, j, task);
            // 更新top指针(索引位)
            U.putOrderedInt(q, QTOP, s + 1);
            // 解锁
            U.putIntVolatile(q, QLOCK, 0);
            if (n <= 1)
                // 尝试创建或者激活线程
                signalWork(ws, q);
            return;
        }
        U.compareAndSwapInt(q, QLOCK, 1, 0);
    }
    // 完整版的push,处理一些不常见的情况,比如初始化工作队列数组workQueues,和新的工作队列workQueues[i]
    externalSubmit(task);
}
  1. 先判断工作队列数组workQueues数组是否为NULL,如果是直接走下面完整版的push方法
  2. 根据随机数,找到当前任务所需要放入的工作队列p=workQueues[i]
  3. 如果p为NULL直接走下面完整版的push方法
  4. 获取q队列上的锁
  5. 判断队列是否满了,如果是直接走下面完整版的push方法
  6. 任务入队
  7. 解锁

externalSubmit(task)整版的push方法,处理一些不常见的情况,比如初始化和扩容工作队列数组workQueuesworkQueues;新创建工作队列workQueues[i]

createWorker()

externalSubmit(task)方法中我们创建了工作队列后,我们需要将工作队列里面的工作线程启动起来,然后处理工作队列里面的任务,最终externalSubmit(task)方法会调用createWorker()方法创建工作线程,并启动线程。

private boolean createWorker() {
    ForkJoinWorkerThreadFactory fac = factory;
    Throwable ex = null;
    ForkJoinWorkerThread wt = null;
    try {
        // 创建工作线程,并将调用registerWorker方法将工作线程和工作队列做绑定
        if (fac != null && (wt = fac.newThread(this)) != null) {
            // 启动工程线程
            wt.start();
            return true;
        }
    } catch (Throwable rex) {
        ex = rex;
    }
    // 最后解绑工作线程和工作队列的关系
    deregisterWorker(wt, ex);
    return false;
}

runWorker()

ForkJoinWorkerThread.run()最终会调用ForkJoinPool.runWorker()方法,来循环的执行队列里面的任务。

final void runWorker(WorkQueue w) {
    // 分配队列中存储任务的数据
    w.growArray();                   // allocate queue
    int seed = w.hint;               // initially holds randomization hint
    int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
    // 自旋,执行队列中的任务
    for (ForkJoinTask<?> t;;) {
        // 获取任务
        if ((t = scan(w, r)) != null)
            // 执行任务
            w.runTask(t);
        // 等待任务
        else if (!awaitWork(w, r))
            break;
        r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
    }
}

这个方法最终回去调用到ForkJoinTask.exec()方法,进而调用到子类RecursiveTaskRecursiveActioncompute()方法,任务执行。

ForkJoinTask

fork()

当我们调用ForkJoinTask的fork方法时,程序会将任务放到队列里面去,然后异步地执行这个任务。代码如下:

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

push方法把当前任务存放到工作队列中的ForkJoinTask数组中,代码如下:

final void push(ForkJoinTask<?> task) {
    ForkJoinTask<?>[] a; ForkJoinPool p;
    int b = base, s = top, n;
    if ((a = array) != null) {    // ignore if queue removed
        int m = a.length - 1;     // fenced write for task visibility
        U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
        U.putOrderedInt(this, QTOP, s + 1);
        if ((n = s - b) <= 1) {
            if ((p = pool) != null)
                p.signalWork(p.workQueues, this);
        }
        else if (n >= m)
            growArray();
    }
}

join()

Join方法的主要作用是阻塞当前线程并等待获取结果,代码如下:

public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}
private void reportException(int s) {
    if (s == CANCELLED)
        throw new CancellationException();
    if (s == EXCEPTIONAL)
        rethrow(getThrowableException());
}

首先,它调用了doJoin()方法,通过doJoin()方法得到当前任务的状态来判断返回什么结 果,任务状态有4种:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常 (EXCEPTIONAL)。

让我们再来分析一下doJoin()方法的实现代码。

private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
                    (w = (wt = (ForkJoinWorkerThread)t).workQueue).
                            // 执行任务
                            tryUnpush(this) && (s = doExec()) < 0 ? s :
                            wt.pool.awaitJoin(w, this, 0L) :
                    // 阻塞非工作线程,直到工作线程执行完毕
                    externalAwaitDone();
}
final int doExec() {
    int s; boolean completed;
    if ((s = status) >= 0) {
        try {
            completed = exec();
        } catch (Throwable rex) {
            return setExceptionalCompletion(rex);
        }
        if (completed)
            s = setCompletion(NORMAL);
    }
    return s;
}

在doJoin()方法里,首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成, 则直接返回任务状态;如果没有执行完,则从任务数组里取出任务并执行。如果任务顺利执行完成,则设置任务状态为NORMAL,如果出现异常,则记录异常,并将任务状态设置为 EXCEPTIONAL。

参考

《java并发编程的艺术》

源码

https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases

spring-boot-student-concurrent 工程

上一篇下一篇

猜你喜欢

热点阅读