Java8 Stream

2018-06-27  本文已影响0人  zhackertop

Java8 Stream

Stream讲解示例: 在长度不超过4的单词中,找出最短的3个并转成大写形式

// Sample input shared by the iterative and the stream implementation of the example.
// Arrays.asList yields a fixed-size list, which is fine here because it is only read.
List<String> words = Arrays.asList(
      "One",
      "Two",
      "Three",
      "Four",
      "Five",
      "Six"
  );

实现一:命令式——迭代:

  /**
   * Imperative (loop-based) version of the example: keep the words of at most
   * four characters, order them by length, take the first three and upper-case them.
   */
  @Test
  public void test_takeTop3ShortWords_iterative() {

    // Step 1: filter - keep only words whose length is <= 4.
    List<String> filtered = new ArrayList<>();
    for (String word : words) {
      if (word.length() <= 4) {
        filtered.add(word);
      }
    }

    // Step 2: sort the survivors by length, shortest first.
    filtered.sort(Comparator.comparing(String::length));

    // Step 3: limit - subList is a view, which saves one extra iteration/copy.
    List<String> top3 = filtered.subList(0, Math.min(3, filtered.size()));

    // Step 4: map - start from an EMPTY list (only pre-sized). Seeding it with
    // top3 would keep the original words next to their upper-case forms and
    // yield up to six entries instead of three.
    List<String> result = new ArrayList<>(top3.size());
    for (String word : top3) {
      result.add(word.toUpperCase());
    }
  }

缺点:

实现二:声明式——stream

  /**
   * Declarative (stream-based) version of the example: the same
   * filter / sort / upper-case / take-three steps expressed as one pipeline.
   */
  @Test
  public void test_takeTop3ShortWords_stream() {

    // Shortest words first.
    Comparator<String> byLength = Comparator.comparing(String::length);

    List<String> result = words.stream()
        .filter(word -> word.length() <= 4)
        .sorted(byLength)
        .map(String::toUpperCase)
        .limit(3)
        .collect(Collectors.toList());
  }

Stream的实现原理

操作的分类

操作的分类

操作的串联

  1. 通过相关的操作对象PipelineHelper的构造函数,将操作先后关系维护起来(是反向的),维护一个代码块Sink链式结构: D(C(B(A)))
  2. 调用wrapSink,将代码块正向串连起来: A->B->C->D
  3. 依次调用 Sink 的 begin、accept(对每个元素调用一次,相当于 forEach)、end 代码块

操作的协作契约

// Cooperation contract between chained operations: each operation is driven
// through begin -> accept (once per element) -> end.
interface Sink<T> extends Consumer<T> {
    // Performs initialization work, e.g. a sort operation creates a temporary
    // list that will hold the data to be sorted.
    void begin(long size);
    // Inherited from Consumer; specifies how a single element is handled.
    void accept(T t);
    // Specifies the follow-up processing once this operation ends
    // (i.e. all data has been handed over).
    void end();
    // Lets a downstream operation tell upstream operations that it needs no
    // more data; typically used by short-circuiting operations.
    boolean cancellationRequested();
}

自带拆分功能的新版迭代器-数据流的驱动器

/**
 * Classic cursor-style iterator: the caller pulls elements one at a time.
 *
 * @param <E> element type
 */
public interface Iterator<E> {

    /** Reports whether more data is available. */
    boolean hasNext();

    /** Fetches the next element. */
    E next();

    /** Removes the most recently returned element. */
    void remove();

    /**
     * Hands every element that is still left to {@code action}.
     *
     * @param action callback invoked once per remaining element
     * @throws NullPointerException if {@code action} is null
     */
    default void forEachRemaining(Consumer<? super E> action) {
        Objects.requireNonNull(action);
        for (; hasNext(); ) {
            E element = next();
            action.accept(element);
        }
    }
}
/**
 * Splittable iterator that drives the data flow of a stream.
 *
 * @param <T> element type
 */
public interface Spliterator<T> {

    /** Estimates how many elements are still waiting to be traversed. */
    long estimateSize();

    /** Processes a single element with {@code action}. */
    boolean tryAdvance(Consumer<? super T> action);

    /** Used by parallel streams to split the work into sub-tasks. */
    Spliterator<T> trySplit();

