Java8 Stream
2018-06-27 本文已影响0人
zhackertop
Java8 Stream
Stream讲解示例: 找出前3短的单词并转成大写形式
List<String> words = Arrays.asList(
"One",
"Two",
"Three",
"Four",
"Five",
"Six"
);
实现一:命令式——迭代:
@Test
public void test_takeTop3ShortWords_iterative() {
List<String> filtered = new ArrayList<>();
for(String word : words){
if(word.length()<=4){
filtered.add(word);
}
}
filtered.sort(Comparator.comparing(String::length));
List<String> top3 = filtered.subList(0, Math.min(3, filtered.size())); //节省了一个迭代
List<String> result = new ArrayList<>(top3);
for(String word : top3){
result.add(word.toUpperCase());
}
}
缺点:
- 代码过于繁杂, 不太好直接看出意图。
- 代码的中间变量过多,命名困难。
实现二:声明式——stream
@Test
public void test_takeTop3ShortWords_stream(){
List<String> result = words.stream()
.filter(s-> s.length()<=4)
.sorted(Comparator.comparing(String::length))
.map(String::toUpperCase)
.limit(3)
.collect(Collectors.toList());
}
Stream的实现原理
操作的分类
操作的分类操作的串联
- 通过相关的操作对象PipelineHelper的构造函数,将操作先后关系维护起来(是反向的),维护一个代码块Sink链式结构: D(C(B(A)))
- 调用wrapSink,将代码块正向串连起来: A->B->C->D
- 分别调用begin、foreach accecpt, end 代码块
操作的协作契约
interface Sink<T> extends Consumer<T> {
void begin(long size); //执行一些初始化工作,如排序时要建一个临时表用于存储排序后的数据
void accept(T t); //继承自Consumer,指明如何处理单个数据。
void end(); //指明本操作结束时(所有数据都已经处理完),如何进行后续的处理。
boolean cancellationRequested(); //当下游不再需要更多数据时,可以用这个通知到上游操作。一般为短路操作。
}
自带拆分功能的新版迭代器-数据流的驱动器
public interface Iterator<E> {
boolean hasNext(); //是否还有更多数据
E next(); //获取下一个数据
void remove(); //删除最近一个数据
default void forEachRemaining(Consumer<? super E> action) {
Objects.requireNonNull(action);
while (hasNext())
action.accept(next());
}
}
public interface Spliterator<T> {
long estimateSize(); //估算还有多少数据待迭代
boolean tryAdvance(Consumer<? super T> action); //处理单个数据
Spliterator<T> trySplit(); //用于并行流分解子任务
default void forEachRemaining(Consumer<? super T> action) {//默认调用tryAdvance去遍历,类似foreach
do { } while (tryAdvance(action));
}
}
流程示例
流程示例Collection.stream()
- StreamSupport.stream(spliterator(), false)
- Spliterators.spliterator(this, 0)
- new ReferencePipeline.Head
AbstractPipeline
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
previousStage.nextStage = this;
this.previousStage = previousStage;
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
this.sourceStage = previousStage.sourceStage;
if (opIsStateful())
sourceStage.sourceAnyStateful = true;
this.depth = previousStage.depth + 1;
}
@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
Spliterator
class ArraySpliterator<E> implements Spliterator<E> {
@SuppressWarnings("unchecked")
@Override
public void forEachRemaining(Consumer<? super T> action) {
Object[] a; int i, hi; // hoist accesses and checks from loop
if (action == null)
throw new NullPointerException();
if ((a = array).length >= (hi = fence) &&
(i = index) >= 0 && i < (index = hi)) {
do { action.accept((T)a[i]); } while (++i < hi);
}
}
}
filter
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
sorted
- SortedOps.makeRef(this)
class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
@Override
public Sink<T> opWrapSink(int flags, Sink<T> sink) {
Objects.requireNonNull(sink);
// If the input is already naturally sorted and this operation
// also naturally sorted then this is a no-op
if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
return sink;
else if (StreamOpFlag.SIZED.isKnown(flags))
return new SizedRefSortingSink<>(sink, comparator);
else
return new RefSortingSink<>(sink, comparator);
}
}
class RefSortingSink<T> extends AbstractRefSortingSink<T> {
private ArrayList<T> list;
RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
super(sink, comparator);
}
@Override
public void begin(long size) {
if (size >= Nodes.MAX_ARRAY_SIZE)
throw new IllegalArgumentException(Nodes.BAD_SIZE);
list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
}
@Override
public void end() {
list.sort(comparator);
downstream.begin(list.size());
if (!cancellationWasRequested) {
list.forEach(downstream::accept);
}
else {
for (T t : list) {
if (downstream.cancellationRequested()) break;
downstream.accept(t);
}
}
downstream.end();
list = null;
}
@Override
public void accept(T t) {
list.add(t);
}
}
map
@Override
@SuppressWarnings("unchecked")
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
limit
Sink<T> opWrapSink(int flags, Sink<T> sink) {
return new Sink.ChainedReference<T, T>(sink) {
long n = skip;
long m = limit >= 0 ? limit : Long.MAX_VALUE;
@Override
public void begin(long size) {
downstream.begin(calcSize(size, skip, m));
}
@Override
public void accept(T t) {
if (n == 0) {
if (m > 0) {
m--;
downstream.accept(t);
}
}
else {
n--;
}
}
@Override
public boolean cancellationRequested() {
return m == 0 || downstream.cancellationRequested();
}
};
}
collect
ReduceOp
@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
return helper.wrapAndCopyInto(makeSink(), spliterator).get(); //new ReducingSink()
}
class ReducingSink extends Box<R> implements AccumulatingSink<T, R, ReducingSink> {
@Override
public void begin(long size) {
state = seedFactory.get();
}
@Override
public void accept(T t) {
accumulator.accept(state, t);
}
@Override
public void combine(ReducingSink other) {
reducer.accept(state, other.state);
}
}