手把手教你 Fork/Join 并发处理
ForkJoinPool 是 Java 7 加入的一个并发处理类,位于 java.util.concurrent 包。
Fork / Join 回顾
ForkJoinPool 让我们可以很方便地把任务分裂成几个更小的任务,这些分裂出来的任务也将会提交给 ForkJoinPool。任务可以继续分割成更小的子任务,只要它还能分割。分叉和合并原理包含两个递归进行的步骤。两个步骤分别是分叉步骤和合并步骤。
一个使用了分叉和合并原理的任务可以将自己分叉(分割)为更小的子任务,这些子任务可以被并发执行。如下图所示:
通过把自己分割成多个子任务,每个子任务可以由不同的 CPU 并行执行,或者被同一个 CPU 上的不同线程执行。
只有当给的任务过大,把它分割成几个子任务才有意义。把任务分割成子任务有一定开销,因此对于小型任务,这个分割的消耗可能比每个子任务并发执行的消耗还要大。
什么时候把一个任务分割成子任务是有意义的,这个界限也称作一个阀值。这要看每个任务对有意义阀值的决定。很大程度上取决于它要做的工作的种类。
当一个任务将自己分割成若干子任务之后,该任务将等待所有子任务结束。一旦子任务执行结束,该任务可以把所有结果合并到同一个结果。图示如下:
Fork / Join 的使用
Fork / Join 的使用主要涉及 ForkJoinPool 和 ForkJoinTask。ForkJoinTask 类定义了任务,实现了 Fork 和 Join 操作;ForkJoinPool 管理线程与任务的执行。
ForkJoinTask 类是一个抽象类,要求子类实现以下三个方法:
getRawResult :获取 ForkJoinTask 的执行结果;
setRawResult :设置 ForkJoinTask 的执行结果;
exec :ForkJoinTask 的执行逻辑,返回 true 表示正常返回;
为了方便开发,标准库提供了 ForkJoinTask 的一个子类 RecursiveTask。RecursiveTask 类也是一个抽象类,封装了上述 3 个方法的实现,要求子类实现一个方法 compute。这样一来,我们只需要实现 compute 一个方法就可以使用 ForkJoinTask 了。RecursiveTask 的定义如下:
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
private static final long serialVersionUID = 5232453952276485270L;
V result;
protected abstract V compute();
public final V getRawResult() {
return result;
}
protected final void setRawResult(V value) {
result = value;
}
protected final boolean exec() {
result = compute();
return true;
}
}
以计算数组所有数字的和为例,我们定义一个 Task 类继承 RecursiveTask,在 compute 方法中把数组一分为二,创建两个 Task 实例,分别调用 fork 方法,再分别调用 join 方法获取两个 Task 的计算结果,从而得到数组所有数字的和。
public class Task extends RecursiveTask<Long> {
private static final long serialVersionUID = 1L;
long[] data;
int start;
int end;
public Task(long[] data, int start, int end) {
this.data = data;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
if(end - start < 1000){
for(int i = start; i <= end; i++){
sum += data[i];
}
}else {
//分割任务
int middle = (start + end) / 2;
Task left = new Task(data,start,middle);
Task right = new Task(data,middle + 1,end);
left.fork();//fork 操作
right.fork();//fork 操作
sum = left.join() + right.join();//join操作
}
return sum;
}
}
使用 ForkJoinPool 的方法很简单,创建 ForkJoinPool 实例,然后调用 invoke 方法执行 ForkJoinTask 任务即可获得计算结果。
long[] data = new long[1024*1024];
Arrays.setAll(data, i -> i);
long sum = new ForkJoinPool().invoke(
new Task(data, 0, data.length - 1)
);
System.out.println(sum);
如果不需要获取计算的结果,比如需要执行一些没有返回值的操作,也可以调用 execute 方法。
每周 3 篇学习笔记或技术总结,内容涉及 Java 进阶、虚拟机、MySQL、NoSQL、分布式计算、开源框架等多个领域。关注作者或微信公众号 后端开发那点事儿 第一时间获取最新内容。