Java 杂谈移动开发作家群(719776724)分享专题Java

Java8 Lamdba表达式及Stream详解

2019-08-14  本文已影响32人  AWeiLoveAndroid

Lamdba表达式是Java8里面的新特性。简单地说就是把函数作为方法的参数(也叫做“匿名函数”),用前端领域用的很广泛的一个专业术语来讲叫做“闭包”。用了Lamdba表达式之后,代码变的更简洁明了。


一、各语言中的lamdba表达式

在计算机编程中,匿名函数(函数文字,lambda抽象或lambda表达式)是未绑定到标识符的函数定义。匿名函数通常是传递给高阶函数的参数,或者用于构造需要返回函数的高阶函数的结果。如果函数仅使用一次或有限次数,则匿名函数在语法上可能比使用命名函数轻。

自从 Lisp 于1958年以来,匿名函数一直是编程语言的一个特性,越来越多的现代编程语言支持匿名函数。

C ++中的Lambda表达式

在C ++ 11及更高版本中,lambda表达式(通常称为lambda)是一种方便的方法,可以在调用它或作为参数传递给函数的位置定义匿名函数对象(闭包)。通常,lambdas用于封装传递给算法或异步方法的几行代码。

#include <algorithm>
#include <cmath>
void abssort(float* x, unsigned n) {
    std::sort(x, x + n,
        // Lambda expression begins
        [](float a, float b) {
            return (std::abs(a) < std::abs(b));
        } // end of lambda expression
    );
}

Python中的Lambda表达式

可以使用lambda关键字创建小的匿名函数。此函数返回其两个参数的总和:lambda a,b:a + b。Lambda函数可以在需要函数对象的任何地方使用。它们在语法上限于单个表达式。从语义上讲,它们只是正常函数定义的语法糖。与嵌套函数定义一样,lambda函数可以引用包含范围的变量。

def make_incrementor(n):
    return  lambda x:x + n 

f = make_incrementor(50)
f(1)

JavaScript ES6中的箭头功能(相当于匿名函数)

箭头函数是声明匿名函数的现代方法。与函数表达式相比,ES6胖箭头函数具有更短的语法,并且词法绑定this值。箭头功能始终是匿名的,并且有效地将 function (arguments) { expression } 变成 arguments => expression。如果在箭头后面使用表达式,则返回是隐式的,因此return不需要。

let square = x => x * x;
console.log(square(10));

let add = (a, b) => a + b;
console.log(add(3, 4));

二、Java 8里面有关 lamdba 的4个新特性:


三、Lamdba表达式详解

(一)Lamdba的作用域

局部变量:

(二)Lambda表达式 vs 匿名类:

(三)Lamdba的使用详解

(四)方法引用的种类:

例如:System.out::println就是方法引用。

对于静态方法引用,我们需要在类名和方法名之间加入::分隔符,例如 Integer::sum

我们则需要在对象名和方法名之间加入::分隔符:

    Set<String> set = new HashSet<>();
    set.add("a");
    set.add("b");
    Predicate<String> result = set::contains;

数组的构造方法引用的语法则比较特殊,为了便于理解,你可以假想存在一个接收 int 参数的数组构造方法。参考下面的代码:

IntFunction<int[]> arrayMaker = int[]::new;
int[] array = arrayMaker.apply(10) // 创建数组 int[10]

(五)函数式接口:

【注:】可以参考文章:https://segmentfault.com/a/1190000019953045

接口 函数 介绍
BiConsumer<T, U> void accept(T t, U u)
BiFunction<T, U, R> R apply(T t, U u) 传入两个对象T、U,根据传入的T、U返回R
BinaryOperator<T> BinaryOperator<T> minBy(Comparator<? super T> comparator) 比较最小值
BinaryOperator<T> BinaryOperator<T> maxBy(Comparator<? super T> comparator) 比较最大值
BiPredicate<T, U> boolean test(T t, U u) 传入两个个对象,返回一个boolean
Consumer<T> void accept(T t) 传入一个对象,没有返回值
Function<T,R> R apply(T t) 传入两个对象,根据T返回R
Predicate<T> boolean test(T t) 传入一个对象,返回一个boolean
Supplier<T> T get() 返回一个对象
UnaryOperator<T> UnaryOperator<T> identity() 返回接口本身

(六)集合遍历:

(七)异常处理和封装:


四、Stream详解

(一)Stream很多函数都用到了函数式接口。下面简单讲一下Stream的用法。

【注】可以参考文章:https://blog.csdn.net/hxhaaj/article/details/80725857


(二)Stream 源码解析

【注:】原文:https://www.jianshu.com/p/102b3df5a4c1

