JavaGuide知识点整理——CompletableFutur
其实CompletableFuture现在使用的比较多。很多开源框架都大量用到了。因此专门写一篇文章来介绍这个java8才被引入的一个非常有用的异步编程的类。
CompletableFuture简介
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
CompletableFuture同时实现了Future和CompletionStage接口。除了提供了更为好用和强大的Future特性之外,还提供了函数式编程的能力。
先从Future开始说起:
Future是个接口,比较简洁,有五个方法:

- cancel(boolean mayInterruptIfRunning):尝试取消执行的任务。
- isCancelled():判断任务是否被取消。
- isDone():判断任务是否已被执行完成。
- get():等待任务执行完成并获取运算结果。
- get(long timeout,TimeUtil unit):等待任务执行完成并获取运算结果,但是如果过了超时时间还没执行完也返回。
CompletionStage<T>接口的方法就比较多了,CompletableFuture的函数式能力就是这个接口赋予的。从这个接口的方法参数你就可以发现其中大量使用了java8引入方法函数式编程。

CompletionStage<T>接口中的方法比较多CompletableFuture的函数式能力就是这个接口赋予的从这个接口的方法参数就可以发现大量使用了java8引入的函数式编程。
常见操作
创建CompletableFuture
常见的创建CompletableFuture对象方法有两种:
- 通过new关键字
- 基于CompletableFuture自带的静态工厂方法:runAsync(),supplyAsync()
new关键字
通过new关键字创建CompletableFuture对象这种使用方式可以看做是将CompletableFuture当做Future来使用。
下面看一个简单的案例:
我们通过创建一个结果值类型为List<Integer>的CompletableFuture,可以把result看做是异步运算结果的载体。
CompletableFuture<List<Integer>> result = new CompletableFuture<>();
假设在某个时刻我们得到了最终结果,这时我们可以调用complete()方法为其传入结果,这表示result已经被完成了(我们可以理解为在这里把结果装进result中)。
result.complete(ids);//只有第一次调用有效
可以通过idDone()方法来检查是否已经完成。
public boolean isDone() {
return result != null;
}
获取异步计算的结果也非常简单,直接调用get方法即可
ids = result.get();

静态工厂方法
这两个方法可以帮助我们封装计算逻辑
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
// 使用自定义线程池(推荐)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
// 使用自定义线程池(推荐)
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
runAsync()方法接收的参数的Runnable(执行run方法),这是一个函数式接口,不允许返回值。当你需要异步操作且不关心返回结果的时候可以使用runAsync()方法。
supplyAsync()方法接收的参数是Supplier<U>,这也是一个函数式接口,U是返回结果值的类型。当你需要异步操作且关系返回结果的时候,可以使用supplyAsync()方法。
下面是两个方法是使用案例:
public static void main(String[] args) throws Exception {
CompletableFuture<List<Integer>> result = CompletableFuture.supplyAsync(()->{
List<Integer> ids = Arrays.asList(1,2,3);
return ids;
});
System.out.println(result.get());
CompletableFuture noResult = CompletableFuture.runAsync(()-> System.out.println("这个没返回值"));
System.out.println(noResult.get());
}
这个控制台打印结果如下:

处理异步结算的结果
当我们获取到异步计算的结果之后,还可以对其进行下一步的处理。比较常用的方法有下面几个:
- thenApply()
- thenAccept()
- thenRun()
-
whenComplete()
thenApply()方法接受一个Function实例,用它来处理结果。使用示例如下:

这个方法可以接好多个,流式调用一直.就行。
如果不需要从回调函数中获取返回结果,可以使用thenAccept()或者thenRun(),这两个方法的区别在于thenRun()不能访问异步计算的结果。rhenRun()有以下几个方法:
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(defaultExecutor(), action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}
下面是用法案例:
public static void main(String[] args) throws Exception {
CompletableFuture result =
CompletableFuture.runAsync(()-> System.out.println("执行了一系列计算")).thenRun(()-> System.out.println(
"执行完了计算通知")).thenRun(()-> System.out.println("附加了一些处理逻辑"));
System.out.println(result.isDone());
}
关于thenAccept()方法的参数是Consumer<? super T>.有以下几个方法:
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(defaultExecutor(), action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
}
顾名思义,Consumer属于消费型接口,它可以接收1个输入对象然后进行消费,使用案例如下:
public static void main(String[] args) throws Exception {
CompletableFuture.supplyAsync(()-> "执行完任务!").thenAccept(s -> {
System.out.println(s+"!!!");
});
CompletableFuture.runAsync(()-> System.out.println("执行计算!")).thenAccept(s-> System.out.println("添加了一些操作"));
}

