java 多线程 -- Fork/Join

2019-01-14  本文已影响0人  aix91

1. 简介

Fork/Join 框架,将大任务分解成可以直接执行的小任务,然后将小任务的结果汇总得到大任务的结果。通过这种方式,Fork/Join 能试着去使用所有可用的处理器,以达到加速处理多线程的目的。

2. ForkJoinPool

ForkJoinPool是该框架的核心,它实现了AbstractExecutorService,并且管理着工作线程(ForkJoinWorkerThread). ForkJoinPool并不会为每一个ForkJoinTask分配一个线程,而是为每一个工作线程分配一个deque, 每一个deque里面存放着多个ForkJoinTask。正是这样的结构,Fork/Join才能实现"work-stealing"算法,从而增加多线程的使用效率。

  1. 通过ForkJoinPool.common()方法创建一个ForkJoinPool实例:该方法默认会创建 "Runtime.getRuntime().availableProcessors() - 1"个工作线程。
  2. ForkJoinPool forkJoinPool = new ForkJoinPool(2);

3. ForkJoinTask<V>

ForkJoinTask是能在ForkJoinPool中执行的基础task,他有两个实现类:RecursiveAction(无返回值), RecursiveTask<V>(有返回值)

4. submit task to ForkJoinPool

5. ForkJoinPool VS ExecutorService

ForkJoinPool只是ExecutorService的一个补充。当一个任务可以递归分解成多个小任务时,使用ForkJoin效率会更高;当多个任务之间没有关联时,ExecutorService是更好的选择。

6. 实例

public class TopicCountTask extends RecursiveTask<List<TopicCountModel>> {
    private List<Function<List<Long>, TopicCountModel>> countFunctionList;
    private List<Long> trackIds;
    public TopicCountTask(List<Function<List<Long>, TopicCountModel>> countFunctionList, List<Long> trackIds) {
        this.countFunctionList = countFunctionList;
        this.trackIds = trackIds;
    }
    @Override
    protected List<TopicCountModel> compute() {
        if (countFunctionList.size() == 1) {
           //执行任务
            TopicCountModel model = countFunctionList.get(0).apply(trackIds);
            System.out.println("count service thread" + Thread.currentThread().getName());
            return Arrays.asList(model);
        } else {
            List<TopicCountModel> result = new ArrayList<>();
            int length = countFunctionList.size();
            int mid = length / 2;
            List<Function<List<Long>, TopicCountModel>> leftList = countFunctionList.subList(0, mid);
            List<Function<List<Long>, TopicCountModel>> rightList = countFunctionList.subList(mid, length);
            // 分解任务
            TopicCountTask taskLeft = new TopicCountTask(leftList, trackIds);
            TopicCountTask taskRight = new TopicCountTask(rightList, trackIds);
            taskLeft.fork();
            taskRight.fork();
            result.addAll(taskLeft.join());
            result.addAll(taskRight.join());
            return result;
        }
    }
}

        //创建ForkJoinPool
        forkJoinPool = new ForkJoinPool(5);
        //执行ForkJoinTask
        List<TopicCountModel> list = forkJoinPool.invoke(new TopicCountTask(functionList, trackIds));

上一篇下一篇

猜你喜欢

热点阅读