Stream可以将所有的操作抽象为数据源(Source)、数据操作(Operator)、数据的产出(Sink)。
例如:

Stream<Person> stream2 = personList.stream();
stream2.filter(p -> p.getGender() == Person.Sex.FEMALE).forEach(p -> System.out.println(p.toString()));

personList是数据源。
System.out.println(p.toString()是数据的产出。
数据操作是filter以及forEach。

数据源:

Iterator是迭代器,提供访问一系列数据的接口,通过hasNext和next方法来顺序的遍历相关的数据。
splitrerator从名字可以看出,多了一个可以“split”,也就是提供可查分的能力,主要目的是提高数据遍历的并行处理能力。
我们看一个JDK自带的例子来认识下Spliterator类。

static <T> void parEach(TaggedArray<T> a, Consumer<T> action) {
    Spliterator<T> s = a.spliterator();
    long targetBatchSize = s.estimateSize() / (ForkJoinPool.getCommonPoolParallelism() 8);
    new ParEach(null, s, action, targetBatchSize).invoke();
}

static class ParEach<T> extends CountedCompleter<Void> {
    final Spliterator<T> spliterator;
    final Consumer<T> action;
    final long targetBatchSize;

    ParEach(ParEach<T> parent, Spliterator<T> spliterator,
            Consumer<T> action, long targetBatchSize) {
        super(parent);
        this.spliterator = spliterator; this.action = action;
        this.targetBatchSize = targetBatchSize;
    }

    @Override
    public void compute() {
        Spliterator<T> sub;
        while (spliterator.estimateSize() > targetBatchSize &&
                (sub = spliterator.trySplit()) != null) {
            addToPendingCount(1);
            new ParEach<>(this, sub, action, targetBatchSize).fork();
        }
        spliterator.forEachRemaining(action);
        propagateCompletion();
    }
}

class TaggedArray<T> {

    private final Object[] elements; // immutable after construction
    
    TaggedArray(T[] data, Object[] tags) {
        int size = data.length;
        if (tags.length != size) throw new IllegalArgumentException();
        this.elements = new Object[2 size];
        for (int i = 0, j = 0; i < size; ++i) {
            elements[j++] = data[i];
            elements[j++] = tags[i];
        }
    }

    public Spliterator<T> spliterator() {
        return new TaggedArraySpliterator<>(elements, 0, elements.length);
    }

    static class TaggedArraySpliterator<T> implements Spliterator<T> {
        private final Object[] array;
        private int origin; // current index, advanced on split or traversal
        private final int fence; // one past the greatest index

        TaggedArraySpliterator(Object[] array, int origin, int fence) {
            this.array = array; this.origin = origin; this.fence = fence;
        }

        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            for (; origin < fence; origin += 2)
                action.accept((T) array[origin]);
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if (origin < fence) {
                action.accept((T) array[origin]);
                origin += 2;
                return true;
            }
            else // cannot advance
                return false;
        }

        @Override
        public Spliterator<T> trySplit() {
            int lo = origin; // divide range in half
            int mid = ((lo + fence) >>> 1) & ~1; // force midpoint to be even
            if (lo < mid) { // split out left half
                origin = mid; // reset this Spliterator's origin
                return new TaggedArraySpliterator<>(array, lo, mid);
            }
            else {      // too small to split
                return null;
            }
        }

        @Override
        public long estimateSize() {
            return (long)((fence - origin) / 2);
        }

        @Override
        public int characteristics() {
            return ORDERED | SIZED | IMMUTABLE | SUBSIZED;
        }
    }
}

可以通过trySplit方法来分离出一个更小数量级的迭代器,同时提供了forEachRemaining方法来遍历属于的数据部分。
Spliterator是整个流框架设计的重要部分,其充当了数据源的角色。可以很方便的通过StreamSupport.stream(spliterator(), false)将一个Spliterator对象转换为Stream对象。

数据的操作:


Stream流可以进行Operator(map、filter、limit)等操作,整个operator是懒加载的模式,其本身的调用并不会触发相关的实际操作。
filter操作是用来对已有的数据流进行过滤的操作,比如过滤出整数流中的偶数部分等,
例如:ReferencePipeline类的filter方法的具体实现:

@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
            StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

fiter方法仅仅只是返回了一个另一个Stream对象,并没有对被调用的流进行过滤操作,
那这个filter什么时候会真实的执行呢,
从上面的例子中也可以看到,只有在onWrapSink调用生成的Sink对象执行accept时才真正的运行。

数据的产出:

