CompletableFuture入门
CompletableFuture介绍
Future 接口可以构建异步应用,但依然有其局限性。它很难直接表述多个Future 结果之间的依赖性。实际开发中,我们经常需要达成以下目的:
- 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。
- 仅等待 Future 集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同一个值),并返回它的结果。
- 等待 Future 集合中的所有任务都完成。
- 当 Future 的完成事件发生时会收到通知,并能使用 Future 计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果。
等等之类的,新的CompletableFuture将使得这些成为可能。
CompletableFuture实现了Future<T>, CompletionStage<T>两个接口。所以还是可以像以前一样通过阻塞或轮询的方式获得结果,尽管这种方式不推荐使用。
虽说 CompletableFuture 实现了 Future 接口,但它多数方法源自于 CompletionStage。CompletionStage是一个接口,从命名上看得知是一个完成的阶段,它里面的方法也标明是在某个运行阶段得到了结果之后要做的事情。
如下我们使用new的方式创建一个CompletableFuture(这种方式不常用来创建一个CompletableFuture,此处仅为了说明情况),用阻塞的方式得到了结果:
public class CompletableFutureInAction {
private static Random RAMDOM = new Random(System.currentTimeMillis());
public static void main(String[] args)
throws ExecutionException, InterruptedException {
CompletableFuture<Double> completableFuture = new CompletableFuture();
new Thread(()->{
double value = get();
completableFuture.complete(value);
}).start();
System.out.print("===========no==block");
Optional.ofNullable(completableFuture.get()).ifPresent(System.out::println);
}
private static double get(){
try {
Thread.sleep(10000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return RAMDOM.nextDouble();
}
}
运行之后会立刻输出:
===========no==block
过了n秒就会输出一个随机数:
0.39273128973
如果我们想实现方法完成时会收到通知,而不是一直阻塞在结果的获取上面,可以这样写:
public class CompletableFutureInAction {
private static Random RAMDOM = new Random(System.currentTimeMillis());
public static void main(String[] args)
throws ExecutionException, InterruptedException {
CompletableFuture<Double> completableFuture = new CompletableFuture();
new Thread(()->{
double value = get();
completableFuture.complete(value);
}).start();
System.out.print("===========no==block");
completableFuture.whenComplete((v,t)->{
Optional.ofNullable(v).ifPresent(System.out::println);
Optional.ofNullable(t).ifPresent(x->x.printStackTrace());
});
}
private static double get(){
try {
Thread.sleep(10000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return RAMDOM.nextDouble();
}
}
CompletableFuture的基本使用
【创建CompletableFuture对象】
CompletableFuture.completedFuture是一个静态辅助方法,用来返回一个已经计算好的CompletableFuture。
public static <U> CompletableFuture<U> completedFuture(U value)
而以下四个静态方法用来为一段异步执行的代码创建CompletableFuture对象:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
以Async结尾并且没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。
- runAsync方法:它以Runnabel函数式接口类型为参数,所以CompletableFuture的计算结果为空。
- supplyAsync方法以Supplier<U>函数式接口类型为参数,CompletableFuture的计算结果类型为U。
这四个方法的参数类型都是函数式接口,所以可以使用lambda表达式实现异步任务。
【计算结果完成时的处理】
当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
可以看到Action的类型是BiConsumer<? super T,? super Throwable>它可以处理正常的计算结果,或者异常情况。
方法不以Async结尾,意味着Action使用相同的线程执行,而如果以Async结尾且没有指定Executor,则从ForkJoinPool.commonPool()中获取线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)。
如果你用过Future,就会知道糟糕的时候有多糟糕。幸运的是,CompletableFuture有一个漂亮的对应手段,通过使用exceptionally。exceptionally给我们一个机会恢复,通过执行当异步执行的计算抛出exception时备选的方法(alternative method)。
代码示例:
public class CompletableFutureInAction2 {
private static Random RAMDOM = new Random(System.currentTimeMillis());
public static void main(String[] args)
throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(CompletableFutureInAction2::get)
.whenComplete((v,t)->{
Optional.ofNullable(v).ifPresent(System.out::println);
Optional.ofNullable(t).ifPresent(x->x.printStackTrace());
});
System.out.print("===========no==block");
}
private static double get(){
try {
Thread.sleep(10000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return RAMDOM.nextDouble();
}
}
输出结果:
===========no==block
为什么没有结果输出呢?因为whenComplete里面的线程是守护线程,所有的“非后台线程”结束时,程序也就终止了,同时会杀死进程中所有后台线程:main就是一个非后台线程。所以运行之后会直接输出===========no==block
,但方法没有执行完那当然没有任何结果输出。如果我们想得到结果可以在 System.out.print("===========no==block");
后面加上一句:
Thread.currentThread().join();
这样main线程会等待CompletableFuture的线程结束之后才能继续运行。
对于线程池中的所有线程默认都为非后台线程,这样主线程退出时不会直接退出JVM,而是等待线程池结束。所以线程池的生命周期要比创建它们的程序生命周期要长,我们必须使用shutdown()方法手动退出。我们知道Executors可以用来创建一个线程池,如果我们想让Executors创建的线程池中的线程自动结束,可以使用如下方法:
public static void main(String[] args) {
// ExecutorService 默认不是守护线程 默认false setDaemon(false)
ExecutorService executor = Executors.newFixedThreadPool(2,r->{
// 对线程池里面的线程进行初始化设置
Thread t = new Thread();
t.setDaemon(true); // 设置线程为守护线程
return t;
});
executor.execute(()->System.out.print("test"));
// 不用执行shutdown,线程池中的线程随着main()方法执行完也随之退出
}
执行supplyAsync的线程是守护线程,所以main()函数执行完了这个线程也随之被杀死。而如果我们把线程池中的所有线程默认都转换为非后台线程,然后用这个线程池中的县城去执行supplyAsync,我们就不用依赖join(),这样主线程退出时不会直接退出JVM,我们就可以等到方法输出结果。
public class CompletableFutureInAction2 {
private static Random RAMDOM = new Random(System.currentTimeMillis());
public static void main(String[] args)
throws ExecutionException, InterruptedException {
AtomicBoolean finished = new AtomicBoolean(false);
ExecutorService executor = Executors.newFixedThreadPool(2,r->{
Thread t = new Thread();
t.setDaemon(false);
return t;
});
// CompletableFuture.supplyAsync(executor) whenComplete
// 上面两步都是 [pool-1-thread-1]执行 即两个步骤是同一个线程执行
CompletableFuture.supplyAsync(CompletableFutureInAction2::get,executor)
.whenComplete((v,t)->{
Optional.ofNullable(v).ifPresent(System.out::println);
finished.set(true);
});
System.out.print("===========no==block");
/* while(!finished.get()){
Thread.sleep(1);
}*/
}
private static double get(){
try {
Thread.sleep(10000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return RAMDOM.nextDouble();
}
}
做好了上面的准备,下面我们来看一下方法使用Async结尾和不使用有什么区别:
public class CompletableFutureInAction2 {
private static Random RAMDOM = new Random(System.currentTimeMillis());
public static void main(String[] args) {
ExecutorService executorPool = Executors.newFixedThreadPool(2, run -> {
Thread t = new Thread(run);
t.setDaemon(false);
return t;
});
//###1.0 CompletableFuture.supplyAsync(executorPool) thenApply whenComplete
CompletableFuture.supplyAsync(CompletableFutureInAction2::get,executorPool)
.thenApply(t->{
System.out.println("##1.0## [thenApplyThreadName]=["+Thread.currentThread().getName()+"]");
return t*10;
})
.whenComplete((v,t)->{
System.out.println("##1.0## [whenCompleteThreadName]=["+Thread.currentThread().getName()+"]");
System.out.println(v);
t.printStackTrace();
});
//### 上面三步都是 [pool-1-thread-1]执行 即三个步骤是同一个线程执行
//###2.0 CompletableFuture.supplyAsync(executorPool) thenApplyAsync whenCompleteAsync
CompletableFuture.supplyAsync(CompletableFutureInAction2::get,executorPool)
.thenApplyAsync(t->{
System.out.println("##2.0## [thenApplyThreadName]=["+Thread.currentThread().getName()+"]");
return t*10;
}).
whenCompleteAsync((v,t)->{
System.out.println("##2.0## [whenCompleteThreadName]=["+Thread.currentThread().getName()+"]");
System.out.println(v);
t.printStackTrace();
});
//### 上面三步supplyAsync=[pool-1-thread-2] 后面两个都是[ForkJoinPool.commonPool-worker-1]
//### 原因由于调用这个时thenApplyAsync,没有指定Executor executor,然后又是因为异步,默认采用ForkJoin的连接池
//### 由于工作量不大,ForkJoinPool并没有分配两个线程,ForkJoinPool.commonPool-worker-1承担了两份工作
//###3.0 CompletableFuture.supplyAsync(executorPool) thenApplyAsync whenCompleteAsync(executorpool)
CompletableFuture.supplyAsync(CompletableFutureInAction2::get,executorPool)
.thenApply(t->{
System.out.println("##3.0## [thenApplyThreadName]=["+Thread.currentThread().getName()+"]");
return t*10;
}).
whenCompleteAsync((v,t)->{
System.out.println("##3.0## [whenCompleteThreadName]=["+Thread.currentThread().getName()+"]");
System.out.println(v);
t.printStackTrace();},executorPool);
//### 上面三步supplyAsync和thenApply 都是[pool-1-thread-3] whenCompleteAsync[pool-1-thread-4]
//### 原因由于调用这个时thenApply和前者同一线程 whenCompleteAsync指定了线程
}
private static double get(){
try {
Thread.sleep(10000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return RAMDOM.nextDouble();
}
}