Java Stream的并行实现
作者: 一字马胡
转载标志 【2017-11-03】
更新日志
日期 | 更新内容 | 备注 |
---|---|---|
2017-11-03 | 添加转载标志 | 持续更新 |
并行与并发
关于并发与并行,需要弄清楚的是,并行关注于多个任务同时进行,而并发则通过调度来不停的切换多个任务执行,而实质上多个任务不是同时执的。并发,英文单词为:Concurrent。并行的英文单词为:parallel。如果想对并发和并行有一个比较直观的认识,可以参考下面这张图片:
并行与并发Fork/Join 框架与 Java Stream API
Fork/Join框架属于并行框架,关于Fork/Join框架的一些内容,可以参考这篇文章:Java Fork/Join并行框架。简单来说,Fork/Join框架可以将大的任务切分为足够小的任务,然后将小任务分配给不同的线程来执行,而线程之间通过工作窃取算法来协调资源,提前昨晚任务的线程可以去“窃取”其他还没有做完任务的线程的任务,而每一个线程都会持有一个双端队列,里面存储着分配给自己的任务,Fork/Join框架在实现上,为了防止线程之间的竞争,线程在消费分配给自己的任务时,是从队列头取任务的,而“窃取”线程则从队列尾部取任务。
Fork/Join框架通过fork方法来分割大任务,通过使用join来获取小任务的结果,然后组合成大任务的结果。关于Fork/Join任务模型,可以参考下面的图片:
关于Java Stream API的相关内容,可以参考该文章:Java Streams API。
Stream在实现上使用了Fork/Join框架来实现并发,所以使用Stream我们可以在不知不觉间就使得我们的程序跑得飞快,究其原因就是Stream使用了Fork/Join并发框架来处理任务,当然,你需要显示的指定Stream为parallel,否则Stream默认都是串行流。比如对于Collection,你可以使用parallelStream来转换为一个并发流,或者使用stream方法转换为串行流,然后使用parallel操作使得串行流变为并发流。本文的重点是剖析Stream是如何使用Fork/Join来做并发的。
Stream的并发实现细节
在了解了Fork/Join并发框架和Java Stream之后,首要的问题就是:Stream是如何使用Fork/Join框架来做到并发的?其实对于使用者来说,了解Stream就是通过Fork/Join框架来做的就好了,但是如果想要深入了解一下Fork/Join框架的实践,以及Java Stream的设计方法,那么去读一下实现的源码还是很有必要的,下文中的分析仅代表个人观点!
需要注意的一点是,Java Stream的操作分为两类,也可以分为三类,具体的细节可以参考该文章:Java Streams API。一个简单的判断一个操作是否是Terminal操作还是Intermediate操作的方法是,如果操作返回的是一个新的Stream,那么就是一个Intermediate操作,否则就是一个Terminal操作。
-
Intermediate:一个流可以后面跟随零个或多个 intermediate 操作。其目的主要是打开流,做出某种程度的数据操作,然后返回一个新的流,交给下一个操作使用。这类操作都是惰性化的(lazy),就是说,仅仅调用到这类方法,并没有真正开始流的遍历。
-
Terminal:一个流只能有一个 terminal 操作,当这个操作执行后,流就被使用“光”了,无法再被操作。所以这必定是流的最后一个操作。Terminal 操作的执行,才会真正开始流的遍历,并且会生成一个结果,或者一个 side effect。
-
还有一种操作被称为 short-circuiting。用以指:
- 对于一个 intermediate 操作,如果它接受的是一个无限大(infinite/unbounded)的 Stream,但返回一个 有限的新 Stream。
- 对于一个 terminal 操作,如果它接受的是一个无限大的 Stream,但能在有限的时间计算出结果。
Java Stream对四种类型的Terminal操作使用了Fork/Join实现了并发操作,下面的图片展示了这四种操作类型:
支持并行的四种Stream操作我们首先来走一遍Stream操作的执行路径,下面的代码是我们想要做的操作流,下文会根据该代码示例来跟踪Stream的执行路径:
Stream.of(1,2,3,4)
.parallel()
.map(n -> n*2)
.collect(Collectors.toCollection(ArrayList::new));
解释一下,上面的代码想要实现的功能是将(1,2,3,4)这四个数字每一个都变为其自身的两倍,然后收集这些元素到一个ArrayList中返回。这是一个非常简单的功能,下面是上面的操作流的执行路径:
step 1:
public static<T> Stream<T> of(T... values) {
return Arrays.stream(values);
}
step 2:
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
step 3:
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
...
container = evaluate(ReduceOps.makeRef(collector));
...
}
step 4:
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
step 5:
使用Fork/Join框架执行操作。
上面的五个步骤是经过一些省略的,需要注意的一点是,intermediate类型的操作仅仅将操作加到一个upstream里面,具体的原文描述如下:
Construct a new Stream by appending a stateless intermediate operation to an existing stream.
比如上面我们的操作中的map操作,实际上只是将操作加到一个intermediate链条上面,不会立刻执行。重点是第五步,Stream是如何使用Fork/Join来实现并发的。evaluate这个方法至关重要,在方法里面会分开处理,对于设置了并发标志的操作流,会使用Fork/Join来并发执行操作任务,而对于没有打开并发标志的操作流,则串行执行操作。
Fork/Join框架的核心方法是一个叫做compute的方法,下面分析一个forEach操作如何通过Fork/Join框架来实现并发,通过追踪代码,可以发现forEach的并发版本其实是一个交由一个ForEachTask对象来做,而ForEachTask类中实现了compute方法:
// Similar to AbstractTask but doesn't need to track child tasks
public void compute() {
Spliterator<S> rightSplit = spliterator, leftSplit;
long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
if ((sizeThreshold = targetSize) == 0L)
targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
boolean forkRight = false;
Sink<S> taskSink = sink;
ForEachTask<S, T> task = this;
while (!isShortCircuit || !taskSink.cancellationRequested()) {
if (sizeEstimate <= sizeThreshold ||
(leftSplit = rightSplit.trySplit()) == null) {
task.helper.copyInto(taskSink, rightSplit);
break;
}
ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
task.addToPendingCount(1);
ForEachTask<S, T> taskToFork;
if (forkRight) {
forkRight = false;
rightSplit = leftSplit;
taskToFork = task;
task = leftTask;
}
else {
forkRight = true;
taskToFork = leftTask;
}
taskToFork.fork();
sizeEstimate = rightSplit.estimateSize();
}
task.spliterator = null;
task.propagateCompletion();
}
}
在上面的代码中将大任务拆成成了小任务,那哪里收集了这些小任务呢?看下面的代码:
@Override
public <S> Void evaluateParallel(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
if (ordered)
new ForEachOrderedTask<>(helper, spliterator, this).invoke();
else
new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
return null;
}
可以看到调用了invoke方法,而对invoke的描述如下:
* Commences performing this task, awaits its completion if
* necessary, and returns its result, or throws an (unchecked)
* {@code RuntimeException} or {@code Error} if the underlying
* computation did so.
不是说Fork/Join框架嘛?那有了fork为什么没有join而是invoke呢?下面是对join方法的描述:
* Returns the result of the computation when it {@link #isDone is
* done}. This method differs from {@link #get()} in that
* abnormal completion results in {@code RuntimeException} or
* {@code Error}, not {@code ExecutionException}, and that
* interrupts of the calling thread do <em>not</em> cause the
* method to abruptly return by throwing {@code
* InterruptedException}.
根据join的描述,我们知道还可以使用get方法来获取结果,但是get方法会抛出异常而join和invoke方法都不会抛出异常,而是将异常报告给ForkJoinTask,让ForkJoinTask来抛出异常。