Java8-Lambda
引子
首先Lambda配合Stream拥有很强大的数据处理能力,并且能够以更加清晰的表达方式描述数据,大大减少了代码的冗余。在平常开发中,能大大提高开发效率,学习它的目的也正因为如此,此文介绍了一些Lambda相关的知识以及一些注意事项,避免滥用反而起到反作用。
Lambda基本介绍
Lambda:可以理解为一种匿名函数:它没有名称,但它有参数列表、函数主体、返回类型,可能还有一个可以抛出的异常列表。
Lambda示例
// 在以前,我们使用匿名类是这样:
Thread t = new Thread(new Runnable() {
public void run(){
System.out.println("Hello world");
}
});
// 现在用Lambda表达式的话,看起来是这样:
Thread t = new Thread(() -> System.out.println("Hello world"));
从上面的例子中可以看出,采用匿名内部类和采用Lambda的写法,Lambda的写法明显更加精简和清晰了
Lambda最基本的构成
() -> System.out.println("Hello world");
image
- 参数列表: 空参则括号里面什么都不写
- ->: 把参数列表与Lambda主体分隔开
- Lambda主体: 代码具体逻辑
带入参,并且有返回值
// 第一种写法,如果Lambda主体部分不带花括号,可以不用写return,返回的具体类型编译器会自动推断
(String s1, String s2) -> s1.concat(s2);
// 第二种写法,如果Lambda主体部分加了花括号,要带返回值必须加上return,否者就是Void类型的匿名函数
(String s1, String s2) -> {
return s1.concat(s2);
};
默认方法
如果要在接口中添加新方法,则必须在实现该接口的类中提供其实现代码。为了解决这个问题,Java 8引入了默认方法的概念,它允许接口具有默认方法,而不会影响其实现类。默认方法不是抽象方法,子类实现了该接口会继承该默认实现,子类也可以覆盖该默认实现。
对于学习函数式接口关系不大,可以当做是一个新特性,如果不打算了解可以直接跳过。
子类可以继承接口的默认方法
// 定义接口1
interface MyInterface1 {
default void defaultMethod() {
System.out.println("defaultMethod1");
}
}
// 定义接口2,接口2继承了接口1,也默认继承了接口1的默认方法
interface MyInterface2 extends MyInterface1 {
}
static class A implements MyInterface2 {
}
static class B implements MyInterface1 {
}
public static void main(String[] args) {
A a = new A();
a.defaultMethod();
B b = new B();
b.defaultMethod();
}
// 输出
// defaultMethod1
// defaultMethod1
子类可以覆盖接口的默认方法
// 定义接口1
interface MyInterface1 {
default void defaultMethod() {
System.out.println("defaultMethod1");
}
}
static class A implements MyInterface1 {
public void defaultMethod() {
System.out.println("defaultMethod1 from MyInterface1");
}
}
public static void main(String[] args) {
A a = new A();
a.defaultMethod();
}
// 输出
// defaultMethod1 from MyInterface1
子类实现了两个拥有相同默认方法,可以通过:接口名称.super.方法名()调用
// 定义接口1
interface MyInterface1 {
default void defaultMethod() {
System.out.println("defaultMethod1");
}
}
interface MyInterface2 {
default void defaultMethod() {
System.out.println("defaultMethod2");
}
}
static class A implements MyInterface1,MyInterface2 {
// 此时必须实现该方法,否则通过不了编译
@Override
public void defaultMethod() {
// 如果要调用MyInterface2的默认方法,可以使用MyInterface2.super.defaultMethod();
MyInterface2.super.defaultMethod();
}
}
public static void main(String[] args) {
A a = new A();
a.defaultMethod();
}
// 输出
// defaultMethod2
子类对接口默认方法调用规则
- 类中的方法优先级最高。类或父类中声明的方法的优先级高于任何声明为默认方法的优先级。
- 如果无法依据第一条进行判断,那么子接口的优先级更高:函数签名相同时,优先选择拥有最具体实现的默认方法的接口。
- 最后,如果还是无法判断,继承了多个接口的类必须通过显式覆盖和调用期望的方法,显式地选择使用哪一个默认方法的实现。
上面第1条规则以及第3条规则都已经展示过,下面展示第二条规则,该类图继承关系如下:
按照规则2,MyInterface2比MyInterface1更加具体,所以A会调用MyInterface2的defalutMethod
interface MyInterface1 {
default void defaultMethod() {
System.out.println("defaultMethod1");
}
}
interface MyInterface2 extends MyInterface1{
default void defaultMethod() {
System.out.println("defaultMethod2");
}
}
static class A implements MyInterface1,MyInterface2 {
}
public static void main(String[] args) {
A a = new A();
a.defaultMethod();
}
// 输出
// defaultMethod2
接口静态方法
与接口的默认方法类似,需要加上关键字static,静态方法需要,并且由于定义是完整的并且方法是静态的,因此在实现类中不能覆盖或更改这些方法。
interface MyInterface1 {
default void defaultMethod() {
System.out.println("defaultMethod1");
}
static void staticMethod() {
System.out.println("staticMethod1");
}
}
public static void main(String[] args) {
MyInterface1.staticMethod();
}
// 输出
// staticMethod1
过于简单,就不上更多的例子了,下面直接说明与默认方法的区别就差不多了解了。
与默认方法的相同点:
- 静态方法与默认方法必须要有默认的实现;
- 静态方法与默认方法都不是抽象方法;
与默认方法的不同点:
- 子类实现了该接口,子类不会继承静态接口方法,并且也不能覆盖静态接口方法,但是子类可以定义与父接口一样的静态方法,如果要调用接口的静态方法只能以接口名称.方法名()来调用;接口的默认方法子类是可以继承默认方法并且也可以覆盖的
函数式接口
image说起Lambda,就必须了解函数式接口,因为要使用Lambda,必须在函数式接口上使用。
函数式接口:就是一个有且仅有一个抽象方法,但是可以有多个默认方法的接口,这样的接口可以隐式转换为Lambda表达式。一般在函数式接口上都有个注解@FunctionalInterface,该注解的作用类似@Override一样告诉编译器这是一个函数式接口,用于编译期间检测该接口是否仅有一个抽象方法,如果拥有多个则编译不通过。如下图所示
在函数式接口上使用lambda表达式
函数式接口可以被隐式转换为 lambda 表达式。
如下例子
Thread t = new Thread(() -> System.out.println("Hello world"));
我们可以看看Thread的构造:
public Thread(Runnable target) {
init(null, target, "Thread-" + nextThreadNum(), 0);
}
其中入参为Runnable类型的接口,继续查看Runnable接口
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
可以看出在jdk1.8中,Runnable就是一个函数式接口
Runnable r1 = () -> System.out.println("Hello world")
// 等价于
Runnable r2 = new Runnable() {
public void run(){
System.out.println("Hello world");
}
}
run()方法签名:参数列表为空,返回为void;lambda签名:() -> void 参数列表为空,返回为void可以看出Runnable的run方法签名与lambda的签名匹配,我们将这种对方法抽象描述叫作函数描述符
在java8中,提供了很多函数式接口,可以用于描述各种Lambda表达式的签名
函数式接口 | 函数描述符 |
---|---|
Predicate<T> | T->boolean |
Consumer<T> | T->void |
Function<T,R> | T->R |
Supplier<T> | ()->T |
UnaryOperator<T> | T->T |
BiPredicate<L,R> | (L,R)->boolean |
BiConsumer<T,U> | (T,U)->void |
BiFunction<T,U,R> | (T,U)->R |
这些都是较为常用的函数式接口,还有很多都在
java.util.function
包下,有兴趣可以自行查看。
Stream
一个新的抽象,称为流,可以以声明的方式处理数据。提供了一系列的api,使用类似sql语句直观的方式来提供对集合处理的高阶抽象。
另外还有两个特点:
- 流水线:很多流操作本身会返回一个流,这样多个操作就可以链接起来,形成一个大的流水线。这样做可以对操作进行优化, 比如延迟执行和短路。流水线的操作可以看作对数据源进行数据库式查询。
- 内部迭代:以前对集合遍历都是通过Iterator或者For-Each的方式, 显式的在集合外部进行迭代, 这叫做外部迭代。 Stream提供了内部迭代的方式。
示例
// 创建一个1至10的集合
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
numbers
.stream() //将集合转化成串行流
.filter(i -> i % 2 == 0) //过滤掉奇数
.limit(3) //取出前三个元素
.map(Double::valueOf) // 将int类型转换成double类型
.forEach(System.out::println); // 迭代并打印出元素
// 最终输出结果:2.0 4.0 6.0
如果采用传统的For-Each迭代方式来处理集合,可以想象代码可不是这短短这几行了。
整个流水线的操作包含两个
- 中间操作:形成一条操作流水线。例如:filter()对流操作并返回一个流,limit()对流操作也会返回流,这样可以通过多个中间操作连接起来合成一个查询。注意:整个流水线中,除非触发了一个终端操作,否者中间操作不会执行任何处理。因为多个中间操作可以合并起来,在终端操作时一次性全部处理。
- 终端操作:执行流水线,生成处理结果。
方法引用
上面例子中有一行代码为:map(Double::valueOf)
,::这个写法是什么意思呢?
实际上Double::valueOf就是一个方法引用。map(Double::valueOf)
等价于map(element -> Double.value(element))
类名放在分隔符::前,方法的名称放在后面。
例如,Double::valueOf就是引用了Double类中定义的方法valueOf,并且不需要加括号;
方法引用就是Lambda表达式的快捷写法,例如:
-
(Integer i) -> Double.valueOf(i)
---Double::valueOf
-
(String s) -> System.out.println(s)
---System.out::println
-
(str, i) -> str.substring(i)
---String::substring
方法引用的种类
- 静态方法引用:ClassName::methodName
- 实例上的实例方法引用:instanceReference::methodName
- 超类上的实例方法引用:super::methodName
- 类型上的实例方法引用:ClassName::methodName
- 构造方法引用:Class::new
- 数组构造方法引用:TypeName[]::new
生成流
- Collection的stream()方法或者parallelStream() ,例如Arrays.asList(1,2,3).stream()。
- Arrays.stream(Object[]) 例如Arrays.stream(new int[]{1,2,3})。
- 使用流的静态方法,比如Stream.of(Object[]), IntStream.range(int, int) 或者 Stream.iterate(Object, UnaryOperator) ,如Stream.iterate(0, n -> n * 2) , 或者generate(Supplier<T> s) 如Stream.generate(Math::random)。
- BufferedReader.lines() 从文件中获得行的流。
- Files类的操作路径的方法,如list、find、walk等。
- 随机数流Random.ints()。
- 其它一些类提供了创建流的方法,如BitSet.stream(), Pattern.splitAsStream(java.lang.CharSequence), 和 JarFile.stream()。
- 更底层的使用StreamSupport,它提供了将Spliterator转换成流的方法。
// 列举一些常用创建流的例子
List<Integer> list = Arrays.asList(1, 2, 3);
// Collection的stream方法
Stream<Integer> stream = list.stream();
// Stream的of方法
Stream<List<Integer>> stream2 = Stream.of(list);
// BufferedReader的lines方法
BufferedReader bufferedReader = new BufferedReader(new FileReader("filePath"));
Stream<String> lines = bufferedReader.lines();
中间操作
filter
Stream<T> filter(Predicate<? super T> predicate)
返回此流中匹配元素组成的流
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
numbers
.stream()
.filter(i -> i % 2 == 0) //过滤掉奇数
.forEach(System.out::println); // 终端操作,打印结果
// 输出:2 4 6 8 10
map
<R> Stream<R> map(Function<? super T, ? extends R> mapper)
返回一个流,该流的元素映射成另外的值,新的值类型可以与原来的类型不同
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
numbers
.stream()
.map(i -> i + "str ") // 转换成String
.forEach(System.out::print); // 终端操作,打印结果
// 输出:1str 2str 3str 4str 5str 6str 7str 8str 9str 10str
mapToInt
IntStream mapToInt(ToIntFunction<? super T> mapper)
返回一个IntStream,该流的元素映射成int类型的流 IntStream:原始流
List<String> strings = Arrays.asList("1","2","3");
strings
.stream()
.mapToInt(Integer::parseInt) // 转换成int
.forEach(System.out::println); // 终端操作,打印结果
// 输出:int类型的 1 2 3
mapToLong,mapToDouble与mapToInt类似只不过原始类型不同而已,下面会单独讲解这三个原始流的作用及区别。
flatMap
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper)
返回一个Stream,和map类似,不同的是会将每个元素的扁平化。
flatMap的理解可能稍微有点难,通过下面两个例子来展示。
例子1:引用java8实战中的例子 将集合中的两个字符串根据字母去重复
List<String> strings = Arrays.asList("Hello", "World");
List<String> list = strings
.stream() // 将集合转成流
.map(s -> s.split("")) // 转换成['H','e','l','l','o'],['W','o','r','l','d'] 两个数组
.flatMap(Arrays::stream) // 将两个数组扁平化成为['H','e','l','l','o','W','o','r','l','d'],实际上还是把两个数组再次转成流
.distinct() // 去除重复元素
.collect(Collectors.toList()); // 终端操作,转化成集合
System.out.println(list);
// 输出: [H, e, l, o, W, r, d]
引入java8实战的流程图如下:
image例子2:
List<Integer> numbers1 = Arrays.asList(1, 2, 3);
List<Integer> numbers2 = Arrays.asList(4, 5, 6);
Stream.of(numbers1, numbers2) // 将两个集合转成流
.flatMap(numbers -> numbers.stream()) // 两个集合流扁平化为[1,2,3,4,5,6]
.forEach(System.out::println);
// 输出: 1,2,3,4,5,6
image
distinct
Stream<T> distinct()
过滤流中重复的元素
Arrays.asList(1, 2, 3, 2, 3, 4)
.stream()
.distinct() // 去除重复
.forEach(System.out::println);
// 输出: 1234
sorted
Stream<T> sorted()
对流中的元素顺序排序
Arrays.asList(1, 3, 5, 2, 4)
.stream()
.sorted() // 顺序排序
.forEach(System.out::println);
// 输出: 12345
上面的例子中只支持顺序排序,如果要倒序呢?Stream中sorted还提供了一个重载方法:
Stream<T> sorted(Comparator<? super T> comparator);
可以通过传入Comparator来实现自己的排序规则
Arrays.asList(1, 3, 5, 2, 4)
.stream()
.sorted(Comparator.reverseOrder()) // 倒序排序
.forEach(System.out::println);
// 输出: 54321
limit
Stream<T> limit(long maxSize)
截取流,返回一个不超过给定长度的流
Arrays.asList(1, 2, 3, 4, 5)
.stream()
.limit(3) // 截取前三个元素
.forEach(System.out::println);
// 输出: 123
skip
Stream<T> skip(long n)
跳过给定长度的流
Arrays.asList(1, 2, 3, 4, 5)
.stream()
.skip(2) // 跳过前两个元素
.forEach(System.out::println);
// 输出: 345
parallel
S parallel()
将流转成并行流
Arrays.asList(1, 2, 3, 4, 5)
.stream()
.parallel() // 转成并行流
.forEach(System.out::println);
// 由于是并行的,每次输出结果都会不一致
// 输出: 1 5 2 4 3
parallel线程安全需要注意的点
直接上例子
// 反例1
for (int i = 0; i < 5; i++) {
List<Integer> list = new ArrayList<>();
IntStream.rangeClosed(1, 1000).parallel().forEach(element->{
list.add(element);
});
System.out.println(list.size());
}
// 输出:981 990 962 ...... 多次运行会发现每次结果都不一样,并且有时还会报ArrayIndexOutOfBoundsException数组越界
// 这边体现了在多线程中操作共享变量引发的问题,例如list容器当前容量为50,两个线程同时进入方法体,此时线程A持有的list里面有49个元素,线程B持有的list里面也是49个元素,然后线程A执行list.add()完成,此时容器内的元素的数量有50,由于线程之间不可见,线程B也进入到了add方法并且过了list容器扩容的检查,然后添加元素时发生ArrayIndexOutOfBoundsException
// 如果要能安全的新增,那么可以使用线程安全的容器
List<Integer> list = Collections.synchronizedList(new ArrayList<>());
List<Integer> list = new CopyOnWriteArrayList<>();
// 反例2
List<Integer> list = new ArrayList<>(1000);
long count = IntStream.rangeClosed(1, 1000).parallel().map(element -> {
list.add(element);
return element;
}).count();
long count = IntStream.rangeClosed(1, 1000).parallel().peek(element -> {
list.add(element);
}).count();
// 使用并行流时,不要去操作共享变量,以上例子皆为反例
parallel性能上需要注意的点
对并行流的效率进行测试,每台机器上的结果可能不一致,请自行注意。下面例子全部采用遍历五次,取其中最快的一次。
// 串行与并行流效率测试 基于i7 8核cpu
// 对100_000_000求和
// for求和性能测试
static void testFor(long size) {
List<Long> timeList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
long sum = 0;
long start = System.currentTimeMillis();
for (long j = 0L; j <= size; j++) {
sum += j;
}
long end = System.currentTimeMillis();
timeList.add((end - start));
}
System.out.println("For 处理时间:" + (timeList.stream().mapToLong(Long::longValue)).min().getAsLong() + "ms");
}
// 并行流求和性能测试
static void testParallel(long size) {
List<Long> timeList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
long start = System.currentTimeMillis();
Stream.iterate(0L, (element -> element + 1L)).limit(size).parallel().reduce(0L,Long::sum);
long end = System.currentTimeMillis();
timeList.add((end - start));
}
System.out.println("ParallelStream 处理时间:" + (timeList.stream().mapToLong(Long::longValue)).min().getAsLong() + "ms");
}
public static void main(String[] args) {
// 初始值
long size = 10_000_000L;
testFor(size);
testParallelStream(size);
}
// 输出为:
// For 处理时间:5ms
// ParallelStream 处理时间:254ms
为什么并行的会比传统For要慢,是因为Stream.iterate生成的是装箱对象,在求和过程中,装箱对象需要拆箱,计算完还会在装箱,数据量越大,那么采用装箱对象计算则会越慢。可以稍微更改一行代码:
Stream.iterate(0L, (element -> element + 1)).limit(size).parallel().reduce(0L,Long::sum);
更改为
Stream.iterate(0L, (element -> element + 1)).mapToLong(Long::longValue).limit(size).parallel().reduce(0L,Long::sum);
,这边生成流的时候先转成原始流,然后在去做计算
// 并行流求和性能测试
static void testParallelStream(long size) {
List<Long> timeList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
long start = System.currentTimeMillis();
Stream.iterate(0L, (element -> element + 1L)).mapToLong(Long::longValue).limit(size).parallel().reduce(0L,Long::sum);
long end = System.currentTimeMillis();
timeList.add((end - start));
}
System.out.println("ParallelStream 处理时间:" + (timeList.stream().mapToLong(Long::longValue)).min().getAsLong() + "ms");
}
public static void main(String[] args) {
// 初始值
long size = 10_000_000L;
testFor(size);
testParallelStream(size);
}
// 输出为:
// For 处理时间:5ms
// ParallelStream 处理时间:143ms
// 可以看出提升了接近一倍的性能,在数据量更大的情况下,会更高。
// 在java8里,还提供了3个生成原始流的对象:LongStream,DoubleStream,IntStream,下面直接测试采用原始流来做测试
static void testParallelLongStream(long size) {
List<Long> timeList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
long start = System.currentTimeMillis();
LongStream.rangeClosed(0, size).parallel().sum();
long end = System.currentTimeMillis();
timeList.add((end - start));
}
System.out.println("ParallelLongStream 处理时间:" + (timeList.stream().mapToLong(Long::longValue)).min().getAsLong() + "ms");
}
public static void main(String[] args) {
// 初始值
long size = 10_000_000L;
testFor(size);
testParallelStream(size);
testParallelLongStream(size)
}
// 输出为:
// For 处理时间:5ms
// ParallelStream 处理时间:148ms
// ParallelLongStream 处理时间:1ms
虽然将序列流转成并行流很容易,但是不恰当的使用反倒会成为负优化。在数据量不大的情况下,并行不一定比顺序的要快,反倒要慢上很多,因为数据量小的情况下,在线程的上下文切换之间的开销已经大于数据处理的开销了。以及在做数值计算的情况下,要留意是否是装箱对象,自动装箱拆箱在数据量大起来会成为性能上的累赘。
下面再看一个例子
static void testStructure(Collection<Long> c) {
List<Long> timeList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
long start = System.currentTimeMillis();
c.parallelStream().reduce(0L, Long::sum);
long end = System.currentTimeMillis();
timeList.add((end - start));
}
// 取五次中最快的一次
System.out.println("处理时间:" + (timeList.stream().mapToLong(Long::longValue)).min().getAsLong() + "ms");
}
public static void main(String[] args) {
// 使用ArrayList容器
ArrayList<Long> arrayList = Stream.iterate(1L, a -> a + 1L).limit(10_000_000L).collect(toCollection(ArrayList::new));
// 使用LinkedList容器
LinkedList<Long> linkedList = Stream.iterate(1L, a -> a + 1L).limit(10_000_000L).collect(toCollection(LinkedList::new));
testStructure(linkedList);
testStructure(arrayList);
}
// 输出
// 处理时间:420ms
// 处理时间:36ms
在选用数据结构上,可以看出ArrayList在并行中效率要高于LinkedList,这是因为ArrayList的拆分效率比LinkedList高得多,前者用不着遍历就可以平均拆分,而后者则必须遍历。
按照可分解性总结了一些流数据源适不适于并行
数据源 | 可分解性 |
---|---|
ArrayList | 极佳 |
IntStream.range | 极佳 |
HashSet | 好 |
TreeSet | 好 |
LinkedList | 差 |
Stream.iterate | 差 |
parallel操作上需要注意的点
image并行流底层使用的是java7引入的Fork/Join(并发框架),它可以以并行的方式将任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体的结果。此文不多描述,有兴趣者自行查阅。需要注意的是,使用并行流时,内部使用了默认的 ForkJoinPool,池的大小为默认的cup核数-1(java8实战说的是默认核数,如果看过此书的请自行测试),
Runtime.getRuntime().availableProcessors()
来查看cpu的核心数量。
在使用并行流时请注意,如果为IO密集型的并行,如果在多处使用,极有可能会影响所有的并行流,因为使用的是系统全局的ForkJoinPool,当池子里的线程被占用了,那么别处要使用线程只能等待它被释放。
// 模拟8个任务,独占线程并且不释放
Runnable runnable = () -> IntStream.rangeClosed(1, 8).parallel().forEach(c -> {
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 启用任务
new Thread(runnable).start();
System.out.println("任务开始");
// 等待一会,让池子里的线程充分被占用
Thread.sleep(1000);
IntStream.rangeClosed(0, 1000).parallel().forEach(c -> {
try {
// 打印当前前程,查看是否使用了ForkJoinPool中的线程
System.out.println(Thread.currentThread().getName());
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
image
可以看出,当池子里的线程被占用完,别的地方使用了并行流,完全变成了单线程执行。如果要避免这种情况,可以设置JVM启动参数
-Djava.util.concurrent.ForkJoinPool.common.parallelism=16
来设置ForkJoinPool的大小,也可以使用代码System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "16")
来设置全局的参数,以上两种方法及其不推荐,因为它将影响所有的并行流,推荐使用自定义ForkJoinPool的方式,如下所示
Runnable runnable = () -> IntStream.rangeClosed(1, 8).parallel().forEach(c -> {
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 启用任务
new Thread(runnable).start();
System.out.println("任务开始");
// 设置一个容量为10的ForkJoinPool
ForkJoinPool forkJoinPool = new ForkJoinPool(10);
// 执行任务
ForkJoinTask<?> submit = forkJoinPool.submit(() -> {
IntStream.rangeClosed(0, 20).parallel().forEach(c -> {
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
});
while (!submit.isDone()) {
Thread.sleep(500);
}
image
sequential
S sequential()
将流转成序列流
Arrays.asList(1, 2, 3, 4, 5)
.parallelStream()
.sequential() // 转成序列流
.forEach(System.out::println);
// 输出: 1 2 3 4 5
终端操作
allMatch,anyMatch,noneMatch
boolean anyMatch(Predicate<? super T> predicate)
anyMatch:此流的任意元素有一个匹配返回ture,都不匹配返回false
boolean allMatch(Predicate<? super T> predicate)
allMatch:此流的所有元素是都匹配返回ture,否者为false
boolean noneMatch(Predicate<? super T> predicate)
noneMatch:此流中没有一个元素匹配返回ture,否者返回false
// 全部匹配
System.out.println(Stream.of(5, 6, 7, 8, 9).allMatch(i -> i >= 5)); // true
System.out.println(Stream.of(5, 6, 7, 8, 9).allMatch(i -> i > 5)); // false
// 任意一个匹配
System.out.println(Stream.of(5, 6, 7, 8, 9).anyMatch(i -> i > 5)); // true
System.out.println(Stream.of(5, 6, 7, 8, 9).anyMatch(i -> i > 9)); // false
// 都不匹配
System.out.println(Stream.of(5, 6, 7, 8, 9).noneMatch(i -> i > 5)); // false
System.out.println(Stream.of(5, 6, 7, 8, 9).noneMatch(i -> i > 9)); // true
reduce
聚合操作 sum()、max()、min()、count()调用的都是reduce
Optional<T> reduce(BinaryOperator<T> accumulator)
无初始值,按传入的lambda的累加规则来聚合数据
// 无默认值,求和
Optional<Integer> sum1 = Arrays.asList(1, 2, 3, 4, 5)
.stream()
.reduce((a, b) -> a + b);
System.out.println(sum1.get()); // 输出:15
T reduce(T identity, BinaryOperator<T> accumulator)
第一个参数为初始值,第二个参数为累加器(归并数据的lambda)
// 有默认值,求和
Integer sum2 = Arrays.asList(1, 2, 3, 4, 5)
.stream()
.reduce(5, (a, b) -> a + b);
System.out.println(sum2); // 输出:20
// 求最大值
Integer max = Arrays.asList(1, 2, 3, 4, 5)
.stream()
.reduce(0, Integer::max); // 也可以写成 reduce(0, (a, b) -> a > b ? a : b);
System.out.println(max); // 输出:20
<U> U reduce(U identity,BiFunction<U, ? super T, U> accumulator,BinaryOperator<U> combiner)
combiner:合并器,用于合并累加器的值,这个参数只有在并行流下才会生效
reduce操作可以并行进行,为了避免竞争,每个reduce线程都会有独立的result,combiner的作用在于合并每个线程的result得到最终结果。
Integer reduce = Arrays.asList(1, 2, 3, 4, 5)
.parallelStream()
.reduce(0, (a, b) -> a + b, (c, d) -> c + d);
System.out.println(reduce); // 输出:20
reduce在并行流中的注意事项
System.out.println(
Arrays.asList(1, 2, 3)
.parallelStream()
.reduce(0,(a, b) -> (a - b),(c, d) -> c + d)
);
// 如果无意料,那么输出将会是 -6,当运行程序的时候结果却是 -2,这与我们的预期结果大大不符
// 为什么会是-3呢,那么在序列流和并行流结果不一致,将以上代码修改一下,把参数和线程打印出来
System.out.println(
Arrays.asList(1, 2, 3)
.parallelStream()
.reduce(0,
(a, b) -> {
System.out.format("a:%s b:%s Thread:%s \n", a, b, Thread.currentThread().getName());
return a - b;
},
(c, d) -> {
System.out.format("c:%s d:%s Thread:%s \n", c, d, Thread.currentThread().getName());
return c - d;
}
));
image
累加器的输出:0-2,0-3,0-1
image
合并器的输出:-2 - (-3),-1-1
执行流程如下图所示
在并行流中,reduce计算的方式与序列流不同,这归根于fork/join的特殊性,所有任务不断拆分,如果有初始值,那么会在累加阶段会以每个初始值与流中的数据累加,例如初始值为1,执行一个求和的累加,那么如果有N个元素,那么最终结果值为SUM + (N * 1),在相乘,相加,相减等等计算在使用并行流时需要好好考虑由并行带来的影响,当然如果只是聚合计算(sum,avg,max,min)可以放心的使用,如果采用自定义计算规则,那么一定需要谨慎使用,并测试。
findFirst,findAny
Optional<T> findFirst()
返回此流的第一个元素的Optional,如果流为空,则返回空Optional。
Optional<T> findAny()
返回此流的任意一个元素的Optional,如果流为空,则返回空Optional。
findFirst在并行流中的执行代价非常大,需要注意
Optional<Integer> first = Arrays.asList(1, 2, 3, 4, 5)
.stream().findFirst();
System.out.println(first.get()); // 输出 1
Optional<Integer> any = Arrays.asList(1, 2, 3, 4, 5)
.stream().findAny();
System.out.println(any.get()); // 因为是顺序流,所以输出1
collect
<R, A> R collect(Collector<? super T, A, R> collector)
收集,对数据做聚合,将流转换为其他形式,比如List,Map,Integer,Long...
// 准备一些初始数据
@Data
@AllArgsConstructor
class Student {
private String name;
private Integer age;
}
// 初始化数据
Student student1 = new Student("zhangsan", 20);
Student student2 = new Student("lisi", 15);
Student student3 = new Student("wangwu", 10);
Student student4 = new Student("zhaoliu", 20);
List<Student> students = Arrays.asList(student1, student2, student3, student4);
// 如果要取出所有学生的姓名并转成集合可以写成
List<String> names = students.stream()
.map(Student::getName) // 获取name
.collect(Collectors.toList()); // 转成List
System.out.println(names); // 输出:[zhangsan, lisi, wangwu, zhaoliu]
// 以年龄为key,姓名为value转成Map可以写成
Map<Integer, String> map = students.stream()
.collect(Collectors.toMap(Student::getAge, Student::getName)); // 此写法会有问题,如果Map的key重复了,会报java.lang.IllegalStateException: Duplicate key 如果可以确保key不会重复就可以省略第三个参数
Map<Integer, String> map = students.stream()
.collect(Collectors.toMap(Student::getAge, Student::getName, (first, second) -> second)); // 前面两个参数是映射key和value,第三个参数为如果key重复了要如何处理,是保留旧的还是选择新的
System.out.println(map); // 输出:{20=zhaoliu, 10=wangwu, 15=lisi} 因为zhangsan和zhaoliu的年龄都是20,按照我们的策略,始终选择新的,所以key为20的value是zhaoliu
Map<Integer, List<Student>> groupByAge = students.stream()
.collect(Collectors.groupingBy(Student::getAge)); // 根据age分组
System.out.println(groupByAge);
// 输出:{20=[Student(name=zhangsan, age=20), Student(name=zhaoliu, age=20)], 10=[Student(name=wangwu, age=10)], 15=[Student(name=lisi, age=15)]}
<R> R collect(Supplier<R> supplier,BiConsumer<R, ? super T> accumulator,BiConsumer<R, R> combiner)
supplier:定义一个容器
accumulator:该容器怎么添加流中的数据
combiner:容器如何去聚合
// 仿Collectors.toList(),简单实现一个toList()
// 1.定义一个List容器
// 2.调用List的add方法将元素添加到容器中
// 3.采用List的addAll方法聚合容器
List<Integer> toList = Arrays.asList(1, 2, 3, 4).stream().collect(ArrayList::new, List::add, List::addAll);
System.out.println(toList);
// 输出:[1, 2, 3, 4]
// 仿Collectors.toMap(),简单实现toMap()
// 1.定义一个Map容器
// 2.调用Map的merge方法将元素添加到容器中
// 3.采用Map的putAll方法聚合容器
Map<Object, Object> map = students.stream()
.collect(HashMap::new,
(holder, element) -> {
holder.merge(element.getAge(), element.getName(), (u, v) -> {
return u;
// throw new IllegalStateException(String.format("Duplicate key %s", u));
});
}, Map::putAll);
System.out.println(map);
// 输出:{20=zhangsan, 10=wangwu, 15=lisi}
总结
- lambda由参数列表,箭头,主体组成。
- 函数式接口只能拥有一个抽象方法,可以拥有多个默认方法,多个静态方法。
- 方法引用实际就是Lambda的快捷写法。
- 流只能遍历一次。遍历完之后,我们就说这个流已经被消费掉了。你可以从原始数据源那里再获得一个新的流来重新遍历一遍。
- 并行流是采用ForkJoin实现的。
- 在并行流中,不要在peek,map中不要去修改外部数据。
- 并行流使用需要注意,不要靠猜测,请多测试。
- 接口默认方法,优先级最低,子类会继承默认方法并且可以覆盖默认方法。如果因为多继承问题引起冲突(子类实现了两个接口,两个接口都拥有相同的方法名,相同函数描述符),那么必须覆盖该方法,如果期望调用某接口中的默认方法,可以使用X.super.m(…)来显示调用哪个接口的默认方法。
- 接口静态方法,子类不会继承,也不能覆盖,但是可以定义一个名称相同返回值相同的普通或静态方法。