【精】JDK8中CompletableFuture+lambda
如何“快准狠”的使用线程池呢,JDK给出了一个工具类:CompletableFuture来简化开发。
1. 快速入手
1.1 执行单个任务
任务无非两种,一种没有返回值:Runnable,一种有返回值:Callable。而CompletableFuture可以理解为一个简化操作的工具类,其提供的API目的就是为了更加便利的进行流式操作。
- 提交任务的方法
方法 | 参数 | 作用 |
---|---|---|
supplyAsync | Supplier<U>,executor | 带有返回值的任务,自定义的线程池 |
runAsync | Runnable,executor | 没有返回值的任务,自定义的线程池 |
- 等待任务的方法
方法 | 作用 |
---|---|
join | 阻塞等待结果,但不会抛出uncheck异常 |
get | 阻塞等待结果,但会抛出uncheck异常 |
join和get区别:
- get()方法会
throws InterruptedException, ExecutionException
(继承了Exception类); - join()方法包装了异常,将异常封装为
CompletionException
异常(继承了RuntimeException类);
准备代码:
@Slf4j
public class TestBf {
static ThreadPoolExecutor executor =
new ThreadPoolExecutor(10, 10, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(20));
static List<String> sources = Arrays.asList("tom", "liMing", "tony");
private static User createUser(String name) {
if (StringUtils.startsWith(name, "t")) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
throw new RuntimeException("eee");
}
User user = new User();
user.setName(name);
return user;
}
@Data
public static class User {
private String name;
}
}
测试代码:
/**
* 1. 子线执行supplyAsync内部方法,父线程调用join/get方法等待子线程执行结果。
* 2. 当不传递executor参数时,默认使用forkJoinPool线程池
*/
private static void testSupplyAsync() {
User user = CompletableFuture.supplyAsync(() -> {
log.info("执行supplyAsyn方法{}");
return createUser("t");
}, executor).join();
}
/**
* 1. 子线执行runAsync内部方法,父线程调用join/get方法等待子线程执行结果。
* 2. 当不传递executor参数时,默认使用forkJoinPool线程池
*/
private static void testRunAsync() {
CompletableFuture.runAsync(() -> {
log.info("执行runAsync方法{}");
}, executor).join();
}
1.2 执行多个任务
实际上,我们现实中需求一般要求并发批量处理一组数据,在最慢的子任务执行完毕后,统一的返回结果。
方法 | 作用 |
---|---|
allOf | 聚合多个CompletableFuture对象,全部执行完毕后才会返回结果 |
anyOf | 聚合多个CompletableFuture对象,最快的任务执行完毕后返回结果 |
join/get | 利用lambda特性。全部执行完毕才会返回结果 |
1.2.1 allOf实战
说下为什么allOf()的响应参数为CompletableFuture<Void>
对象,因为是将一批CompletableFuture对象(这里称为futures)进行聚合(可能响应对象不同),所以allOf()无法使用一个公共的响应对象,只能使用void。
获取结果时,依旧还得去遍历futures拿到每个子任务的响应对象。
private static void testAllOf() throws InterruptedException, ExecutionException {
//提交一批任务
List<CompletableFuture<User>> futures =
sources.stream().map(s -> CompletableFuture.supplyAsync(() -> createUser(s), executor))
.collect(Collectors.toList());
log.info("begin allOf");
//将这一批CompletableFuture对象,使用allOf操作得到一个CompletableFuture对象
CompletableFuture<Void> allFutures =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[] {}));
log.info("after allOf");
//等待所有结果执行完毕
allFutures.join();
//拿到所有结果(注意此处逻辑是主线程去进行的map操作,可进行优化)
List<User> list = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
log.info("执行完毕:{}", list);
}
优化版2:使用子线程链式的处理后续结果
private static void testAllOfAndThenApply() throws InterruptedException, ExecutionException {
//提交一批任务
List<CompletableFuture<User>> futures =
sources.stream().map(s -> CompletableFuture.supplyAsync(() -> createUser(s), executor))
.collect(Collectors.toList());
log.info("begin allOf");
//将这一批CompletableFuture对象,使用allOf操作得到一个CompletableFuture对象
CompletableFuture<Void> allFutures =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[] {}));
log.info("after allOf");
//allFutures(即所有futures)执行完毕后。执行thenApply()内部的逻辑,实现转化。因为此处依旧是交由子线程处理的,所以返回的依旧是CompletableFuture<List<User>>对象
CompletableFuture<List<User>> listCompletableFuture =
allFutures.thenApply(s -> futures.stream().map(r -> {
User user = r.join();
if (user.getName().startsWith("t")) {
sleepWithNoException(1000);
}
log.info("此处是子线程打印:{}", user);
return user;
}).collect(Collectors.toList()));
log.info("执行完毕:{}", listCompletableFuture.get());
}
1.2.2 anyOf实战
获取执行最快的子任务,某些场景下毕竟有用,响应对象是CompletableFuture<Object>
,还是一样的原因,因为聚合的子任务的响应对象可以是不同的。
获取结果时,需要强制。
private static void testAnyOf() throws InterruptedException, ExecutionException {
//提交一批任务
List<CompletableFuture<User>> userFutures =
sources.stream().map(s -> CompletableFuture.supplyAsync(() -> createUser(s), executor))
.collect(Collectors.toList());
//使用anyOf聚合
CompletableFuture<Object> anyFuture =
CompletableFuture.anyOf(userFutures.toArray(new CompletableFuture[] {}));
log.info("anyOf操作...");
//获取执行最快的任务
User user = (User) anyFuture.get();
log.info("最终输出{}", user);
}
1.2.3 join/get操作
private static void testBatchJoin() {
//当createUser抛出异常时,该方法也会抛出异常
List<User> users = sources.stream()
.map(s -> CompletableFuture.supplyAsync(() -> createUser(s))) //提交任务
.map(CompletableFuture::join) //等待结果
.collect(Collectors.toList()); //转化列表
log.info("打印最终参数:{}", users);
}
这个方法的也是可以并发处理多个任务,但是不如allOf()的就是map(CompletableFuture::join)
的操作实际上是主线程执行的。
2. 链式操作
2.1 子线程链式处理
CompletableFuture提供了丰富的链式操作逻辑。
方法 | 入参 | 作用 |
---|---|---|
thenApply | Function<? super T,? extends U> | 将传入的对象转换为另一个对象,响应对象依旧是CompletableFuture |
thenCompose | Function<? super T, ? extends CompletionStage<U>> | 连接两个CompletableFuture对象,响应对象依旧是CompletableFuture |
thenAccept | Consumer<? super T> action | 消费传入的对象,响应对象void |
thenRun | Runnable | 执行下一个任务 |
- thenXxx:即拿到上层流返回的结果(或者上层流执行完毕后),然后在将任务(theXxx内部逻辑)交由子线程处理。
- XxxAsync:任务(theXxx内部逻辑)交由新的子线程处理。
thenApply和thenCompose的区别
-
thenApply()转换的是泛型中的类型,是同一个CompletableFuture,相当于将CompletableFuture<T> 转换成CompletableFuture<U>
-
thenCompose()用来组合两个CompletableFuture,是生成一个新的CompletableFuture。
thenApply的案例:
private static void testAllOfAndThenApply() throws InterruptedException, ExecutionException {
//提交一批任务
List<CompletableFuture<User>> futures =
sources.stream().map(s -> CompletableFuture.supplyAsync(() -> createUser(s), executor))
.collect(Collectors.toList());
log.info("begin allOf");
//将这一批CompletableFuture对象,使用allOf操作得到一个CompletableFuture对象
CompletableFuture<Void> allFutures =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[] {}));
log.info("after allOf");
//allFutures(即所有futures)执行完毕后。执行thenApply()内部的逻辑,实现转化。因为此处依旧是交由子线程处理的,所以依旧是Com
CompletableFuture<List<User>> listCompletableFuture =
allFutures.thenApply(s -> futures.stream().map(r -> {
User user = r.join();
if (user.getName().startsWith("t")) {
sleepWithNoException(1000);
}
log.info("run thenApply:{}", user);
return user;
}).collect(Collectors.toList()));
log.info("执行完毕:{}", listCompletableFuture.get());
}
结果:可以看到thenApply
使用的是同一个CompletableFuture
,thenApply中的逻辑也是使用子线程进行处理。
17:34:50.827 [main] INFO com.tellme.obj.TestBf - begin allOf
17:34:50.831 [main] INFO com.tellme.obj.TestBf - after allOf
17:34:51.848 [pool-1-thread-1] INFO com.tellme.obj.TestBf - run createUser,TestBf.User(name=tom)
17:34:51.848 [pool-1-thread-3] INFO com.tellme.obj.TestBf - run createUser,TestBf.User(name=tony)
17:34:52.848 [pool-1-thread-2] INFO com.tellme.obj.TestBf - run createUser,TestBf.User(name=liMing)
17:34:53.852 [pool-1-thread-2] INFO com.tellme.obj.TestBf - run thenApply:TestBf.User(name=tom)
17:34:53.852 [pool-1-thread-2] INFO com.tellme.obj.TestBf - run thenApply:TestBf.User(name=liMing)
17:34:54.857 [pool-1-thread-2] INFO com.tellme.obj.TestBf - run thenApply:TestBf.User(name=tony)
17:34:54.857 [main] INFO com.tellme.obj.TestBf - 执行完毕:[TestBf.User(name=tom), TestBf.User(name=liMing), TestBf.User(name=tony)]
(等效)thenCompose的案例:
private static void testAllOfAndThenCompose() throws InterruptedException, ExecutionException {
//提交一批任务
List<CompletableFuture<User>> futures =
sources.stream()
.map(s -> CompletableFuture.supplyAsync(() -> createUser(s), executor)) //此时的线程为executor
.collect(Collectors.toList());
log.info("begin allOf");
//将这一批CompletableFuture对象,使用allOf操作得到一个CompletableFuture对象
CompletableFuture<Void> allFutures =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[] {}));
log.info("after allOf");
CompletableFuture<List<User>> listCompletableFuture = allFutures
.thenCompose(s -> CompletableFuture.supplyAsync(
() -> futures.stream().map(future -> {
log.info("print thenCompose request:{}", s);
User user = future.join();
if (user.getName().startsWith("t")) {
sleepWithNoException(1000);
}
log.info("print thenCompose response:{}", user); //此时的线程为ForkJoinPool
return user;
}).collect(Collectors.toList())));
log.info("执行完毕:{}", listCompletableFuture.get());
}
结果:可以看到thenCompose
是将两个CompletableFuture
组合起来,然后返回结果,使用的子线程也并不是一个。
20:16:00.602 [main] INFO com.tellme.obj.TestBf - begin allOf
20:16:00.606 [main] INFO com.tellme.obj.TestBf - after allOf
20:16:01.636 [pool-1-thread-3] INFO com.tellme.obj.TestBf - run createUser,TestBf.User(name=tony)
20:16:01.627 [pool-1-thread-1] INFO com.tellme.obj.TestBf - run createUser,TestBf.User(name=tom)
20:16:02.628 [pool-1-thread-2] INFO com.tellme.obj.TestBf - run createUser,TestBf.User(name=liMing)
20:16:02.633 [ForkJoinPool.commonPool-worker-19] INFO com.tellme.obj.TestBf - print thenCompose request:null
20:16:03.636 [ForkJoinPool.commonPool-worker-19] INFO com.tellme.obj.TestBf - print thenCompose response:TestBf.User(name=tom)
20:16:03.636 [ForkJoinPool.commonPool-worker-19] INFO com.tellme.obj.TestBf - print thenCompose request:null
20:16:03.636 [ForkJoinPool.commonPool-worker-19] INFO com.tellme.obj.TestBf - print thenCompose response:TestBf.User(name=liMing)
20:16:03.637 [ForkJoinPool.commonPool-worker-19] INFO com.tellme.obj.TestBf - print thenCompose request:null
20:16:04.642 [ForkJoinPool.commonPool-worker-19] INFO com.tellme.obj.TestBf - print thenCompose response:TestBf.User(name=tony)
20:16:04.642 [main] INFO com.tellme.obj.TestBf - 执行完毕:[TestBf.User(name=tom), TestBf.User(name=liMing), TestBf.User(name=tony)]
2.2 结果合并
方法 | 作用 |
---|---|
thenCombine | 两个CompletableFuture结合,有返回结果 |
thenAcceptBoth | 两个CompletableFuture结合,无返回结果 |
/**
* 无返回值的结合
*/
private static void testThenAcceptBoth() {
CompletableFuture<User> userCompletableFuture1 = CompletableFuture.supplyAsync(() -> createUser("1"));
CompletableFuture<User> userCompletableFuture2 = CompletableFuture.supplyAsync(() -> createUser("2"));
userCompletableFuture1.thenAcceptBoth(userCompletableFuture2, (r1, r2) -> {
log.info("打印数据{},{}", r1, r2);
}).join();
}
/**
* 有返回值的结合
*/
private static void testTheCombine() {
CompletableFuture<User> userCompletableFuture1 = CompletableFuture.supplyAsync(() -> createUser("1"));
CompletableFuture<User> userCompletableFuture2 = CompletableFuture.supplyAsync(() -> createUser("2"));
//两个结果结合
CompletableFuture<String> resFuture =
userCompletableFuture1.thenCombine(userCompletableFuture2, (r1, r2) -> r1.getName() + r2.getName());
log.info("两个结果的结合:{}" + resFuture.join());
}
2.3 结果(异常)回调
方法 | 参数 | 作用 |
---|---|---|
exceptionally | Function<Throwable, ? extends T> | 当子线程抛出异常时,将回调该方法,完成降级,该方法有返回值 |
whenComplete | BiConsumer<? super T, ? super Throwable> action | 当子线程执行完后/抛出异常,将调用该方法,该方法无返回值 |
handle | BiFunction<? super T, Throwable, ? extends U> | 当子线程执行完后/抛出异常,将调用该方法,该方法有返回值 |
exceptionally使用场景:
- 并发处理多个任务时,若一个任务抛出异常时,不希望去终止所有的任务时,可以使用该方法。
private static void testExceptionally() {
List<User> users = sources.stream()
.map(s -> CompletableFuture
.supplyAsync(() -> createUser(s), executor) //执行此方法,当此方法抛出异常,可能影响导致并发执行失败
.exceptionally(ex -> new User())) //当出现异常时,将回调exceptionally方法。完成降级
.map(CompletableFuture::join).collect(Collectors.toList());
System.out.println(users);
}
- thenXxx链式调用:防止雪崩效应(即前面流程出现异常,导致theXx内部逻辑不执行)
/**
* 防止雪崩效应。
*/
private static void testExceptionally() {
String res = CompletableFuture.supplyAsync(() -> {
log.info("a1 请求");
return "a1";
})
.thenApply(s -> {
int i = 1 / 0;
log.info("a2 请求参数{}", s);
return "a2";
})
.exceptionally(ex -> {
return "a2 error"; //此处是降级逻辑
}).thenApply(s -> {
log.info("a3 请求参数:{}", s); //此时s的值为a2 error
return "a3";
})
.join();
log.info("最终结果:{}", res);
}
结果:
21:14:50.631 [ForkJoinPool.commonPool-worker-19] INFO com.tellme.obj.TestBf - a1 请求
21:14:50.633 [ForkJoinPool.commonPool-worker-19] INFO com.tellme.obj.TestBf - a3 请求参数:a2 error
21:14:50.634 [main] INFO com.tellme.obj.TestBf - 最终结果:a3
whenComplete使用场景:
无论是否出现异常,肯定会回调whenComplete方法,但是出现异常后,main方法会被中断。
whenComplete参数是BiConsumer<? super T, ? super Throwable>
当触发方法后,并不会返回降级结果。
private static void testWhenComplete() {
String res = CompletableFuture.supplyAsync(() -> {
log.info("run supplyAsync ");
int i = 1 / 0; //出现异常
return createUser("1");
}, executor).thenApply(u -> {
log.info("run thenApply ");
return u.getName();
}).whenComplete((s, ex) -> {
log.info("run whenComplete {}", s);
}).join();
log.info("main {}:", res);
}
结果:
21:19:12.180 [pool-1-thread-1] INFO com.tellme.obj.TestBf - run supplyAsync
21:19:12.184 [pool-1-thread-1] INFO com.tellme.obj.TestBf - run whenComplete null
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ArithmeticException: / by zero
at com.tellme.obj.TestBf.lambda$testWhenComplete$5(TestBf.java:106)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
... 3 more
handle使用场景:
方法执行完毕,或者出现异常时,都会调用这个方法,都结果进行最终的后处理。
private static void testHandle() {
String join = CompletableFuture.supplyAsync(() -> {
log.info("run supplyAsync ");
return createUser("1");
}, executor).thenApply(u -> {
log.info("run thenApply ");
return u.getName();
}).handle((s, ex) -> s + "123") // 方法执行完毕,或者出现异常时,都会调用这个方法,都结果进行最终的后处理。
.join();
System.out.println(join);
}
结果:
21:22:33.607 [pool-1-thread-1] INFO com.tellme.obj.TestBf - run supplyAsync
null123
好文阅读
Java8的CompletableFuture进阶之道
Java8 CompletableFuture(6) thenCompose和thenCombine的区别
历史文章
多线程——线程池ThreadPoolExecutor
SpringBoot2.x整合线程池(ThreadPoolTaskExecutor)