CompletableFuture异步编排
在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。
1、创建异步对象
CompletableFuture提供了四个静态方法来创建一个异步操作。
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
- runAsync的两个重载方法都是没有返回值的,而supplyAsync是有返回值的。
- 都可以指定线程池或者使用默认的线程池
1.1、runAsync无返回值
public class ThreadTest {
public static ExecutorService executor = Executors.newFixedThreadPool(10);
@Test
void test(){
System.out.println("test()方法开始了.....");
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
System.out.println("异步线程开始执行.....");
System.out.println("执行业务逻辑....");
System.out.println("异步线程执行结束.....");
}, executor);
System.out.println("test()方法结束.....");
}
}
执行结果:
test()方法开始了.....
test()方法结束.....
异步线程开始执行.....
执行业务逻辑....
异步线程执行结束.....
1.2、supplyAsync有返回值
@Test
void test() throws ExecutionException, InterruptedException {
System.out.println("test()方法开始了.....");
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("异步线程开始执行.....");
System.out.println("执行业务逻辑....");
System.out.println("异步线程执行结束.....");
return "我是返回值";
}, executor);
System.out.println("test()方法结束....."+stringCompletableFuture.get());
}
执行结果:
test()方法开始了.....
异步线程开始执行.....
执行业务逻辑....
异步线程执行结束.....
test()方法结束.....我是返回值
stringCompletableFuture.get()会阻塞当前线程,直到异步任务执行结束,才会恢复当前线程执行。
2、完成时回调方法
当异步任务执行完成之后,回调自定义的方法,CompletableFuture提供了以下四种使用方式:
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(asyncPool, action);
}
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return uniWhenCompleteStage(screenExecutor(executor), action);
}
public CompletableFuture<T> exceptionally(
Function<Throwable, ? extends T> fn) {
return uniExceptionallyStage(fn);
}
whenComplete方法和使用主线程执行任务,而whenCompleteAsync方法则会使用异步线程执行任务。
2.1、whenComplete同步执行回调
whenComplete()需要传一个BiConsumer接口的实现类,BiConsumer中accept(t,u)方法接收两个参数,第一个参数是future的返回结果,第二个参数是抛出的异常
@Test
void test() throws ExecutionException, InterruptedException {
System.out.println("test()方法开始了.....");
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("异步线程开始执行.....");
System.out.println("异步执行线程:"+Thread.currentThread().getName());
System.out.println("执行业务逻辑....");
System.out.println("异步线程执行结束.....");
return "我是返回值";
}, executor).whenComplete((result,exception)->{
System.out.println("异步任务执行结束,返回结果:"+result);
System.out.println("异步任务执行结束,异常:"+exception);
System.out.println("whenComplete线程:"+Thread.currentThread().getName());
}).exceptionally((t)->{
System.out.println("异常:"+t);
return "异常返回值";
});
System.out.println("test()方法结束.....");
}
whenComplete方法虽然能获得异常信息,但是无法修改CompletableFuture的返回值,因此可以链式调用exceptionally方法,指定一个出现异常以后默认的返回值。执行结果:
test()方法开始了.....
异步线程开始执行.....
异步执行线程:pool-1-thread-1
执行业务逻辑....
异步线程执行结束.....
异步任务执行结束,返回结果:我是返回值
异步任务执行结束,异常:null
whenComplete线程:main
test()方法结束.....
2.2、whenCompleteAsync异步执行回调
whenCompleteAsync使用异步线程来执行回调方法
@Test
void test() throws ExecutionException, InterruptedException {
System.out.println("test()方法开始了.....");
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("异步线程开始执行.....");
System.out.println("异步执行线程:"+Thread.currentThread().getName());
System.out.println("执行业务逻辑....");
System.out.println("异步线程执行结束.....");
return "我是返回值";
}, executor).whenCompleteAsync((result,exception)->{
System.out.println("异步任务执行结束,返回结果:"+result);
System.out.println("异步任务执行结束,异常:"+exception);
System.out.println("whenComplete线程:"+Thread.currentThread().getName());
},executor).exceptionally((t)->{
System.out.println("异常:"+t);
return "异常返回值";
});
System.out.println("test()方法结束.....");
}
执行结果:
test()方法开始了.....
异步线程开始执行.....
异步执行线程:pool-1-thread-1
执行业务逻辑....
异步线程执行结束.....
异步任务执行结束,返回结果:我是返回值
异步任务执行结束,异常:null
whenComplete线程:pool-1-thread-2
test()方法结束.....
3、handle
handle是方法执行结束后的处理,无论是成功结束,还是失败结束。complete系列方法只能感知异常,若要返回异常信息,只能在后面加exceptionally()方法。
@Test
void test() throws ExecutionException, InterruptedException {
System.out.println("test()方法开始了.....");
CompletableFuture.supplyAsync(() -> {
System.out.println("异步线程开始执行.....");
System.out.println("异步执行线程:"+Thread.currentThread().getName());
System.out.println("执行业务逻辑....");
System.out.println("异步线程执行结束.....");
return "我是返回值";
}, executor).handle((result,exception)->{
System.out.println("handle执行线程:"+Thread.currentThread().getName());
if(result != null){
return result;
}
if(exception != null){
return "handle exception 返回";
}
return "handle 返回";
});
System.out.println("test()方法结束.....");
}
执行结果:
test()方法开始了.....
异步线程开始执行.....
异步执行线程:pool-1-thread-1
执行业务逻辑....
异步线程执行结束.....
handle执行线程:main
test()方法结束.....
handle()方法也是在主线程中执行的。
4、线程串行化
假设B线程需要A线程执行完成之后才能继续执行,则需要线程串行化。
- thenRun系列方法,没有返回值,不需要依赖A返回的结果。
- thenAccept系列方法,没有返回值,需要依赖A返回的结果。
- thenApply系列方法,有返回值,依赖A返回的结果。
- 不带Async的,是在主线程中执行,带Async的,异步执行
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor)
4.1、thenRun系列方法
@Test
void testThen(){
System.out.println("test()开始.....");
CompletableFuture.supplyAsync(() -> {
System.out.println("异步线程开始执行.....");
System.out.println("异步执行线程:"+Thread.currentThread().getName());
System.out.println("执行业务逻辑....");
System.out.println("异步线程执行结束.....");
return "我是返回值";
}, executor).thenRun(()->{
System.out.println("thenRun执行线程:"+Thread.currentThread().getName());
});
System.out.println("test()结束.....");
}
执行结果:
test()开始.....
异步线程开始执行.....
异步执行线程:pool-1-thread-1
执行业务逻辑....
异步线程执行结束.....
thenRun执行线程:main
test()结束.....
将thenRun()换成thenRunAsync(),则会异步执行。
@Test
void testThen(){
System.out.println("test()开始.....");
CompletableFuture.supplyAsync(() -> {
System.out.println("异步线程开始执行.....");
System.out.println("异步执行线程:"+Thread.currentThread().getName());
System.out.println("执行业务逻辑....");
System.out.println("异步线程执行结束.....");
return "我是返回值";
}, executor).thenRunAsync(()->{
System.out.println("thenRun执行线程:"+Thread.currentThread().getName());
});
System.out.println("test()结束.....");
}
执行结果:
test()开始.....
异步线程开始执行.....
异步执行线程:pool-1-thread-1
执行业务逻辑....
异步线程执行结束.....
test()结束.....
thenRun执行线程:ForkJoinPool.commonPool-worker-1
4.2、thenAccept系列方法
@Test
void testThen(){
System.out.println("test()开始.....");
CompletableFuture.supplyAsync(() -> {
System.out.println("异步线程开始执行.....");
System.out.println("异步执行线程:"+Thread.currentThread().getName());
System.out.println("执行业务逻辑....");
System.out.println("异步线程执行结束.....");
return "我是返回值";
}, executor).thenAcceptAsync((result)->{
System.out.println("thenAcceptAsync执行线程:"+Thread.currentThread().getName());
System.out.println("thenAcceptAsync()接收到返回值:"+result);
});
System.out.println("test()结束.....");
}
执行结果:
test()开始.....
异步线程开始执行.....
异步执行线程:pool-1-thread-1
执行业务逻辑....
异步线程执行结束.....
test()结束.....
thenAcceptAsync执行线程:ForkJoinPool.commonPool-worker-1
thenAcceptAsync()接收到返回值:我是返回值
4.3、thenApply系列方法
@Test
void testThen() throws ExecutionException, InterruptedException {
System.out.println("test()开始.....");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("异步线程开始执行.....");
System.out.println("异步执行线程:" + Thread.currentThread().getName());
System.out.println("执行业务逻辑....");
System.out.println("异步线程执行结束.....");
return "我是返回值";
}, executor).thenApplyAsync((result) -> {
System.out.println("thenApplyAsync执行线程:" + Thread.currentThread().getName());
return "thenApplyAsync的返回结果";
});
System.out.println("test()结束....." + future.get() );
}
执行结果:
test()开始.....
异步线程开始执行.....
异步执行线程:pool-1-thread-1
执行业务逻辑....
异步线程执行结束.....
thenApplyAsync执行线程:ForkJoinPool.commonPool-worker-1
test()结束.....thenApplyAsync的返回结果
5、多任务组合
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
allOf,等待所有任务都执行完成。
@Test
void testAll() throws ExecutionException, InterruptedException {
System.out.println("test()开始.....");
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("future1执行结束");
return "future1结果";
}, executor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("future2执行结束");
return "future2结果";
}, executor);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
System.out.println("future3执行结束");
return "future3结果";
}, executor);
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
allOf.get();
System.out.println("test()结束....." );
}
执行结果:
test()开始.....
future1执行结束
future2执行结束
future3执行结束
test()结束.....
anyOf,等待任何一个任务执行完成。
@Test
void testAll() throws ExecutionException, InterruptedException {
System.out.println("test()开始.....");
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("future1执行结束");
return "future1结果";
}, executor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("future2执行结束");
return "future2结果";
}, executor);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
System.out.println("future3执行结束");
return "future3结果";
}, executor);
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3);
System.out.println("test()结束....."+anyOf.get());
}
执行结果:
test()开始.....
future1执行结束
future2执行结束
test()结束.....future1结果
future3执行结束