Java8 CompletableFuture 异步任务
所在文集:Java 并发编程
所谓异步调用其实就是实现一个可无需等待被调用函数的返回值而让操作继续运行的方法。在 Java 语言中,简单的讲就是另启一个线程来完成调用中的部分计算,使调用继续运行或返回,而不需要等待计算结果。但调用者仍需要取线程的计算结果。
关于 Java Future,请首先参见
JDK5 新增了 Future 接口,用于描述一个异步计算的结果。虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。 例如:
public static void main(String[] args) throws Exception {
ExecutorService es = Executors.newSingleThreadExecutor();
// 在 Java8 中,推荐使用 Lambda 来替代匿名 Callable 实现类
Future<Integer> f = es.submit(() -> {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 123;
});
// 当前 main 线程阻塞,直至 future 得到值
System.out.println(f.get());
es.shutdown();
}
阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式呢?即当计算结果完成及时通知监听者。(例如通过回调的方式)
关于 Future
接口,还有如下一段描述:
The Future interface was added in Java 5 to serve as a result of an asynchronous computation, but it did not have any methods to combine these computations or handle possible errors.
不能很好地组合多个异步任务,也不能处理可能的异常。
CompletableFuture
Java 8 中, 新增加了一个包含 50 个方法左右的类 CompletableFuture
,它提供了非常强大的 Future
的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture
的方法。
对于阻塞或者轮询方式,依然可以通过 CompletableFuture
类的 CompletionStage
和 Future
接口方式支持。
CompletableFuture
类声明了 CompletionStage
接口,CompletionStage
接口实际上提供了同步或异步运行计算的舞台,所以我们可以通过实现多个 CompletionStage
命令,并且将这些命令串联在一起的方式实现多个命令之间的触发。
我们可以通过 CompletableFuture.supplyAsync(this::sendMsg);
这么一行代码创建一个简单的异步计算。在这行代码中,supplyAsync
支持异步地执行我们指定的方法,这个例子中的异步执行方法是 sendMsg
。当然,我们也可以使用 Executor
执行异步程序,默认是 ForkJoinPool.commonPool()
。
我们也可以在异步计算结束之后指定回调函数,例如 CompletableFuture.supplyAsync(this::sendMsg) .thenAccept(this::notify);
这行代码中的 thenAccept
被用于增加回调函数,在我们的示例中 notify
就成了异步计算的消费者,它会处理计算结果。
CompletionStage<T> 接口
A stage of a possibly asynchronous computation, that performs an action or computes a value when another CompletionStage completes.
A stage completes upon termination of its computation, but this may in turn trigger other dependent stages.
一个可能执行的异步计算的某个阶段,在另一个CompletionStage完成时执行一个操作或计算一个值。
一个阶段完成后,其计算结束。但是,该计算阶段可能会触发下一个计算阶段。
最简单的例子
CompletableFuture
实际上也实现了 Future
接口:
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
所以我们也可以利用 CompletableFuture
来实现基本的 Future
功能,例如:
public static void main(String[] args) throws Exception {
CompletableFuture future = new CompletableFuture();
// 在 Java8 中,推荐使用 Lambda 来替代匿名 Runnable 实现类
new Thread(
() -> {
try {
// 模拟一段耗时的操作
Thread.sleep(2000);
future.complete("I have completed");
} catch (Exception e) {
}
}
).start();
System.out.println(future.get());
}
此时此刻主线程 future.get()
将得到字符串的结果 I have completed
,同时完成回调以后将会立即生效。注意 complete()
方法只能调用一次,后续调用将被忽略。
注意:get()
方法可能会抛出异常 InterruptedException
和 ExecutionException
。
如果我们已经知道了异步任务的结果,我们也可以直接创建一个已完成的 future
,如下:
public static void main(String[] args) throws Exception {
// Returns a new CompletableFuture that is already completed with the given value.
CompletableFuture future = CompletableFuture.completedFuture("I have completed");
System.out.println(future.get());
}
如果在异步执行过程中,我们觉得执行会超时或者会出现问题,我们也可以通过 cancle()
方法取消,此时调用 get()
方法时会产生异常 java.util.concurrent.CancellationException
,代码如下:
public static void main(String[] args) throws Exception {
CompletableFuture future = new CompletableFuture();
// 在 Java8 中,推荐使用 Lambda 来替代匿名 Runnable 实现类
new Thread(
() -> {
try {
// 模拟一段耗时的操作
Thread.sleep(2000);
future.cancel(false);
} catch (Exception e) {
}
}
).start();
System.out.println(future.get());
}
使用工厂方法创建 CompletableFuture
在上述的代码中,我们手动地创建 CompletableFuture
,并且手动的创建一个线程(或者利用线程池)来启动异步任务,这样似乎有些复杂。
其实我们可以利用 CompletableFuture
的工厂方法,传入 Supplier
或者 Runnable
的实现类,直接得到一个 CompletableFuture
的实例:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
第一个和第三个方法,没有 Executor
参数,将会使用 ForkJoinPool.commonPool()
(全局的,在 JDK8 中介绍的通用池),这适用于 CompletableFuture
类中的大多数的方法。
-
Runnable
接口方法public abstract void run();
没有返回值 -
Supplier
接口方法T get();
有返回值。如果你需要处理异步操作并返回结果,使用前两种Supplier<U>
方法
一个小的 Tips:
Both Runnable and Supplier are functional interfaces that allow passing their instances as lambda expressions thanks to the new Java 8 feature. 使用 Lambda 表达式来传入
Supplier
或者Runnable
的实现类。
一个示例代码如下:
public static void main(String[] args) throws Exception {
// 在 Java8 中,推荐使用 Lambda 来替代匿名 Supplier 实现类
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (Exception e) {
}
return "I have completed";
});
System.out.println(future.get());
}
转换和作用于异步任务的结果 (thenApply)
我们可以叠加功能,把多个 future
组合在一起等
-
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
- 该方法的作用是在该计算阶段正常完成后,将该计算阶段的结果作为参数传递给参数
fn
值的函数Function
,并会返回一个新的CompletionStage
- 该方法的作用是在该计算阶段正常完成后,将该计算阶段的结果作为参数传递给参数
-
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
- 该方法和上面的方法
thenApply
功能类似,不同的是对该计算阶段的结果进行计算的函数fn
的执行时异步的。
- 该方法和上面的方法
-
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
- 该方法和上面的方法
thenApplyAsync
功能类似,不同的是对该计算阶段的结果进行计算的函数fn
的执行时异步的, 并且是在调用者提供的线程池中执行的。
- 该方法和上面的方法
Function
接口方法 R apply(T t);
包含一个参数和一个返回值
一个示例代码如下:
public static void main(String[] args) throws Exception {
// 在 Java8 中,推荐使用 Lambda 来替代匿名 Supplier 实现类
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (Exception e) {
}
return "I have completed";
});
CompletableFuture<String> upperfuture = future.thenApply(String::toUpperCase);
System.out.println(upperfuture.get());
}
运行完成的异步任务的结果 (thenAccept/thenRun)
在 future
的管道里有两种典型的“最终”阶段方法。他们在你使用 future
的值的时候做好准备,当
thenAccept()
提供最终的值时,thenRun
执行 Runnable
。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
Consumer
接口方法 void accept(T t);
包含一个参数,但是没有返回值
一个示例代码如下:
public static void main(String[] args) throws Exception {
// 在 Java8 中,推荐使用 Lambda 来替代匿名 Supplier 实现类
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (Exception e) {
}
return "I have completed";
});
future.thenAccept(s -> {
System.out.println(s);
});
// Waits if necessary for this future to complete, and then returns its result.
future.get();
}
结合两个 CompletableFuture
The best part of the CompletableFuture API is the ability to combine CompletableFuture instances in a chain of computation steps.
这就是 CompletableFuture 最大的优势。
-
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
- 传入前一个
CompletableFuture
的返回值,返回另外一个CompletableFuture
实例
- 传入前一个
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
一个示例代码如下:
public static void main(String[] args) throws Exception {
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (Exception e) {
}
return "Hello ";
}).thenCompose(s -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (Exception e) {
}
return s + "World";
}));
System.out.println(future.get()); // Hello World
}
上述功能也可以通过 thenCombine()
方法实现,传入一个 BiFunction
接口的实例(以 Lambda 形式) 例如:
public static void main(String[] args) throws Exception {
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (Exception e) {
}
return "Hello ";
}).thenCombine(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (Exception e) {
}
return "World";
}), (s1, s2) -> s1 + s2);
System.out.println(future.get());
}
并行执行多个异步任务
有时候我们可能需要等待所有的异步任务都执行完毕,然后组合他们的结果。我们可以使用 allOf()
方法:
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
一个示例代码如下:
public static void main(String[] args) throws Exception {
CompletableFuture<String> future1
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2
= CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<Void> combinedFuture
= CompletableFuture.allOf(future1, future2);
// 这个方法不会合并结果,可以看到他的返回值是 Void 类型
combinedFuture.get();
// 我们需要手动来处理每一个并行异步任务的结果
String combined = Stream.of(future1, future2)
.map(CompletableFuture::join)
.collect(Collectors.joining(" "));
System.out.println(combined); // Hello World
}
有时候我们可能不需要等待所有的异步任务都执行完毕,只要任何一个任务完成就返回结果。我们可以使用 anyOf()
方法:
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
一个示例代码如下:
public static void main(String[] args) throws Exception {
CompletableFuture<String> future1
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2
= CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (Exception e) {
}
return "World";
}
);
CompletableFuture<Object> combinedFuture
= CompletableFuture.anyOf(future1, future2);
System.out.println(combinedFuture.get()); // Hello
}
异常的处理
我们可以在 handle()
方法里处理异常:
-
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
- 第一个参数为
CompletableFuture
返回的结果 - 第二个参数为抛出的异常
- 第一个参数为
一个示例代码如下:
public static void main(String[] args) throws Exception {
String name = null;
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> {
if (name == null) {
throw new RuntimeException("Computation error!");
}
return "Hello, " + name;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!");
System.out.println(future.get()); // Hello, Stranger!
}
参考:
Java 8:CompletableFuture终极指南- ImportNew
通过实例理解JDK8 的CompletableFuture - IBM
Guide To CompletableFuture
java8中CompletableFuture解析