java 多线程 -- Fork/Join
2019-01-14 本文已影响0人
aix91
1. 简介
Fork/Join 框架,将大任务分解成可以直接执行的小任务,然后将小任务的结果汇总得到大任务的结果。通过这种方式,Fork/Join 能试着去使用所有可用的处理器,以达到加速处理多线程的目的。
2. ForkJoinPool
ForkJoinPool是该框架的核心,它实现了AbstractExecutorService
,并且管理着工作线程(ForkJoinWorkerThread). ForkJoinPool并不会为每一个ForkJoinTask分配一个线程,而是为每一个工作线程分配一个deque
, 每一个deque里面存放着多个ForkJoinTask。正是这样的结构,Fork/Join才能实现"work-stealing"算法,从而增加多线程的使用效率。
-
work-stealing算法
work-stealing基本思路
通常情况下,一个工作线程从自己的 deque 中pop出一个ForkJoinTask (LIFO) 并执行。当自己的deque里没有task时,就会随机地从其他工作线程的 deque 中take 一个ForkJoinTask (FIFO) 并执行。当工作线程遇到join时,它会执行其他的task,这样所有的task都不会阻塞其他task的执行。 - ForkJoinPool创建
- 通过ForkJoinPool.common()方法创建一个ForkJoinPool实例:该方法默认会创建 "Runtime.getRuntime().availableProcessors() - 1"个工作线程。
- ForkJoinPool forkJoinPool = new ForkJoinPool(2);
3. ForkJoinTask<V>
ForkJoinTask是能在ForkJoinPool中执行的基础task,他有两个实现类:RecursiveAction(无返回值), RecursiveTask<V>(有返回值)
4. submit task to ForkJoinPool
- ForkJoin 的 invoke()方法 可以fork并执行task
- 先execute(task), 再join()获取结果
5. ForkJoinPool VS ExecutorService
ForkJoinPool只是ExecutorService的一个补充。当一个任务可以递归分解成多个小任务时,使用ForkJoin效率会更高;当多个任务之间没有关联时,ExecutorService是更好的选择。
6. 实例
- 创建Task
public class TopicCountTask extends RecursiveTask<List<TopicCountModel>> {
private List<Function<List<Long>, TopicCountModel>> countFunctionList;
private List<Long> trackIds;
public TopicCountTask(List<Function<List<Long>, TopicCountModel>> countFunctionList, List<Long> trackIds) {
this.countFunctionList = countFunctionList;
this.trackIds = trackIds;
}
@Override
protected List<TopicCountModel> compute() {
if (countFunctionList.size() == 1) {
//执行任务
TopicCountModel model = countFunctionList.get(0).apply(trackIds);
System.out.println("count service thread" + Thread.currentThread().getName());
return Arrays.asList(model);
} else {
List<TopicCountModel> result = new ArrayList<>();
int length = countFunctionList.size();
int mid = length / 2;
List<Function<List<Long>, TopicCountModel>> leftList = countFunctionList.subList(0, mid);
List<Function<List<Long>, TopicCountModel>> rightList = countFunctionList.subList(mid, length);
// 分解任务
TopicCountTask taskLeft = new TopicCountTask(leftList, trackIds);
TopicCountTask taskRight = new TopicCountTask(rightList, trackIds);
taskLeft.fork();
taskRight.fork();
result.addAll(taskLeft.join());
result.addAll(taskRight.join());
return result;
}
}
}
- 在ForkJoin中执行Task
//创建ForkJoinPool
forkJoinPool = new ForkJoinPool(5);
//执行ForkJoinTask
List<TopicCountModel> list = forkJoinPool.invoke(new TopicCountTask(functionList, trackIds));