Java线程-Callable、Future、FutureTas
一. 前言
在前面,我们已经了解了创建线程的两种方式:继承Thread或者实现Runnable接口,由于Thread也实现了Runnable接口,实际上上面两种方式是一种方式,不过这种方式有一个问题就是在任务完成之后无法获取执行的结果。而我们今天要学习的Callable就可以解决该问题。
二. 简介
Callable和Future是JDK1.5之后引入的,也是用于创建线程,不过在任务执行完成之后可以得到任务的执行结果,我们来简单看下这两个接口及相关的实现。
1. Callable接口
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
Callable接口是一个泛型接口,可以返回结果,并可以抛出异常,只有一个call方法,类似于Runnable的run方法,用于执行任务,但run方法没有返回值,并且不能抛出checked Exception。而call方法是有返回值,且返回的类型就是传进来的V类型。
而Callable 接口一般是配合ExecutorService线程池来使用的,在ExecutorService中声明了若干个submit方法:
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
目前只需要知道Callable一般是和ExecutorService配合来使用的,而有关线程池方面的内容,我们后续自然会学习到。
2. Future接口
Future表示的是异步计算的结果,提供了查看任务是否完成,取消任务,获取结果等接口。而针对获取结果接口,只有在任务执行完成之后,才能获取,否则的话,该方法会阻塞直到任务执行完成。取消接口的话,一旦任务执行完成,就不能取消。
If you would like to use a Future for the sake of cancellability but not provide a usable result, you can declare types of the form Future<?> and return null as a result of the underlying task. (如果出于可取消性的考虑而不想提供可用的结果,那么可以声明 Future<?> 类型并返回 null作为任务的执行结果)
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Future接口提供了如下5个接口:
- cancel,尝试取消任务,如果任务已经执行完成,已被取消或者由于其他原因无法取消,则取消失败,返回false;如果该任务尚未start,那么调用cancel后,该任务将取消成功,返回true;如果任务已经在执行中,那么通过参数
mayInterruptIfRunning
来判断执行该任务的线程是否可用中断,也就是是否取消正在执行但还没有执行完成的任务,如果该值为true,则可以取消;- isCancelled,表示任务是否被取消成功,如果任务在正常完成之前被取消,则返回true;
- isDone,表示任务是否已经完成,已经完成返回true;完成可能是由于正常执行完成、抛出异常或取消,在这些情况下,该接口都返回true;也就是,在调用cancel方法返回之后,该方法的调用将返回true;
- get,获取线程的执行结果,如果任务尚未执行完成,则该接口将会处于阻塞,直到线程执行完成;而该方法有可能会抛出以下几个异常:
- CancellationException - if the computation was cancelled
- ExecutionException - if the computation threw an exception
- InterruptedException - if the current thread was interrupted while waiting
- get,有参数的get方法,该方法也是用于获取执行结果,如果在指定的时间内没有获取到结果,则会抛出超时异常:
- TimeoutException - if the wait timed out
也就是说,通过Future我们可以判断任务是否执行完成,取消任务,获取任务执行完成之后的结果。而Future是一个接口,我们来看以下它的实现类FutureTask。
3. FutureTask
我们先来看一下FutureTask的实现:
public class FutureTask<V> implements RunnableFuture<V> {
public FutureTask(Callable<V> callable)
public FutureTask(Runnable runnable, V result)
}
我们可以看到,FutureTask继承自RunnableFuture,我们再来看下RunnableFuture:
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
可以看到,RunnableFuture同时继承了Runnable接口和Future接口,所以说它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。
4. CompletableFuture
虽然Future提供了异步执行的能力,但Future对于结果的获取却不方便,只能通过阻塞或者轮询的方式得到结果,所以在JDK 8.0之后引入了CompletableFuture这个类来优化这些问题,CompletableFuture大概有50多个方法,功能十分强大,比如说 将多个异步计算合并为一个,后面的依赖于前面执行的结果等,我们先来看一下它的继承结构:
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
CompletableFuture算是Future的增强类,因为该类在实现Future的同时又实现了CompletionStage
这个接口,而 CompletionStage 接口表示的是阶段,表示某个阶段完成之后要做什么,该接口提供了多种方法,接下来我们来学习下。
4.1 生成 CompletableFuture
我们可用借助CompletableFuture 的几个静态方法来生成 CompletableFuture 对象:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor)
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor)
以Async结尾并且没有指定Executor的方法会使用ForkJoinPool.commonPool()作为它的线程池执行异步代码。
也可以通过completedFuture返回一个已经计算好的CompletedFuture的对象:
public static <U> CompletableFuture<U> completedFuture(U value)
4.2 CompletionStage 接口
我们来看下CompletionStage 的接口。CompletionStage 的接口虽然很多,但相似的方法一般都会分为三类,一个是使用当前线程执行,一个是使用共享的线程池,另一个是使用自定义的(参数传递)Executor来执行,类似于下面这种:
thenXXX() :使用当前线程继续执行接下来的内容
thenXXXAsync() :使用共享线程池执行接下来的内容
thenXXXAsync(with Executor) :使用参数中提供的Executor来执行接下来的内容
- thenApply,返回一个新的CompletionStage,当该阶段执行完成后,该阶段的结果作为参数进行下一步执行,比如将CompletableFuture的返回结果String类型的结果转换为Integer:
completableFuture.thenApply(s -> Integer.parseInt(s));
- thenAccept,对结果的消费处理,只能消费一次,该方法没有返回值:
completableFuture.thenAccept(s -> System.out.println("->" + s));
- thenRun,该阶段执行完成之后,执行下一个任务,比如说如果你想在CompletableFuture完成之后执行一个Runnable任务,就可以使用该操作,一旦CompletableFuture完成,就会执行指定的Runnable任务,下面是使用示例:
completableFuture.thenRun(() -> System.out.println("then running"));
- thenCombine,对两个CompletableFuture的结果进行组合然后返回,比如将当前CompletableFuture 的结果和等待的 CompletableFuture 的结果进行一些组合然后返回:
completableFuture.thenCombine(completableFuture1, (s1, s2) -> "Combine:" + s1 + " : " + s2);
- thenAcceptBoth,等待两个CompletableFuture结束,然后消费结果,所以该方法自然没有返回值。只有当两个任务都完成之后才执行:
completableFuture.thenAcceptBoth(completableFuture,
(s1, s2) -> System.out.println("Consume:" + s1 + ":" + s2));
- runAfterBoth,相比于AcceptBoth,runAfterBoth虽然也会等待两个CompletableFuture都结束再运行,但是不再依赖两个CompletableFuture的运行结果:
completableFuture.runAfterBoth(stringCompletableFutureV1,
(Runnable) () -> System.out.println("do some jobs here"));
- applyToEither,两个CompletableFuture 有竞争关系,哪个CompletableFuture先结束,就会对哪个的返回值做处理,然后第二个CompletableFuture的结果将不会再处理:
completableFuture.applyToEither(stringCompletableFutureV1,
(Function<String, Integer>) Integer::parseInt);
- acceptEither,同样,对两个CompletableFuture 先结束的进行消费:
completableFuture.acceptEither(stringCompletableFutureV1,
(Consumer<String>) s -> System.out.println("->" + s));
- runAfterEither,同样,对两个CompletableFuture先结束的 再执行新的任务:
completableFuture.runAfterEither(stringCompletableFutureV1,
(Runnable) () -> System.out.println("other job"));
- thenCompose,表示使用CompletableFuture的结果来产生一个全新的CompletableFuture,then的含义是等待当前的CompletableFuture运行结束,然后再运行我们指定的Supplier:
completableFuture.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + ":" + s));
- exceptionally,对当前的CompletableFuture抛出的异常进行一些处理,类似于fallback,比如说返回异常内容来代替当前抛出的异常,当然你也可以在该方法里面直接抛出当前CompletableFuture抛出的异常:
completableFuture.exceptionally(throwable -> "fallback");
- whenComplete,当前CompletableFuture将会把运行结果作为参数传递给指定的function,然后我们可以通过参数来判断当前CompletableFuture是否成功完成还是抛出了异常,然后进行后续处理:
completableFuture.whenComplete((s, throwable) -> System.out.println(s));
- handle,和whenComplete相比,该类型不仅可以获取到当前CompletableFuture是成功了还是抛出了异常,还可以对结果进行转换(如果当前CompletableFuture成功完成了):
completableFuture.handle((s, throwable) -> {
if (throwable == null) {
return Integer.parseInt(s);
}
return -1;
});
4.3 CompletableFuture实现类
CompletableFuture中,除了实现上述方法以及Future接口的实现方法之外,还有一些方法,比如除了常规的阻塞get及超时get方法之外,还有一个getNow
方法:
public T getNow(T valueIfAbsent) {
Object r;
return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
}
result字段代表任务的执行结果,所以首先判断是否为null,为null则表示任务还没有执行结束,直接返回参数中传递的默认值;如果result不为null,表示说明任务已经执行结束,然后直接返回;如果要了解源码的执行流程,可以参考本章后面引用的链接地址。
另外两个常用的方法,allOf,anyOf方法,在CompletionStage接口的方法中操作的CompletionStage对象,要么是一个,要么是两个,而这两个静态方法可以组合多个对象。
- allOf,allOf可以接收多个CompletableFuture参数,该方法会等待所有的CompletableFuture都执行完成之后组合完成,再执行接下来的操作;
- anyOf,anyOf和allOf类似,也是组合多个,但只要有一个CompletableFuture完成之后便执行后续的操作;
这部分内容参考自:
一字马胡 - 深度学习Java Future(一)
一字马胡 - 深度学习Java Future(二)
3. 运行示例
接下来,我们通过几个简单的例子来看一下这几个接口的使用。
3.1 Callable和Future
我们先来看一下Callable和Future的使用:
public class CallableTest {
public static void main(String[] args) {
try {
callable();
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
public static void callable() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
Future<String> future = executor.submit(new Callable<String>() {
public String call() throws InterruptedException {
System.out.println("child thread start --");
TimeUnit.SECONDS.sleep(3);
System.out.println("child thread end --");
return "hello callable";
}
});
executor.shutdown();
String result = future.get();
System.out.println("return value: " + result);
}
}
其中,匿名类Callable可以替换为lambda表达式:
public static void callable() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
Future<String> future = executor.submit(() -> {
System.out.println("child thread start --");
TimeUnit.SECONDS.sleep(3);
System.out.println("child thread end --");
return "hello callable";
});
executor.shutdown();
String result = future.get();
System.out.println("return value: " + result);
}
output:
child thread start --
child thread end --
return value:hello callable
3.2 Callable和FutureTask
至于FutureTask,我们可以通过FutureTask的构造方法来构造对象,然后通过ExecutorService的execute方法来执行:
public static void callable() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
FutureTask<String> future = new FutureTask<>(() -> {
System.out.println("child thread start --");
TimeUnit.SECONDS.sleep(3);
System.out.println("child thread end --");
return "hello callable";
});
executor.execute(future);
executor.shutdown();
String result = future.get();
System.out.println("return value: " + result);
}
当然也可以借助Thread类而不是ExecutorService来实现:
public static void callable() throws ExecutionException, InterruptedException {
FutureTask<String> future = new FutureTask<>(() -> {
System.out.println("child thread start --");
TimeUnit.SECONDS.sleep(3);
System.out.println("child thread end --");
return "hello callable";
});
new Thread(future).start();
String result = future.get();
System.out.println("return value: " + result);
}
本文参考自:
海子-Java并发编程:Callable、Future和FutureTask
https://docs.oracle.com/javase/8/docs/api/
IBM - 通过实例理解 JDK8 的 CompletableFuture
鸟窝-Java CompletableFuture 详解
《Java并发编程实战》