Java8 Lamdba表达式及Stream详解
Lamdba表达式是Java8里面的新特性。简单地说就是把函数作为方法的参数(也叫做“匿名函数”),用前端领域用的很广泛的一个专业术语来讲叫做“闭包”。用了Lamdba表达式之后,代码变的更简洁明了。
一、各语言中的lamdba表达式
在计算机编程中,匿名函数(函数文字,lambda抽象或lambda表达式)是未绑定到标识符的函数定义。匿名函数通常是传递给高阶函数的参数,或者用于构造需要返回函数的高阶函数的结果。如果函数仅使用一次或有限次数,则匿名函数在语法上可能比使用命名函数轻。
自从 Lisp 于1958年以来,匿名函数一直是编程语言的一个特性,越来越多的现代编程语言支持匿名函数。
C ++中的Lambda表达式
在C ++ 11及更高版本中,lambda表达式(通常称为lambda)是一种方便的方法,可以在调用它或作为参数传递给函数的位置定义匿名函数对象(闭包)。通常,lambdas用于封装传递给算法或异步方法的几行代码。
#include <algorithm>
#include <cmath>
void abssort(float* x, unsigned n) {
std::sort(x, x + n,
// Lambda expression begins
[](float a, float b) {
return (std::abs(a) < std::abs(b));
} // end of lambda expression
);
}
Python中的Lambda表达式
可以使用lambda关键字创建小的匿名函数。此函数返回其两个参数的总和:lambda a,b:a + b。Lambda函数可以在需要函数对象的任何地方使用。它们在语法上限于单个表达式。从语义上讲,它们只是正常函数定义的语法糖。与嵌套函数定义一样,lambda函数可以引用包含范围的变量。
def make_incrementor(n):
return lambda x:x + n
f = make_incrementor(50)
f(1)
JavaScript ES6中的箭头功能(相当于匿名函数)
箭头函数是声明匿名函数的现代方法。与函数表达式相比,ES6胖箭头函数具有更短的语法,并且词法绑定this值。箭头功能始终是匿名的,并且有效地将 function (arguments) { expression } 变成 arguments => expression。如果在箭头后面使用表达式,则返回是隐式的,因此return不需要。
let square = x => x * x;
console.log(square(10));
let add = (a, b) => a + b;
console.log(add(3, 4));
二、Java 8里面有关 lamdba 的4个新特性:
- Lambda 表达式:Lambda允许把函数作为一个方法的参数(函数作为参数传递进方法中)。
- 方法引用:方法引用提供了非常有用的语法,可以直接引用已有Java类或对象(实例)的方法或构造器。与lambda联合使用,方法引用可以使语言的构造更紧凑简洁,减少冗余代码。
- Stream API:新添加的Stream API(java.util.stream) 把真正的函数式编程风格引入到Java中。
- Optional 类:Optional 类已经成为 Java 8 类库的一部分,用来解决空指针异常。
三、Lamdba表达式详解
(一)Lamdba的作用域
局部变量:
-
1.使用局部变量的时候,自动添加(final)
-
2.它不会从超类(supertype)中继承任何变量名,也不会引入一个新的作用域。lambda 表达式基于词法作用域,也就是说 lambda 表达式函数体里面的变量和它外部环境的变量具有相同的语义(也包括 lambda 表达式的形式参数)。
-
3.lambda 表达式对 值 封闭,对 变量 开放.
(二)Lambda表达式 vs 匿名类:
- 一个关键的不同点就是关键字 this。匿名类的 this 关键字指向匿名类,而lambda表达式的 this 关键字指向包围lambda表达式的类。
- 另一个不同点是二者的编译方式。Java编译器将lambda表达式编译成类的私有方法。使用了Java 7的 invokedynamic 字节码指令来动态绑定这个方法。
(三)Lamdba的使用详解
(四)方法引用的种类:
例如:System.out::println
就是方法引用。
- 静态方法引用:ClassName::methodName
对于静态方法引用,我们需要在类名和方法名之间加入::
分隔符,例如 Integer::sum
- 实例上的实例方法引用:instanceReference::methodName
我们则需要在对象名和方法名之间加入::
分隔符:
Set<String> set = new HashSet<>();
set.add("a");
set.add("b");
Predicate<String> result = set::contains;
- 超类上的实例方法引用:super::methodName
- 类型上的实例方法引用:ClassName::methodName
- 构造方法引用:Class::new
- 数组构造方法引用:TypeName[]::new
数组的构造方法引用的语法则比较特殊,为了便于理解,你可以假想存在一个接收 int 参数的数组构造方法。参考下面的代码:
IntFunction<int[]> arrayMaker = int[]::new;
int[] array = arrayMaker.apply(10) // 创建数组 int[10]
(五)函数式接口:
【注:】可以参考文章:https://segmentfault.com/a/1190000019953045
接口 | 函数 | 介绍 |
---|---|---|
BiConsumer<T, U> | void accept(T t, U u) | |
BiFunction<T, U, R> | R apply(T t, U u) | 传入两个对象T、U,根据传入的T、U返回R |
BinaryOperator<T> | BinaryOperator<T> minBy(Comparator<? super T> comparator) | 比较最小值 |
BinaryOperator<T> | BinaryOperator<T> maxBy(Comparator<? super T> comparator) | 比较最大值 |
BiPredicate<T, U> | boolean test(T t, U u) | 传入两个个对象,返回一个boolean |
Consumer<T> | void accept(T t) | 传入一个对象,没有返回值 |
Function<T,R> | R apply(T t) | 传入两个对象,根据T返回R |
Predicate<T> | boolean test(T t) | 传入一个对象,返回一个boolean |
Supplier<T> | T get() | 返回一个对象 |
UnaryOperator<T> | UnaryOperator<T> identity() | 返回接口本身 |
(六)集合遍历:
(七)异常处理和封装:
四、Stream详解
(一)Stream很多函数都用到了函数式接口
。下面简单讲一下Stream的用法。
【注】可以参考文章:https://blog.csdn.net/hxhaaj/article/details/80725857
(二)Stream 源码解析
【注:】原文:https://www.jianshu.com/p/102b3df5a4c1
Stream可以将所有的操作抽象为数据源(Source)、数据操作(Operator)、数据的产出(Sink)。
例如:
Stream<Person> stream2 = personList.stream();
stream2.filter(p -> p.getGender() == Person.Sex.FEMALE).forEach(p -> System.out.println(p.toString()));
personList是数据源。
System.out.println(p.toString()是数据的产出。
数据操作是filter以及forEach。
数据源:
Iterator是迭代器,提供访问一系列数据的接口,通过hasNext和next方法来顺序的遍历相关的数据。
splitrerator从名字可以看出,多了一个可以“split”,也就是提供可查分的能力,主要目的是提高数据遍历的并行处理能力。
我们看一个JDK自带的例子来认识下Spliterator类。
static <T> void parEach(TaggedArray<T> a, Consumer<T> action) {
Spliterator<T> s = a.spliterator();
long targetBatchSize = s.estimateSize() / (ForkJoinPool.getCommonPoolParallelism() 8);
new ParEach(null, s, action, targetBatchSize).invoke();
}
static class ParEach<T> extends CountedCompleter<Void> {
final Spliterator<T> spliterator;
final Consumer<T> action;
final long targetBatchSize;
ParEach(ParEach<T> parent, Spliterator<T> spliterator,
Consumer<T> action, long targetBatchSize) {
super(parent);
this.spliterator = spliterator; this.action = action;
this.targetBatchSize = targetBatchSize;
}
@Override
public void compute() {
Spliterator<T> sub;
while (spliterator.estimateSize() > targetBatchSize &&
(sub = spliterator.trySplit()) != null) {
addToPendingCount(1);
new ParEach<>(this, sub, action, targetBatchSize).fork();
}
spliterator.forEachRemaining(action);
propagateCompletion();
}
}
class TaggedArray<T> {
private final Object[] elements; // immutable after construction
TaggedArray(T[] data, Object[] tags) {
int size = data.length;
if (tags.length != size) throw new IllegalArgumentException();
this.elements = new Object[2 size];
for (int i = 0, j = 0; i < size; ++i) {
elements[j++] = data[i];
elements[j++] = tags[i];
}
}
public Spliterator<T> spliterator() {
return new TaggedArraySpliterator<>(elements, 0, elements.length);
}
static class TaggedArraySpliterator<T> implements Spliterator<T> {
private final Object[] array;
private int origin; // current index, advanced on split or traversal
private final int fence; // one past the greatest index
TaggedArraySpliterator(Object[] array, int origin, int fence) {
this.array = array; this.origin = origin; this.fence = fence;
}
@Override
public void forEachRemaining(Consumer<? super T> action) {
for (; origin < fence; origin += 2)
action.accept((T) array[origin]);
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if (origin < fence) {
action.accept((T) array[origin]);
origin += 2;
return true;
}
else // cannot advance
return false;
}
@Override
public Spliterator<T> trySplit() {
int lo = origin; // divide range in half
int mid = ((lo + fence) >>> 1) & ~1; // force midpoint to be even
if (lo < mid) { // split out left half
origin = mid; // reset this Spliterator's origin
return new TaggedArraySpliterator<>(array, lo, mid);
}
else { // too small to split
return null;
}
}
@Override
public long estimateSize() {
return (long)((fence - origin) / 2);
}
@Override
public int characteristics() {
return ORDERED | SIZED | IMMUTABLE | SUBSIZED;
}
}
}
可以通过trySplit方法来分离出一个更小数量级的迭代器,同时提供了forEachRemaining方法来遍历属于的数据部分。
Spliterator是整个流框架设计的重要部分,其充当了数据源的角色。可以很方便的通过StreamSupport.stream(spliterator(), false)将一个Spliterator对象转换为Stream对象。
数据的操作:
Stream流可以进行Operator(map、filter、limit)等操作,整个operator是懒加载的模式,其本身的调用并不会触发相关的实际操作。
filter操作是用来对已有的数据流进行过滤的操作,比如过滤出整数流中的偶数部分等,
例如:ReferencePipeline类的filter方法的具体实现:
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
fiter方法仅仅只是返回了一个另一个Stream对象,并没有对被调用的流进行过滤操作,
那这个filter什么时候会真实的执行呢,
从上面的例子中也可以看到,只有在onWrapSink调用生成的Sink对象执行accept时才真正的运行。
数据的产出:
只有流调用相关的终端操作的时候,比如foreach、take、collect等相关操作时才会触发真正的执行操作。
这些方法最终都是通过Sink接口的来完成操作,我们以collect这个操作来说明下整个的执行过程,其他的执行过程也大致类似。
例如:ReferencePipeline类的collect方法的具体实现:
@Override
@SuppressWarnings("unchecked")
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
A container;
if (isParallel()
&& (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
&& (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
container = collector.supplier().get();
BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
forEach(u -> accumulator.accept(container, u));
}
else {
container = evaluate(ReduceOps.makeRef(collector));
}
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
? (R) container
: collector.finisher().apply(container);
}
我们先不考虑并行化处理的过程,先来看下13行的实现,
可以看到此处将collector分装为了一个ReduceOps对象,
ReduceOps对象其实也是提供了一个ReduceSink的对象来完成整个数据的输出处理。
我们打开ReduceOps类,我们看看makeRef方法的具体实现:
public static <T, I> TerminalOp<T, I>
makeRef(Collector<? super T, I, ?> collector) {
Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
BiConsumer<I, ? super T> accumulator = collector.accumulator();
BinaryOperator<I> combiner = collector.combiner();
class ReducingSink extends Box<I>
implements AccumulatingSink<T, I, ReducingSink> {
@Override
public void begin(long size) {
state = supplier.get();
}
@Override
public void accept(T t) {
accumulator.accept(state, t);
}
@Override
public void combine(ReducingSink other) {
state = combiner.apply(state, other.state);
}
}
return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
@Override
public ReducingSink makeSink() {
return new ReducingSink();
}
@Override
public int getOpFlags() {
return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
? StreamOpFlag.NOT_ORDERED
: 0;
}
};
}
上面的例子其实比较简单了,通过accept方法来接收流中的每一个元素,
将其利用accumulator对象和之前已经生成好的state进行聚合操作,
combine方法主要是考虑并行的情况下将两个任务最中的state合并为一个,
字符串链接例子中就是将两个子任务已经链接好的字符串再链接成一个字符串。
并行
Stream框架可以很方便的将任务改为并行化的操作,
其底层是通过ForkJoinPool来完成相关的并行化处理操作,
我们接着上面的字符串链接的例子讲,来看一下并行话处理的整个过程。
我们打开ReduceOps类,我们看看它的内部类ReduceOp的具体实现:
private static abstract class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>
implements TerminalOp<T, R> {
private final StreamShape inputShape;
/**
* Create a {@code ReduceOp} of the specified stream shape which uses
* the specified {@code Supplier} to create accumulating sinks.
*
* @param shape The shape of the stream pipeline
*/
ReduceOp(StreamShape shape) {
inputShape = shape;
}
public abstract S makeSink();
@Override
public StreamShape inputShape() {
return inputShape;
}
@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}
@Override
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
return new ReduceTask<>(this, helper, spliterator).invoke().get();
}
}
ReduceTask是并行化遍历最终会调用的方法,
ReduceTask是一个继承了ForkJoinPool框架中的AbstractTask实现类( 其是ForkJoinTask的子类),
其用于支持任务的拆分和合并。
通过invoke方法将任务提供到默认的ForkJoinPool的common pool中,
下面是最终放入到common pool中的实现过程。感兴趣的同学可以自行阅读相关源码。
ForkJoinPool是整个并行化框架的基础,其调度方式和线程池有极大的区别,此处不再一一赘述。
以下是ForkJoinTask类的externalAwaitDone方法:
private int externalAwaitDone() {
int s = ((this instanceof CountedCompleter) ? // try helping
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter<?>)this, 0) :
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
if (s >= 0 && (s = status) >= 0) {
boolean interrupted = false;
do {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0) {
try {
wait(0L);
} catch (InterruptedException ie) {
interrupted = true;
}
}
else
notifyAll();
}
}
} while ((s = status) >= 0);
if (interrupted)
Thread.currentThread().interrupt();
}
return s;
}
参考文章: