Stream

2022-03-15  本文已影响0人  ttyttytty

学科归属&背景

侧重点/目标

1.中间的操作结果存在哪里?内存的消耗情况怎么样?

2.中间操作的状态&短路是指什么

@Override
 public void end() {
     list.sort(comparator);
     downstream.begin(list.size());
     if (!cancellationWasRequested) {
         list.forEach(downstream::accept);
     }
     else {
         for (T t : list) {
             if (downstream.cancellationRequested()) break;
             downstream.accept(t);//等中间缓存的排序结果都ok end了,对排序后的结果,重新发起后续的操作
         }
     }
     downstream.end();
     list = null;
 }

3.传入的抽象方法实现,是存在哪里?是由哪个类怎么执行的?如何短路执行(中间取消)?

    default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

    public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }
    @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }
   @Override
   public final Stream<P_OUT> distinct() {
       return DistinctOps.makeRef(this);
   }

static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
       return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
                                                     StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {
……
    @Override
    public void forEach(Consumer<? super P_OUT> action) {
        evaluate(ForEachOps.makeRef(action, false));
    }

    public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
                                                  boolean ordered) {
        Objects.requireNonNull(action);
        return new ForEachOp.OfRef<>(action, ordered);
    }
    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;
// **并行与否**
        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))//ForkJoin框架,将原始数据不断拆分为更小的单元,对每一个单元做上述evaluateSequential类似的动作
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }
       @Override
       public <S> Void evaluateSequential(PipelineHelper<T> helper,
                                          Spliterator<S> spliterator) {
           return helper.wrapAndCopyInto(this, spliterator).get();
       }
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
// 从最后操作往前,包成新的Sink,复合combinedFlags
   final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
       Objects.requireNonNull(sink);

       for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
           sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
       }
       return (Sink<P_IN>) sink;
   }


// 第二个操作的节点,判断combinedFlags,如果短路了,即停止,然后重新反向执行?
// 未短路,执行此次的新Sink反向流的begin,依次执行操作的,递归发起下一个元素的
   @Override
   final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
       Objects.requireNonNull(wrappedSink);

       if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
           wrappedSink.begin(spliterator.getExactSizeIfKnown());// 流水ok,你准备一下
           spliterator.forEachRemaining(wrappedSink);// Collection实现spliterator,开始执行遍历元素,递归执行
           wrappedSink.end();
       }
       else {
           copyIntoWithCancel(wrappedSink, spliterator);
       }
   }

// ArrayList实现
public void forEachRemaining(Consumer<? super E> action) {
           int i, hi, mc; // hoist accesses and checks from loop
           ArrayList<E> lst; Object[] a;
           if (action == null)
               throw new NullPointerException();
           if ((lst = list) != null && (a = lst.elementData) != null) {
               if ((hi = fence) < 0) {
                   mc = lst.modCount;
                   hi = lst.size;
               }
               else
                   mc = expectedModCount;
               if ((i = index) >= 0 && (index = hi) <= a.length) {
                   for (; i < hi; ++i) {
                       @SuppressWarnings("unchecked") E e = (E) a[i];
                       action.accept(e);//各个操作通过Sink接口accept方法依次向下传递执行。
                   }
                   if (lst.modCount == mc)
                       return;
               }
           }
           throw new ConcurrentModificationException();
       }

4.泛型的使用

interface Builder<T> extends Sink<T> {

        /**
         * Builds the node.  Should be called after all elements have been
         * pushed and signalled with an invocation of {@link Sink#end()}.
         *
         * @return the resulting {@code Node}
         */
        Node<T> build();

        /**
         * Specialized @{code Node.Builder} for int elements
         */
        interface OfInt extends Node.Builder<Integer>, Sink.OfInt {
            @Override
            Node.OfInt build();
        }
Lists.<Person>newArrayList().stream()
        .collect(Collectors.groupingBy(Person::getType, HashMap::new, Collectors.toList()));

5.操作码是如何标识的。位的应用。sourceOrOpFlags

6.设计模式

责任链模式,一个接一个处理事件。

7.典型使用
collector&Collectors

知识迁移

1.并行的ForkJoinPool与多线程的关系,分治的思想如何有序拼接数据,如并行排序?

REF


[原来你是这样的 Stream —— 浅析 Java Stream 实现原理](https://zhuanlan.zhihu.com/p/47478339

上一篇 下一篇

猜你喜欢

热点阅读