ForkJoinPool

2021-01-31  本文已影响0人  得力小泡泡

1、什么是ForkJoinPool

虽然目前处理器核心数已经发展到很大数目,但是按任务并发处理并不能完全充分的利用处理器资源,因为一般的应用程序没有那么多的并发处理任务。基于这种现状,考虑把一个任务拆分成多个单元,每个单元分别得到执行,最后合并每个单元的结果。 有些线程一直处于忙碌状态,而有些线程却一直空闲状态,这么可以充分的利用处理器资源

Fork/Join框架是JAVA7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干小任务,最终汇总每个小任务结果得到大任务结果的框架。

ForkJoinPool主要采用分治算法 和 工作窃取算法

2、分治法

分治法的基本思想是将一个规模为N的问题分解为K个规模较小的子问题,这些子问题的相互独立且与原问题的性质相同,求出子问题的解之后,将这些解合并,就可以得到原有问题的解。是一种分目标完成的程序算法。


image.png

3、工作窃取算法

一个大任务拆分成多个小任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列中,并且每个队列都有单独的线程来执行队列里的任务,线程和队列一一对应。

但是会出现这样一种情况:A线程处理完了自己队列的任务,B线程的队列里还有很多任务要处理。

A是一个很热情的线程,想过去帮忙,但是如果两个线程访问同一个队列,会产生竞争,所以A想了一个办法,从双端队列的尾部拿任务执行。而B线程永远是从双端队列的头部拿任务执行。

image.png

注意:线程池中的每个线程都有自己的工作队列(PS,这一点和ThreadPoolExecutor不同,ThreadPoolExecutor是所有线程公用一个工作队列,所有线程都从这个工作队列中取任务),当自己队列中的任务都完成以后,会从其它线程的工作队列中偷一个任务执行,这样可以充分利用资源。(特别注意)

工作窃取算法的优点:

利用了线程进行并行计算,减少了线程间的竞争。

工作窃取算法的缺点:

1、如果双端队列中只有一个任务时,线程间会存在竞争。(只有一个任务的时候需要特殊处理)

2、窃取算法消耗了更多的系统资源,如会创建多个线程和多个双端队列。

4、主要类

1、ForkJoinTask:

使用该框架,需要创建一个ForkJoin任务,它提供在任务中执行fork和join操作的机制。一般情况下,我们并不需要直接继承ForkJoinTask类,只需要继承它的子类,它的子类有两个:
RecursiveAction:用于没有返回结果的任务。
RecursiveTask:用于有返回结果的任务。

2、ForkJoinPool:

任务ForkJoinTask需要通过ForkJoinPool来执行。

5、例子

计算1到40的累加和,当间隔大于limit,则继续递归

package com.concurrency2;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class MyTest13 {
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(2);
        MyTask myTask = new MyTask(1, 40);

        int result = forkJoinPool.invoke(myTask);

        System.out.println(result);

        forkJoinPool.shutdown();
    }
}

class MyTask extends RecursiveTask<Integer> {
    private int limit = 4;

    private int firstIndex;

    private int lastIndex;

    public MyTask(int firstIndex, int lastIndex) {
        this.firstIndex = firstIndex;
        this.lastIndex = lastIndex;
    }

    @Override
    protected Integer compute() {
        int result = 0;

        int gap = lastIndex - firstIndex;

        boolean flag = gap <= limit;

        if (flag) {
            System.out.println(Thread.currentThread().getName());

            for(int i = firstIndex;i <= lastIndex;i ++)
            {
                result += i;
            }
        } else {
            int middleIndex = (firstIndex + lastIndex) / 2;

            MyTask leftTask = new MyTask(firstIndex, middleIndex);
            MyTask rightTask = new MyTask(middleIndex + 1, lastIndex);

            invokeAll(leftTask, rightTask);

            int leftTaskResult = leftTask.join();
            int rightTaskResult = rightTask.join();

            result = leftTaskResult + rightTaskResult;
        }

        return result;
    }
}

输出

ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-0
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-0
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-0
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-0
820
上一篇 下一篇

猜你喜欢

热点阅读