    /**
     * Default traversal, similar to a for-each: calls {@link #tryAdvance}
     * repeatedly until it returns {@code false}.
     */
    default void forEachRemaining(Consumer<? super T> action) {
        while (tryAdvance(action)) {
            // intentionally empty - tryAdvance does the per-element work
        }
    }
}

流程示例

流程示例

Collection.stream()

AbstractPipeline

/**
 * Constructor that appends this stage after an existing pipeline stage.
 *
 * @param previousStage the upstream stage this stage is linked behind
 * @param opFlags       flags of this operation; stored masked with
 *                      StreamOpFlag.OP_MASK and combined with the previous
 *                      stage's combined flags
 * @throws IllegalStateException if previousStage was already linked or consumed
 */
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
        // A stage may be linked (or consumed) only once.
        if (previousStage.linkedOrConsumed)
                throw new IllegalStateException(MSG_STREAM_LINKED);
        previousStage.linkedOrConsumed = true;
        // Forward link: previous stage -> this stage.
        previousStage.nextStage = this;

        // Backward link: this stage -> previous stage.
        this.previousStage = previousStage;
        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
        // The source stage is inherited from the previous stage.
        this.sourceStage = previousStage.sourceStage;
        // Record on the source stage that this pipeline contains a stateful operation.
        if (opIsStateful())
                sourceStage.sourceAnyStateful = true;
        // Depth is one more than the previous stage's depth.
        this.depth = previousStage.depth + 1;
}
        
@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        // Step 1: wrap the (non-null) terminal sink with the sinks of the pipeline stages.
        Sink<P_IN> wrapped = wrapSink(Objects.requireNonNull(sink));
        // Step 2: push the source elements through the wrapped sink.
        copyInto(wrapped, spliterator);
        // The caller gets back the sink it passed in.
        return sink;
}

 final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);

        // Walk from this stage back towards the source (stages with depth > 0),
        // letting each stage wrap the sink built so far.
        @SuppressWarnings("rawtypes")
        AbstractPipeline stage = AbstractPipeline.this;
        while (stage.depth > 0) {
                sink = stage.opWrapSink(stage.previousStage.combinedFlags, sink);
                stage = stage.previousStage;
        }
        return (Sink<P_IN>) sink;
}

        @Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);

        // Pipelines flagged SHORT_CIRCUIT take the cancellable copy path.
        if (StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
                copyIntoWithCancel(wrappedSink, spliterator);
                return;
        }

        // Otherwise: begin -> feed every remaining element -> end.
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        spliterator.forEachRemaining(wrappedSink);
        wrappedSink.end();
}

Spliterator

/**
 * Excerpt of an array-backed spliterator; only the bulk traversal is shown.
 * The fields {@code array}, {@code index} and {@code fence} are declared
 * outside this excerpt.
 *
 * @param <T> element type (the type parameter is named {@code T} so that it
 *            matches the {@code T} used by the method below)
 */
class ArraySpliterator<T> implements Spliterator<T> {

        /**
         * Passes every element from the current index up to (excluding) the
         * fence to {@code action}.
         *
         * @param action callback invoked once per remaining element
         * @throws NullPointerException if {@code action} is null
         */
        @SuppressWarnings("unchecked")
        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            Object[] a; int i, hi; // hoist accesses and checks from loop
            if (action == null)
                throw new NullPointerException();
            // The condition also assigns index = fence, so the elements are
            // marked as consumed before the loop starts handing them out.
            if ((a = array).length >= (hi = fence) &&
                (i = index) >= 0 && i < (index = hi)) {
                do { action.accept((T)a[i]); } while (++i < hi);
            }
        }
}

filter

    /**
     * Adds a stateless stage that only lets elements matching
     * {@code predicate} through to the downstream sink.
     *
     * @param predicate test applied to each element
     * @throws NullPointerException if {@code predicate} is null
     */
    @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        // The incoming size is not forwarded: the stage is
                        // flagged NOT_SIZED and -1 is announced downstream.
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT candidate) {
                        if (!predicate.test(candidate)) {
                            return; // rejected - never reaches downstream
                        }
                        downstream.accept(candidate);
                    }
                };
            }
        };
    }

sorted

/**
 * Sorting stage for reference streams (excerpt): chooses which sink, if any,
 * performs the sort.
 */
