Java技术程序员Java开发那些事

Java Stream的并行实现

2017-09-29  本文已影响373人  一字马胡

作者: 一字马胡
转载标志 【2017-11-03】

更新日志

日期 更新内容 备注
2017-11-03 添加转载标志 持续更新

并行与并发

关于并发与并行,需要弄清楚的是,并行关注于多个任务同时进行,而并发则通过调度来不停的切换多个任务执行,而实质上多个任务不是同时执的。并发,英文单词为:Concurrent。并行的英文单词为:parallel。如果想对并发和并行有一个比较直观的认识,可以参考下面这张图片:

并行与并发

Fork/Join 框架与 Java Stream API

Fork/Join框架属于并行框架,关于Fork/Join框架的一些内容,可以参考这篇文章:Java Fork/Join并行框架。简单来说,Fork/Join框架可以将大的任务切分为足够小的任务,然后将小任务分配给不同的线程来执行,而线程之间通过工作窃取算法来协调资源,提前昨晚任务的线程可以去“窃取”其他还没有做完任务的线程的任务,而每一个线程都会持有一个双端队列,里面存储着分配给自己的任务,Fork/Join框架在实现上,为了防止线程之间的竞争,线程在消费分配给自己的任务时,是从队列头取任务的,而“窃取”线程则从队列尾部取任务。
Fork/Join框架通过fork方法来分割大任务,通过使用join来获取小任务的结果,然后组合成大任务的结果。关于Fork/Join任务模型,可以参考下面的图片:

Fork/Join的任务模型

关于Java Stream API的相关内容,可以参考该文章:Java Streams API

Stream在实现上使用了Fork/Join框架来实现并发,所以使用Stream我们可以在不知不觉间就使得我们的程序跑得飞快,究其原因就是Stream使用了Fork/Join并发框架来处理任务,当然,你需要显示的指定Stream为parallel,否则Stream默认都是串行流。比如对于Collection,你可以使用parallelStream来转换为一个并发流,或者使用stream方法转换为串行流,然后使用parallel操作使得串行流变为并发流。本文的重点是剖析Stream是如何使用Fork/Join来做并发的。

Stream的并发实现细节

在了解了Fork/Join并发框架和Java Stream之后,首要的问题就是:Stream是如何使用Fork/Join框架来做到并发的?其实对于使用者来说,了解Stream就是通过Fork/Join框架来做的就好了,但是如果想要深入了解一下Fork/Join框架的实践,以及Java Stream的设计方法,那么去读一下实现的源码还是很有必要的,下文中的分析仅代表个人观点!

需要注意的一点是,Java Stream的操作分为两类,也可以分为三类,具体的细节可以参考该文章:Java Streams API。一个简单的判断一个操作是否是Terminal操作还是Intermediate操作的方法是,如果操作返回的是一个新的Stream,那么就是一个Intermediate操作,否则就是一个Terminal操作。

Java Stream对四种类型的Terminal操作使用了Fork/Join实现了并发操作,下面的图片展示了这四种操作类型:

支持并行的四种Stream操作

我们首先来走一遍Stream操作的执行路径,下面的代码是我们想要做的操作流,下文会根据该代码示例来跟踪Stream的执行路径:

        Stream.of(1,2,3,4)
                .parallel()
                .map(n -> n*2)
                .collect(Collectors.toCollection(ArrayList::new));

解释一下,上面的代码想要实现的功能是将(1,2,3,4)这四个数字每一个都变为其自身的两倍,然后收集这些元素到一个ArrayList中返回。这是一个非常简单的功能,下面是上面的操作流的执行路径:


    step 1:
    
    public static<T> Stream<T> of(T... values) {
        return Arrays.stream(values);
    }
    
    step 2:
    
        public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }
    
    step 3:
    
        public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
            ...
            container = evaluate(ReduceOps.makeRef(collector));
            ...
    }
    
    step 4:
    
        final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }
    
    step 5:
    
    使用Fork/Join框架执行操作。
    

上面的五个步骤是经过一些省略的,需要注意的一点是,intermediate类型的操作仅仅将操作加到一个upstream里面,具体的原文描述如下:


Construct a new Stream by appending a stateless intermediate operation to an existing stream.

比如上面我们的操作中的map操作,实际上只是将操作加到一个intermediate链条上面,不会立刻执行。重点是第五步,Stream是如何使用Fork/Join来实现并发的。evaluate这个方法至关重要,在方法里面会分开处理,对于设置了并发标志的操作流,会使用Fork/Join来并发执行操作任务,而对于没有打开并发标志的操作流,则串行执行操作。

Fork/Join框架的核心方法是一个叫做compute的方法,下面分析一个forEach操作如何通过Fork/Join框架来实现并发,通过追踪代码,可以发现forEach的并发版本其实是一个交由一个ForEachTask对象来做,而ForEachTask类中实现了compute方法:

// Similar to AbstractTask but doesn't need to track child tasks
        public void compute() {
            Spliterator<S> rightSplit = spliterator, leftSplit;
            long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
            if ((sizeThreshold = targetSize) == 0L)
                targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
            boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
            boolean forkRight = false;
            Sink<S> taskSink = sink;
            ForEachTask<S, T> task = this;
            while (!isShortCircuit || !taskSink.cancellationRequested()) {
                if (sizeEstimate <= sizeThreshold ||
                    (leftSplit = rightSplit.trySplit()) == null) {
                    task.helper.copyInto(taskSink, rightSplit);
                    break;
                }
                ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
                task.addToPendingCount(1);
                ForEachTask<S, T> taskToFork;
                if (forkRight) {
                    forkRight = false;
                    rightSplit = leftSplit;
                    taskToFork = task;
                    task = leftTask;
                }
                else {
                    forkRight = true;
                    taskToFork = leftTask;
                }
                taskToFork.fork();
                sizeEstimate = rightSplit.estimateSize();
            }
            task.spliterator = null;
            task.propagateCompletion();
        }
    }

在上面的代码中将大任务拆成成了小任务,那哪里收集了这些小任务呢?看下面的代码:

        @Override
        public <S> Void evaluateParallel(PipelineHelper<T> helper,
                                         Spliterator<S> spliterator) {
            if (ordered)
                new ForEachOrderedTask<>(helper, spliterator, this).invoke();
            else
                new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
            return null;
        }

可以看到调用了invoke方法,而对invoke的描述如下:

     * Commences performing this task, awaits its completion if
     * necessary, and returns its result, or throws an (unchecked)
     * {@code RuntimeException} or {@code Error} if the underlying
     * computation did so.

不是说Fork/Join框架嘛?那有了fork为什么没有join而是invoke呢?下面是对join方法的描述:


     * Returns the result of the computation when it {@link #isDone is
     * done}.  This method differs from {@link #get()} in that
     * abnormal completion results in {@code RuntimeException} or
     * {@code Error}, not {@code ExecutionException}, and that
     * interrupts of the calling thread do <em>not</em> cause the
     * method to abruptly return by throwing {@code
     * InterruptedException}.
     

根据join的描述,我们知道还可以使用get方法来获取结果,但是get方法会抛出异常而join和invoke方法都不会抛出异常,而是将异常报告给ForkJoinTask,让ForkJoinTask来抛出异常。

上一篇下一篇

猜你喜欢

热点阅读