ParallelStream源码解读

2019-07-11  本文已影响0人  firefly_

先看一下源码

AbstractPipeline.class

    /**
     * Evaluate the pipeline with a terminal operation to produce a result.
     * 使用终端操作 terminalOp 对此流管道进行处理,处理过程中会从后往前链接形成流水线
     *
     * @param <R> the type of result
     * @param terminalOp the terminal operation to be applied to the pipeline.
     * @return the result
     */
    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        // 此阶段的输出类型==终端操作的输入类型
        assert getOutputShape() == terminalOp.inputShape();
        // 不允许重复消费        
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        // 设置已消费标识
        linkedOrConsumed = true;
        // 使用终端操作并行或串行处理此流管道
        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

     /**
     *  获取此阶段的源分割迭代器【数据源】
     */
    @SuppressWarnings("unchecked")
    private Spliterator<?> sourceSpliterator(int terminalFlags) {
        Spliterator<?> spliterator = null;
        // 1)源分割迭代器不为 null
        if (sourceStage.sourceSpliterator != null) {
            // 读取
            spliterator = sourceStage.sourceSpliterator;
            // 使用后置空
            sourceStage.sourceSpliterator = null;
        }
        // 2)分割迭代器通过 sourceSupplier 进行生成
        else if (sourceStage.sourceSupplier != null) {
            spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
            sourceStage.sourceSupplier = null;
        }
        else {
            throw new IllegalStateException(MSG_CONSUMED);
        }

        // 此流是并行的 && 流管道中存在有状态操作
        if (isParallel() && sourceStage.sourceAnyStateful) {
            // Adapt the source spliterator, evaluating each stateful op in the pipeline up to and including this pipeline stage.
            // The depth and flags of each pipeline stage are adjusted accordingly.
            int depth = 1;
            /**
             * 从源阶段开始处理,一直处理到当前阶段为止
             */
            for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
                    u != e;
                    u = p, p = p.nextStage) {

                int thisOpFlags = p.sourceOrOpFlags;
                // 当前处理阶段是有状态操作
                if (p.opIsStateful()) {
                    depth = 0;
                    // 当前操作是短路操作
                    if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
                        thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
                    }

                    spliterator = p.opEvaluateParallelLazy(u, spliterator);

                    // Inject or clear SIZED on the source pipeline stage based on the stage's spliterator
                    thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
                            ? thisOpFlags & ~StreamOpFlag.NOT_SIZED | StreamOpFlag.IS_SIZED
                                    : thisOpFlags & ~StreamOpFlag.IS_SIZED | StreamOpFlag.NOT_SIZED;
                }
                p.depth = depth++;
                p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
            }
        }

        // 终端操作带有标识位
        if (terminalFlags != 0)  {
            // 将终端操作的标志位合并到最后一阶段中
            combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
        }

        return spliterator;
    }

并行流处理
通过调用终端 sink 的 evaluateParallel(PipelineHelper<T> helper, Spliterator<S> spliterator)方法完成并行计算任务的创建和执行。

并行计算逻辑-图片来自 (https://www.jianshu.com/p/cf6ce7e14dcc)

再来看下stream包下面的AbstractTask.class是如何进行分割并处理任务的

AbstractTask类图
/**
     * Decides whether or not to split a task further or compute it
     * directly. If computing directly, calls {@code doLeaf} and pass
     * the result to {@code setRawResult}. Otherwise splits off
     * subtasks, forking one and continuing as the other.
     *
     * <p> The method is structured to conserve resources across a
     * range of uses.  The loop continues with one of the child tasks
     * when split, to avoid deep recursion. To cope with spliterators
     * that may be systematically biased toward left-heavy or
     * right-heavy splits, we alternate which child is forked versus
     * continued in the loop.
     */
    @Override
    public void compute() {
        Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
        // 读取分割迭代器的估计总元素个数
        long sizeEstimate = rs.estimateSize();
        // 读取单个任务的元素上限
        final long sizeThreshold = getTargetSize(sizeEstimate);
        boolean forkRight = false;
        @SuppressWarnings("unchecked") K task = (K) this;
        // 估计总元素数 > 阈值 && 尝试对 spliterator 进行二分
        while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
            K leftChild, rightChild, taskToFork;
            // 基于leftSpliterator 创建子任务并写入 leftChild
            task.leftChild  = leftChild = task.makeChild(ls);
            // 基于 rightSpliterator 创建子任务并写入 leftChild
            task.rightChild = rightChild = task.makeChild(rs);
            // 设置 task 的待完成任务数为 1【将会有一个子任务被 fork 进线程池中并行处理】
            task.setPendingCount(1);
            // 是否 forkRight【第一次 forkRight,接着 forkLeft,轮流交替】
            if (forkRight) {
                forkRight = false;
                // 更新待分割的 spliterator
                rs = ls;
                // 待处理的任务
                task = leftChild;
                // 待 fork 进线程池并行处理的任务
                taskToFork = rightChild;
            }
            else {
                forkRight = true;
                // 待处理的任务
                task = rightChild;
                // 待 fork 进线程池并行处理的任务
                taskToFork = leftChild;
            }
            taskToFork.fork();
            // 读取待分割 spliterator 的估计总元素个数
            sizeEstimate = rs.estimateSize();
        }
        // 此任务已经是叶子任务,则执行计算逻辑
        task.setLocalResult(task.doLeaf());
        // 此任务计算完毕后尝试完成主任务
        task.tryComplete();
    }
Fork/Join流程
上一篇下一篇

猜你喜欢

热点阅读