class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
        @Override
        public Sink<T> opWrapSink(int flags, Sink<T> sink) {
                Objects.requireNonNull(sink);

                // Input already naturally sorted and this sort is natural too:
                // nothing to do, hand back the downstream sink unchanged.
                boolean sortIsNoOp = StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort;
                if (sortIsNoOp) {
                        return sink;
                }

                // SIZED input gets the sized sorting sink.
                if (StreamOpFlag.SIZED.isKnown(flags)) {
                        return new SizedRefSortingSink<>(sink, comparator);
                }

                // Everything else is buffered in the list-based sorting sink.
                return new RefSortingSink<>(sink, comparator);
        }
}
/**
 * Sorting sink that buffers every incoming element in a list, sorts the list
 * when the upstream ends, and only then replays it to the downstream sink.
 */
class RefSortingSink<T> extends AbstractRefSortingSink<T> {
    private ArrayList<T> list;

    RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
        super(sink, comparator);
    }

    /** Allocates the buffer, pre-sized when a non-negative size is given. */
    @Override
    public void begin(long size) {
        if (size >= Nodes.MAX_ARRAY_SIZE) {
            throw new IllegalArgumentException(Nodes.BAD_SIZE);
        }
        if (size >= 0) {
            list = new ArrayList<T>((int) size);
        } else {
            list = new ArrayList<T>();
        }
    }

    /** Buffers the element; nothing is passed downstream yet. */
    @Override
    public void accept(T t) {
        list.add(t);
    }

    /** Sorts the buffer and replays it downstream as a complete begin/accept/end cycle. */
    @Override
    public void end() {
        list.sort(comparator);
        downstream.begin(list.size());
        if (cancellationWasRequested) {
            // Cancellation was requested: re-check before every element.
            for (T element : list) {
                if (downstream.cancellationRequested()) {
                    break;
                }
                downstream.accept(element);
            }
        } else {
            list.forEach(downstream::accept);
        }
        downstream.end();
        list = null; // drop the reference to the buffer
    }
}

map

    /**
     * Adds a stateless stage that transforms each element with
     * {@code mapper} before passing it to the downstream sink.
     *
     * @param mapper function applied to each element
     * @throws NullPointerException if {@code mapper} is null
     */
    @Override
    @SuppressWarnings("unchecked")
    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT input) {
                        R mapped = mapper.apply(input);
                        downstream.accept(mapped);
                    }
                };
            }
        };
    }

limit

 Sink<T> opWrapSink(int flags, Sink<T> sink) {
                return new Sink.ChainedReference<T, T>(sink) {
                    // Elements still to be skipped before anything is emitted.
                    long toSkip = skip;
                    // Elements still allowed through; a negative limit means "no limit".
                    long remaining = limit >= 0 ? limit : Long.MAX_VALUE;

                    @Override
                    public void begin(long size) {
                        downstream.begin(calcSize(size, skip, remaining));
                    }

                    @Override
                    public void accept(T t) {
                        if (toSkip != 0) {
                            // Still inside the skipped prefix.
                            toSkip--;
                            return;
                        }
                        if (remaining > 0) {
                            remaining--;
                            downstream.accept(t);
                        }
                    }

                    @Override
                    public boolean cancellationRequested() {
                        // Quota used up: tell upstream to stop sending data.
                        if (remaining == 0) {
                            return true;
                        }
                        return downstream.cancellationRequested();
                    }
                };
            }

collect

ReduceOp

 @Override
    public <P_IN> R evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
            // Sequential evaluation of the terminal operation: create the sink via
            // makeSink(), let the helper wrap it and copy the source into it, then
            // read the result with get().
            // Original author's note: makeSink() amounts to "new ReducingSink()".
            return helper.wrapAndCopyInto(makeSink(), spliterator).get();
    }
        
// Terminal sink of the collect operation (excerpt). It keeps its intermediate
// result in `state`; that field is not declared in this excerpt
// (presumably inherited from Box - confirm against the full source).
class ReducingSink extends Box<R> implements AccumulatingSink<T, R, ReducingSink> {
        // Creates the initial result container from seedFactory.
        @Override
        public void begin(long size) {
                state = seedFactory.get();
        }

        // Folds one element into the current state via accumulator.
        @Override
        public void accept(T t) {
                accumulator.accept(state, t);
        }

        // Merges the state of another sink into this sink's state via reducer.
        @Override
        public void combine(ReducingSink other) {
                reducer.accept(state, other.state);
        }
}
上一篇 下一篇

猜你喜欢

热点阅读