Java多线程

Java Fork/Join框架 学习笔记

2018-01-15  本文已影响47人  专职跑龙套

什么是 Fork/Join 框架

Fork/Join 框架是 Java7 提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

比如计算 1+2+...+10000,可以分割成若干个子任务,每个子任务分别对 10 个数进行求和,最终汇总这若干个子任务的结果。

工作窃取算法

fork-join 最核心的地方就是利用了现代硬件设备多核,在一个操作时候会有空闲的 CPU,那么如何利用好这个空闲的 CPU 就成了提高性能的关键。

fork-join 框架通过一种称作工作窃取(work stealing) 的技术减少了工作队列的争用情况。
每个工作线程都有自己的工作队列,这是使用双端队列(或者叫做 deque)来实现的。当一个任务划分一个新线程时,它将自己推到 deque 的头部。当一个任务执行与另一个未完成任务的合并操作时,它会将另一个任务推到队列头部并执行,而不会休眠以等待另一任务完成。当线程的任务队列为空,它将尝试从另一个线程的 deque 的尾部 窃取另一个任务。

可以使用标准队列实现工作窃取,但是与标准队列相比,deque 具有两方面的优势:减少争用和窃取。
因为只有工作线程会访问自身的 deque 的头部,deque 头部永远不会发生争用;因为只有当一个线程空闲时才会访问 deque 的尾部,所以也很少存在线程的 deque 尾部的争用。

Fork/Join 框架的介绍

Fork/Join 使用两个类来完成以上两件事情:

示例:计算 1+2+...+10000

public class SumTask extends RecursiveTask<Integer> {
    private static int THRESHOLD = 10;
    private int start;
    private int end;

    public SumTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;

        // 如果任务足够小就计算任务
        if (end - start <= THRESHOLD) {
            for (int i = start; i <= end; i++) {
                sum = sum + i;
            }
        }
        // 否则,分割成2个子任务的计算
        else {
            int middle = (start + end) / 2;

            SumTask left = new SumTask(start, middle);
            SumTask right = new SumTask(middle + 1, end);

            // 执行子任务
            left.fork();
            right.fork();

            // 等待子任务执行结束,获得结果
            int leftResult = left.join();
            int rightResult = right.join();

            // 合并子任务的结果
            sum = leftResult + rightResult;
        }

        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();

        SumTask task = new SumTask(1, 10000);

        Future<Integer> future = pool.submit(task);

        try {
            System.out.println(future.get());
        } catch (Exception e) {
        }
    }
}

Fork/Join框架的实现原理

ForkJoinPoolForkJoinTask 数组和 ForkJoinWorkerThread 数组组成:

当我们调用 ForkJoinTaskfork 方法时,程序会调用 ForkJoinWorkerThreadpush 方法异步的执行这个任务,然后立即返回结果。

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

push 方法把当前任务存放在 ForkJoinTask 数组 queue 里。然后再调用 ForkJoinPoolsignalWork()
方法唤醒或创建一个工作线程来执行任务。

final void push(ForkJoinTask<?> task) {
    ForkJoinTask<?>[] a; ForkJoinPool p;
    int b = base, s = top, n;
    if ((a = array) != null) {    // ignore if queue removed
        int m = a.length - 1;     // fenced write for task visibility
        U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
        U.putOrderedInt(this, QTOP, s + 1);
        if ((n = s - b) <= 1) {
            if ((p = pool) != null)
                p.signalWork(p.workQueues, this);
        }
        else if (n >= m)
            growArray();
    }
}

Fork/Join VS ThreadPoolExecutor

ForkJoin 同 ThreadPoolExecutor 一样,也实现了 ExecutorExecutorService 接口。
public class ForkJoinPool extends AbstractExecutorService
它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入希望的线程数量,那么当前计算机可用的 CPU 数量会被设置为线程数量作为默认值。

ForkJoinPool 主要用来使用分治法(Divide-and-Conquer Algorithm)来解决问题。
当使用 ThreadPoolExecutor 时,使用分治法会存在问题,因为 ThreadPoolExecutor 中的线程无法向任务队列中再添加一个任务并且在等待该任务完成之后再继续执行。而使用 ForkJoinPool 时,就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行。

使用 ForkJoinPool 能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用 4 个线程来完成超过 200 万个任务。但是,使用 ThreadPoolExecutor 时,是不可能完成的,因为 ThreadPoolExecutor中的 Thread 无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,显然这是不可行的。


引用:
聊聊并发(八)——Fork/Join框架介绍
应用 fork-join 框架
深入浅出parallelStream

上一篇下一篇

猜你喜欢

热点阅读