因为这几个方法很容易搞混,所以简单说一下上面三者的区别:
- thenApply() 此方法是要有返回值的。无论之前的任务是否有返回值,反正执行完这个要给个返回值。
- thenAccept()此方法无返回值,凡是必须有个形参。无论之前的任务有没有返回值都可以,没有返回值会默认是null,也可以对付用。
- thenRun()此方法无返回值,且不管之前的任务有没有返回值都不能用。
whenComplete()的方法的参数是BiConsumer<? super T,? super Throwable>。有以下几个方法:
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(defaultExecutor(), action);
}
// 使用自定义线程池(推荐)
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return uniWhenCompleteStage(screenExecutor(executor), action);
}
相比于Consumer,Biconsumer可以接收两个输入对象向后进行消费。多了一个异常定义。用法类似。
public static void main(String[] args) throws Exception {
CompletableFuture.supplyAsync(()-> "执行完任务!").whenComplete((str,ex)->{
System.out.println(str);
System.out.println(ex == null);});
}
这里ex == null的结果是会是true。
异常处理
可以通过handle()方法来处理任务执行过程中可能出现的抛出异常的情况。下面是使用案例:
public static void main(String[] args) throws Exception {
System.out.println(CompletableFuture.supplyAsync(()-> {
int i = 1/0;
return "返回结果";
}).handle((res,ex)->{
if(ex != null){
return "报错了";
}
return res;
}).get());
}
还可以通过exceptionally()方法来处理异常情况。注意exceptionally()是有异常的时候才会处理,否则正常返回。
public static void main(String[] args) throws Exception {
System.out.println(CompletableFuture.supplyAsync(()-> {
int i = 1/0;
return "返回结果";
}).exceptionally(ex->"处理异常了!").get());
System.out.println(CompletableFuture.supplyAsync(()-> {
return "返回结果";
}).exceptionally(ex->"处理异常了!").get());
}
如上代码的执行结果是这样的:

当然了如果想要CompletableFuture的结果就是异常的话,可以使用completeExceptionally()方法为其赋值:
public static void main(String[] args) throws Exception {
CompletableFuture completableFuture = CompletableFuture.supplyAsync(()-> {
return "返回结果";
});
completableFuture.completeExceptionally(new Exception("手动加入一个异常!"));
System.out.println(completableFuture.get());
}
这个比较麻烦的一点是不能直接流式编程,要分开写。反正用法也比较简单。
组合CompletableFuture
我们可以使用thenCompose()按顺序连接两个CompletableFuture对象。thenCompose方法有如下几个:
public <U> CompletableFuture<U> thenCompose(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(null, fn);
}
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(defaultExecutor(), fn);
}
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn,
Executor executor) {
return uniComposeStage(screenExecutor(executor), fn);
}
使用示例如下:
public static void main(String[] args) throws Exception {
System.out.println(CompletableFuture.supplyAsync(()->"任务一执行完毕").thenCompose(s->CompletableFuture.supplyAsync(()->s+" \n任务二执行完毕")).get());
}
在实际开发中这个方法其实还挺有用的,比如说我们要先获取用户信息然后再利用用户信息去做别的事。和thernCompose()方法类似的还有thenCombine()方法,thenCombine()同样可以组合两个CompletableFuture对象。代码使用如下:
public static void main(String[] args) throws Exception {
CompletableFuture completableFuture =
CompletableFuture.supplyAsync(()->"任务一执行完毕").thenCompose(s->CompletableFuture.supplyAsync(()->s+" " +
"\n任务二执行完毕")).thenCombine(CompletableFuture.supplyAsync(()->"任务三执行完毕"),(s, s2) -> s+"\n"+s2);
System.out.println(completableFuture.get());
}

那么thenCombine()和thenCompose()有什么区别呢?
- thenCompose可以两个CompletableFuture对象,并将前一个任务返回的结果作为下一个任务的参数他们之间存在着先后顺序。可以说第一个任务没完成没办法开始第二个任务。
- thenCombine会两个任务都执行完成后,把两个任务的结果进行处理。两个任务是并行执行的,它们之间没有依赖顺序。
并行运行多个CompletableFuture
可以通过CompletableFuture的allOf()这个静态方法来并行运行多个CompletableFuture。
实际项目中,我们经常需要并行运行多个互不相关的任务,这些任务之间没有依赖关系,可以互相独立运行。
比如说我们要读取6个文件,这6个任务都是没有执行顺序依赖的任务。但是我们需要返回给用户的时候将这几个文件 的处理的结果进行统计整理,像这种情况我们就可以使用并行运行多个CompletableFuture来处理。示例代码如下:
public static void main(String[] args) throws Exception {
CompletableFuture task1 = CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(3000l);
}catch (Exception e){
}
return "执行任务1";
});
CompletableFuture task2 = CompletableFuture.supplyAsync(()->"执行任务2");
CompletableFuture task3 = CompletableFuture.supplyAsync(()->"执行任务3");
CompletableFuture task4 = CompletableFuture.supplyAsync(()->"执行任务4");
CompletableFuture task5 = CompletableFuture.supplyAsync(()->"执行任务5");
CompletableFuture task6 = CompletableFuture.supplyAsync(()->"执行任务6");
System.out.println(DateUtils.format(new Date(),"HH:mm:ss"));
System.out.println(CompletableFuture.allOf(task1,task2,task3,task3,task4,task5,task6).get()+DateUtils.format(new Date(),"HH:mm:ss"));
}

注意看这个运行结果,在所有的task都执行完了才会返回。
allOf()方法会等到所有的CompletableFuture都运行完成之后再返回。
当然了也可以调用.join()方法让程序等所有任务都运行完了再继续执行。
而anyOf()方法不会等待所有的CompletableFuture都运行完成,而是只要有一个执行完成即可。
案例如下:

代码和上面的差不多,但是换成anyOf以后,立刻返回了,明明task1睡了3秒也没影响。
这篇文章只是简单介绍了CompletableFuture常用的api,如果想深入学习可以自己去找资料看,如果本文对你有所帮助,记得点个喜欢点个关注~也祝大家工作顺顺利利,每天进步一点点!