CompletableFuture 组合式异步编程
CompletableFuture
是jdk1.8引入的一个新特性。 它主要是为了解决多个Future
结果之间的依赖关系。比如:
- 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第
一个的结果。 - 等待
Future
集合中的所有任务都完成。 - 仅等待
Future
集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同
一个值),并返回它的结果。 - 通过编程方式完成一个
Future
任务的执行(即以手工设定异步操作结果的方式)。 - 应对Future的完成事件(即当
Future
的完成事件发生时会收到通知,并能使用Future
计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)。
runAsync 和 supplyAsync方法
CompletableFuture
提供了四个静态方法来创建一个异步操作。
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
-
runAsync
方法不支持返回值 -
supplyAsync
可以支持返回值
如果executor
参数没有设置值,那么会使用ForkJoinPool.commonPool
默认线程池执行任务。
示例
@Test
public void testRunAsync() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture.runAsync(() -> System.out.println(Thread.currentThread().getName() + " 执行异步任务 runAsync"), executor);
String result = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync");
return "有结果";
}, executor).get();
System.out.println(result);
}
pool-1-thread-1 执行异步任务 runAsync
pool-1-thread-2 执行异步任务 supplyAsync
有结果
计算结果完成时的回调方法
当CompletableFuture
的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
可以看到Action的类型是BiConsumer<? super T,? super Throwable>
它可以处理正常的计算结果,或者异常情况。
whenComplete
和 whenCompleteAsync
的区别:
-whenComplete
:是执行当前任务的线程执行继续执行 whenComplete 的任务。
-whenCompleteAsync
:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。
示例
@Test
public void testWhenComplete() {
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync");
return "有结果";
}, executor).whenComplete((s, throwable) -> {
System.out.println(Thread.currentThread().getName() + " 执行了 whenComplete");
if (!StringUtils.isEmpty(s)) {
System.out.println(Thread.currentThread().getName() + " 真的有结果诶! 结果是:" + s);
}
}).whenCompleteAsync((s, throwable) -> {
System.out.println(Thread.currentThread().getName() + " 执行了 whenCompleteAsync");
System.out.println(Thread.currentThread().getName() + " 打印结果值 :" + s);
}, executor).exceptionally(throwable -> {
System.out.println(Thread.currentThread().getName() + " 执行了 exceptionally");
System.out.println(Thread.currentThread().getName() + " 异常了 :" + throwable.getMessage());
return "异常了";
});
}
pool-1-thread-1 执行异步任务 supplyAsync
pool-1-thread-1 执行了 whenComplete
pool-1-thread-1 真的有结果诶! 结果是:有结果
pool-1-thread-2 执行了 whenCompleteAsync
pool-1-thread-2 打印结果值 :有结果
thenApply 方法
当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
Function<? super T,? extends U>
- T:上一个任务返回结果的类型
- U:当前任务的返回值类型
thenApply
和thenApplyAsync
的区别:
-
thenApply
:使用当前线程来执行任务 -
thenApplyAsync
:如果设置了executor
则使用设置的线程池执行任务,如果没有设置则使用ForkJoinPool.commonPool
线程池执行
示例
@Test
public void testThenApply() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
Integer integer = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync");
return "有结果";
}, executor).thenApplyAsync(s -> {
System.out.println(Thread.currentThread().getName() + " 执行异步任务 thenApply");
if (!StringUtils.isEmpty(s)) {
return s.length();
}
return 0;
}, executor).get();
System.out.println(Thread.currentThread().getName() + " " + integer);
}
pool-1-thread-1 执行异步任务 supplyAsync
pool-1-thread-2 执行异步任务 thenApply
main 3
handle 方法
handle
是执行任务完成时对结果的处理。
handle
方法和thenApply
方法处理方式基本一样。不同的是 handle
是在任务完成后再执行,还可以处理异常的任务。thenApply
只可以执行正常的任务,任务出现异常则不执行 thenApply
方法。
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
示例
@Test
public void testHandle() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
Integer integer = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync");
return 10/0;
}, executor).handleAsync((s, throwable) -> {
System.out.println(Thread.currentThread().getName() + " 执行异步任务 handleAsync");
if (Objects.nonNull(throwable)) {
System.out.println(Thread.currentThread().getName() + " 异常信息 " + throwable.getMessage());
return 1;
}
return s;
}, executor).get();
System.out.println(Thread.currentThread().getName() + " " + integer);
}
pool-1-thread-1 执行异步任务 supplyAsync
pool-1-thread-2 执行异步任务 handleAsync
pool-1-thread-2 异常信息 java.lang.ArithmeticException: / by zero
main 1
从示例中可以看出,在handle
中可以根据任务是否有异常来进行做相应的后续处理操作。而 thenApply
方法,如果上个任务出现错误,则不会执行 thenApply
方法。
thenAccept 消费处理结果
接收任务的处理结果,并消费处理,无返回结果。
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
示例
@Test
@Test
public void testThenAccept() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync");
return 10;
}, executor).thenAcceptAsync(s -> {
System.out.println(Thread.currentThread().getName() + " 执行异步任务 thenAccept");
System.out.println(Thread.currentThread().getName() + " " + s);
}, executor).get();
}
pool-1-thread-1 执行异步任务 supplyAsync
pool-1-thread-2 执行异步任务 thenAccept
pool-1-thread-2 10
从示例代码中可以看出,该方法只是消费执行完成的任务,并可以根据上面的任务返回的结果进行处理。如果第一个任务发生异常,那么thenAccept
方法不会被执行。
thenRun 方法
跟 thenAccept
方法不一样的是,thenRun
不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenRun
里面的任务。
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
示例
@Test
public void testThenRun() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync");
return 10;
}, executor).thenRunAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 执行异步任务 thenRun");
}, executor).get();
}
pool-1-thread-1 执行异步任务 supplyAsync
pool-1-thread-2 执行异步任务 thenRun
thenCombine 合并任务
thenCombine
会把 两个CompletionStage
的任务都执行完成后,把两个任务的结果一块交给thenCombine
来处理。
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
示例
@Test
public void testThenCombine() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
Integer result = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync1");
return 10;
}, executor).thenCombine(CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync2");
return 15;
}, executor), (result1, result2) -> {
System.out.println(Thread.currentThread().getName() + " 执行异步任务 thenCombine");
return result1 + result2;
}).get();
System.out.println(Thread.currentThread().getName() + " " + result);
// 分解一下好看些
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync1");
return 10;
}, executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync2");
return 15;
}, executor);
result = future1.thenCombine(future2, (result1, result2) -> {
System.out.println(Thread.currentThread().getName() + " 执行异步任务 thenCombine");
return result1 + result2;
}).get();
System.out.println(Thread.currentThread().getName() + " " + result);
}
pool-1-thread-1 执行异步任务 supplyAsync1
pool-1-thread-2 执行异步任务 supplyAsync2
pool-1-thread-2 执行异步任务 thenCombine
main 25
pool-1-thread-3 执行异步任务 supplyAsync1
pool-1-thread-4 执行异步任务 supplyAsync2
main 执行异步任务 thenCombine
main 25
thenAcceptBoth
当两个CompletionStage
都执行完成后,把结果一块交给thenAcceptBoth
来进行消耗。
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);
示例
@Test
public void testThenAcceptBoth() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
// 分解一下好看些
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync1");
return 10;
}, executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync2");
return 15;
}, executor);
future1.thenAcceptBoth(future2, (result1, result2) -> {
System.out.println(Thread.currentThread().getName() + " 执行异步任务 thenCombine");
System.out.println(Thread.currentThread().getName() + " " + (result1 + result2));
}).get();
}
pool-1-thread-1 执行异步任务 supplyAsync1
pool-1-thread-2 执行异步任务 supplyAsync2
pool-1-thread-2 执行异步任务 thenAcceptBoth
pool-1-thread-2 25
applyToEither 方法
两个CompletionStage
,谁执行返回的结果快,我就用那个CompletionStage
的结果进行下一步的转化操作。
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
示例
@Test
public void testApplyToEither() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
// 分解一下好看些
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int probe = ThreadLocalRandom.current().nextInt(1000);
System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync1 " + probe);
CompletableFutureTest.sleep(probe);
return probe;
}, executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int probe = ThreadLocalRandom.current().nextInt(1000);
System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync2 " + probe);
CompletableFutureTest.sleep(probe);
return probe;
}, executor);
String result = future1.applyToEither(future2, integer -> {
System.out.println(Thread.currentThread().getName() + " 执行异步任务 applyToEither");
return integer + "";
}).get();
System.out.println(Thread.currentThread().getName() + " 执行异步任务 " + result);
}
pool-1-thread-1 执行异步任务 supplyAsync1 301
pool-1-thread-2 执行异步任务 supplyAsync2 422
pool-1-thread-1 执行异步任务 applyToEither
main 执行异步任务 301
acceptEither 方法
两个CompletionStage
,谁执行返回的结果快,我就用那个CompletionStage
的结果进行下一步的消耗操作。
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
示例
@Test
public void testAcceptEither() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
// 分解一下好看些
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int probe = ThreadLocalRandom.current().nextInt(1000);
System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync1 " + probe);
CompletableFutureTest.sleep(probe);
return probe;
}, executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int probe = ThreadLocalRandom.current().nextInt(1000);
System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync2 " + probe);
CompletableFutureTest.sleep(probe);
return probe;
}, executor);
future1.acceptEither(future2, integer -> {
System.out.println(Thread.currentThread().getName() + " 执行异步任务 acceptEither" + integer);
}).get();
}
pool-1-thread-1 执行异步任务 supplyAsync1 507
pool-1-thread-2 执行异步任务 supplyAsync2 167
pool-1-thread-2 执行异步任务 acceptEither 167
runAfterEither 方法
两个CompletionStage
,任何一个完成了都会执行下一步的操作(Runnable
)
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
示例
@Test
public void testRunAfterEither() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
long start = System.currentTimeMillis();
// 分解一下好看些
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int probe = ThreadLocalRandom.current().nextInt(1000);
System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync1 " + probe);
CompletableFutureTest.sleep(probe);
return probe;
}, executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int probe = ThreadLocalRandom.current().nextInt(1000);
System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync2 " + probe);
CompletableFutureTest.sleep(probe);
return probe;
}, executor);
future1.runAfterEither(future2, () -> {
System.out.println(Thread.currentThread().getName() + " 执行异步任务 runAfterEither ");
}).get();
System.out.println(Thread.currentThread().getName() + " 任务结束 耗时:" + (System.currentTimeMillis() - start));
}
pool-1-thread-1 执行异步任务 supplyAsync1 23
pool-1-thread-2 执行异步任务 supplyAsync2 704
pool-1-thread-1 执行异步任务 runAfterEither
main 任务结束 耗时:32
runAfterBoth 方法
两个CompletionStage
,都完成了计算才会执行下一步的操作(Runnable)
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
示例
@Test
public void testRunAfterBoth () throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
long start = System.currentTimeMillis();
// 分解一下好看些
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int probe = ThreadLocalRandom.current().nextInt(1000);
System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync1 " + probe);
CompletableFutureTest.sleep(probe);
return probe;
}, executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int probe = ThreadLocalRandom.current().nextInt(1000);
System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync2 " + probe);
CompletableFutureTest.sleep(probe);
return probe;
}, executor);
future1.runAfterBoth(future2, () -> {
System.out.println(Thread.currentThread().getName() + " 执行异步任务 runAfterBoth ");
}).get();
System.out.println(Thread.currentThread().getName() + " 任务结束 耗时:" + (System.currentTimeMillis() - start));
}
pool-1-thread-1 执行异步任务 supplyAsync1 576
pool-1-thread-2 执行异步任务 supplyAsync2 941
pool-1-thread-2 执行异步任务 runAfterBoth
main 任务结束 耗时:959
thenCompose 方法
thenCompose
方法允许你对两个CompletionStage
进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
示例
@Test
public void testThenCompose() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
long start = System.currentTimeMillis();
// 分解一下好看些
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int probe = ThreadLocalRandom.current().nextInt(1000);
System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync1 " + probe);
CompletableFutureTest.sleep(probe);
return probe;
}, executor);
String result = future1.thenCompose(integer -> CompletableFuture.supplyAsync(() -> {
int probe = ThreadLocalRandom.current().nextInt(1000);
System.out.println(Thread.currentThread().getName() + " 执行异步任务 supplyAsync1 " + probe);
CompletableFutureTest.sleep(probe);
return integer + " " + probe;
}, executor)).get();
System.out.println(Thread.currentThread().getName() + " " + result);
System.out.println(Thread.currentThread().getName() + " 任务结束 耗时:" + (System.currentTimeMillis() - start));
}
pool-1-thread-1 执行异步任务 supplyAsync1 460
pool-1-thread-2 执行异步任务 supplyAsync1 320
main 460 320
main 任务结束 耗时:788
总结
- 如果没有指定线程池
CompletableFuture
会使用ForkJoinPool.commonPool()
作为它的线程池执行异步代码,如果指定线程池,则使用指定的线程池运行。
参考
《Java 8实战》
https://www.jianshu.com/p/6bac52527ca4
源码
https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases
spring-boot-student-concurrent 工程
layering-cache
为监控而生的多级缓存框架 layering-cache这是我开源的一个多级缓存框架的实现,如果有兴趣可以看一下