只有流调用相关的终端操作的时候,比如foreach、take、collect等相关操作时才会触发真正的执行操作。
这些方法最终都是通过Sink接口的来完成操作,我们以collect这个操作来说明下整个的执行过程,其他的执行过程也大致类似。

例如:ReferencePipeline类的collect方法的具体实现:

@Override
@SuppressWarnings("unchecked")
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
    A container;
    if (isParallel()
            && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
            && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
        container = collector.supplier().get();
        BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
        forEach(u -> accumulator.accept(container, u));
    }
    else {
        container = evaluate(ReduceOps.makeRef(collector));
    }
    return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
            ? (R) container
            : collector.finisher().apply(container);
}

我们先不考虑并行化处理的过程,先来看下13行的实现,
可以看到此处将collector分装为了一个ReduceOps对象,
ReduceOps对象其实也是提供了一个ReduceSink的对象来完成整个数据的输出处理。

我们打开ReduceOps类,我们看看makeRef方法的具体实现:

public static <T, I> TerminalOp<T, I>
makeRef(Collector<? super T, I, ?> collector) {
    Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
    BiConsumer<I, ? super T> accumulator = collector.accumulator();
    BinaryOperator<I> combiner = collector.combiner();
    class ReducingSink extends Box<I>
            implements AccumulatingSink<T, I, ReducingSink> {
        @Override
        public void begin(long size) {
            state = supplier.get();
        }

        @Override
        public void accept(T t) {
            accumulator.accept(state, t);
        }

        @Override
        public void combine(ReducingSink other) {
            state = combiner.apply(state, other.state);
        }
    }
    return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
        @Override
        public ReducingSink makeSink() {
            return new ReducingSink();
        }

        @Override
        public int getOpFlags() {
            return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
                    ? StreamOpFlag.NOT_ORDERED
                    : 0;
        }
    };
}

上面的例子其实比较简单了,通过accept方法来接收流中的每一个元素,
将其利用accumulator对象和之前已经生成好的state进行聚合操作,
combine方法主要是考虑并行的情况下将两个任务最中的state合并为一个,
字符串链接例子中就是将两个子任务已经链接好的字符串再链接成一个字符串。

并行

Stream框架可以很方便的将任务改为并行化的操作,
其底层是通过ForkJoinPool来完成相关的并行化处理操作,
我们接着上面的字符串链接的例子讲,来看一下并行话处理的整个过程。

我们打开ReduceOps类,我们看看它的内部类ReduceOp的具体实现:

private static abstract class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>
        implements TerminalOp<T, R> {
    private final StreamShape inputShape;

    /**
     * Create a {@code ReduceOp} of the specified stream shape which uses
     * the specified {@code Supplier} to create accumulating sinks.
     *
     * @param shape The shape of the stream pipeline
     */
    ReduceOp(StreamShape shape) {
        inputShape = shape;
    }

    public abstract S makeSink();

    @Override
    public StreamShape inputShape() {
        return inputShape;
    }

    @Override
    public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                       Spliterator<P_IN> spliterator) {
        return helper.wrapAndCopyInto(makeSink(), spliterator).get();
    }

    @Override
    public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                     Spliterator<P_IN> spliterator) {
        return new ReduceTask<>(this, helper, spliterator).invoke().get();
    }
}

ReduceTask是并行化遍历最终会调用的方法,
ReduceTask是一个继承了ForkJoinPool框架中的AbstractTask实现类( 其是ForkJoinTask的子类),
其用于支持任务的拆分和合并。
通过invoke方法将任务提供到默认的ForkJoinPool的common pool中,
下面是最终放入到common pool中的实现过程。感兴趣的同学可以自行阅读相关源码。
ForkJoinPool是整个并行化框架的基础,其调度方式和线程池有极大的区别,此处不再一一赘述。

以下是ForkJoinTask类的externalAwaitDone方法:

private int externalAwaitDone() {
    int s = ((this instanceof CountedCompleter) ? // try helping
            ForkJoinPool.common.externalHelpComplete(
                    (CountedCompleter<?>)this, 0) :
            ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
    if (s >= 0 && (s = status) >= 0) {
        boolean interrupted = false;
        do {
            if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                synchronized (this) {
                    if (status >= 0) {
                        try {
                            wait(0L);
                        } catch (InterruptedException ie) {
                            interrupted = true;
                        }
                    }
                    else
                        notifyAll();
                }
            }
        } while ((s = status) >= 0);
        if (interrupted)
            Thread.currentThread().interrupt();
    }
    return s;
}

参考文章:

不同语言中的Lamdba表达式(匿名函数)
Java 8 lamdba官方介绍

上一篇 下一篇

猜你喜欢

热点阅读