Java线程-Callable、Future、FutureTas

2018-08-18  本文已影响87人  骑着乌龟去看海

一. 前言

  在前面,我们已经了解了创建线程的两种方式:继承Thread或者实现Runnable接口,由于Thread也实现了Runnable接口,实际上上面两种方式是一种方式,不过这种方式有一个问题就是在任务完成之后无法获取执行的结果。而我们今天要学习的Callable就可以解决该问题。

二. 简介

  Callable和Future是JDK1.5之后引入的,也是用于创建线程,不过在任务执行完成之后可以得到任务的执行结果,我们来简单看下这两个接口及相关的实现。

1. Callable接口
@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

Callable接口是一个泛型接口,可以返回结果,并可以抛出异常,只有一个call方法,类似于Runnable的run方法,用于执行任务,但run方法没有返回值,并且不能抛出checked Exception。而call方法是有返回值,且返回的类型就是传进来的V类型。

而Callable 接口一般是配合ExecutorService线程池来使用的,在ExecutorService中声明了若干个submit方法:

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

目前只需要知道Callable一般是和ExecutorService配合来使用的,而有关线程池方面的内容,我们后续自然会学习到。

2. Future接口

  Future表示的是异步计算的结果,提供了查看任务是否完成,取消任务,获取结果等接口。而针对获取结果接口,只有在任务执行完成之后,才能获取,否则的话,该方法会阻塞直到任务执行完成。取消接口的话,一旦任务执行完成,就不能取消。

If you would like to use a Future for the sake of cancellability but not provide a usable result, you can declare types of the form Future<?> and return null as a result of the underlying task. (如果出于可取消性的考虑而不想提供可用的结果,那么可以声明 Future<?> 类型并返回 null作为任务的执行结果)

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future接口提供了如下5个接口:

  1. cancel,尝试取消任务,如果任务已经执行完成,已被取消或者由于其他原因无法取消,则取消失败,返回false;如果该任务尚未start,那么调用cancel后,该任务将取消成功,返回true;如果任务已经在执行中,那么通过参数mayInterruptIfRunning来判断执行该任务的线程是否可用中断,也就是是否取消正在执行但还没有执行完成的任务,如果该值为true,则可以取消;
  2. isCancelled,表示任务是否被取消成功,如果任务在正常完成之前被取消,则返回true;
  3. isDone,表示任务是否已经完成,已经完成返回true;完成可能是由于正常执行完成、抛出异常或取消,在这些情况下,该接口都返回true;也就是,在调用cancel方法返回之后,该方法的调用将返回true;
  1. get,获取线程的执行结果,如果任务尚未执行完成,则该接口将会处于阻塞,直到线程执行完成;而该方法有可能会抛出以下几个异常:
    • CancellationException - if the computation was cancelled
    • ExecutionException - if the computation threw an exception
    • InterruptedException - if the current thread was interrupted while waiting
  2. get,有参数的get方法,该方法也是用于获取执行结果,如果在指定的时间内没有获取到结果,则会抛出超时异常:
    • TimeoutException - if the wait timed out

也就是说,通过Future我们可以判断任务是否执行完成,取消任务,获取任务执行完成之后的结果。而Future是一个接口,我们来看以下它的实现类FutureTask。

3. FutureTask

我们先来看一下FutureTask的实现:

public class FutureTask<V> implements RunnableFuture<V> {
    public FutureTask(Callable<V> callable)
    public FutureTask(Runnable runnable, V result)
}

我们可以看到,FutureTask继承自RunnableFuture,我们再来看下RunnableFuture:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

可以看到,RunnableFuture同时继承了Runnable接口和Future接口,所以说它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

4. CompletableFuture

  虽然Future提供了异步执行的能力,但Future对于结果的获取却不方便,只能通过阻塞或者轮询的方式得到结果,所以在JDK 8.0之后引入了CompletableFuture这个类来优化这些问题,CompletableFuture大概有50多个方法,功能十分强大,比如说 将多个异步计算合并为一个,后面的依赖于前面执行的结果等,我们先来看一下它的继承结构:

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> 

CompletableFuture算是Future的增强类,因为该类在实现Future的同时又实现了CompletionStage 这个接口,而 CompletionStage 接口表示的是阶段,表示某个阶段完成之后要做什么,该接口提供了多种方法,接下来我们来学习下。

4.1 生成 CompletableFuture

我们可用借助CompletableFuture 的几个静态方法来生成 CompletableFuture 对象:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor)
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor)

以Async结尾并且没有指定Executor的方法会使用ForkJoinPool.commonPool()作为它的线程池执行异步代码。

也可以通过completedFuture返回一个已经计算好的CompletedFuture的对象:

public static <U> CompletableFuture<U> completedFuture(U value)
4.2 CompletionStage 接口

  我们来看下CompletionStage 的接口。CompletionStage 的接口虽然很多,但相似的方法一般都会分为三类,一个是使用当前线程执行,一个是使用共享的线程池,另一个是使用自定义的(参数传递)Executor来执行,类似于下面这种:

thenXXX() :使用当前线程继续执行接下来的内容
thenXXXAsync() :使用共享线程池执行接下来的内容
thenXXXAsync(with Executor) :使用参数中提供的Executor来执行接下来的内容
completableFuture.thenApply(s -> Integer.parseInt(s));
completableFuture.thenAccept(s -> System.out.println("->" + s));
completableFuture.thenRun(() -> System.out.println("then running"));
completableFuture.thenCombine(completableFuture1, (s1, s2) -> "Combine:" + s1 + " : " + s2);
completableFuture.thenAcceptBoth(completableFuture, 
    (s1, s2) -> System.out.println("Consume:" + s1 + ":" + s2));
completableFuture.runAfterBoth(stringCompletableFutureV1, 
    (Runnable) () -> System.out.println("do some jobs here"));
completableFuture.applyToEither(stringCompletableFutureV1, 
    (Function<String, Integer>) Integer::parseInt);
completableFuture.acceptEither(stringCompletableFutureV1, 
    (Consumer<String>) s -> System.out.println("->" + s));
completableFuture.runAfterEither(stringCompletableFutureV1, 
    (Runnable) () -> System.out.println("other job"));
completableFuture.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + ":" + s));
completableFuture.exceptionally(throwable -> "fallback");
completableFuture.whenComplete((s, throwable) -> System.out.println(s));
completableFuture.handle((s, throwable) -> {
    if (throwable == null) {
        return Integer.parseInt(s);
    }
    return -1;
});
4.3 CompletableFuture实现类

  CompletableFuture中,除了实现上述方法以及Future接口的实现方法之外,还有一些方法,比如除了常规的阻塞get及超时get方法之外,还有一个getNow方法:

public T getNow(T valueIfAbsent) {
    Object r;
    return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
}

result字段代表任务的执行结果,所以首先判断是否为null,为null则表示任务还没有执行结束,直接返回参数中传递的默认值;如果result不为null,表示说明任务已经执行结束,然后直接返回;如果要了解源码的执行流程,可以参考本章后面引用的链接地址。

另外两个常用的方法,allOf,anyOf方法,在CompletionStage接口的方法中操作的CompletionStage对象,要么是一个,要么是两个,而这两个静态方法可以组合多个对象。

  1. allOf,allOf可以接收多个CompletableFuture参数,该方法会等待所有的CompletableFuture都执行完成之后组合完成,再执行接下来的操作;
  2. anyOf,anyOf和allOf类似,也是组合多个,但只要有一个CompletableFuture完成之后便执行后续的操作;

这部分内容参考自:
一字马胡 - 深度学习Java Future(一)
一字马胡 - 深度学习Java Future(二)

3. 运行示例

接下来,我们通过几个简单的例子来看一下这几个接口的使用。

3.1 Callable和Future

我们先来看一下Callable和Future的使用:

public class CallableTest {
    public static void main(String[] args) {
        try {
            callable();
        } catch (ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void callable() throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        Future<String> future = executor.submit(new Callable<String>() {
            public String call() throws InterruptedException {
                System.out.println("child thread start --");
                TimeUnit.SECONDS.sleep(3);
                System.out.println("child thread end --");
                return "hello callable";
            }
        });
        executor.shutdown();
        String result = future.get();
        System.out.println("return value: " + result);
    }
}

其中,匿名类Callable可以替换为lambda表达式:

public static void callable() throws ExecutionException, InterruptedException {
    ExecutorService executor = Executors.newCachedThreadPool();
    Future<String> future = executor.submit(() -> {
        System.out.println("child thread start --");
        TimeUnit.SECONDS.sleep(3);
        System.out.println("child thread end --");
        return "hello callable";
    });
    executor.shutdown();
    String result = future.get();
    System.out.println("return value: " + result);
}

output:

child thread start --
child thread end --
return value:hello callable
3.2 Callable和FutureTask

至于FutureTask,我们可以通过FutureTask的构造方法来构造对象,然后通过ExecutorService的execute方法来执行:

public static void callable() throws ExecutionException, InterruptedException {
    ExecutorService executor = Executors.newCachedThreadPool();
    FutureTask<String> future = new FutureTask<>(() -> {
        System.out.println("child thread start --");
        TimeUnit.SECONDS.sleep(3);
        System.out.println("child thread end --");
        return "hello callable";
    });
    executor.execute(future);
    executor.shutdown();
    String result = future.get();
    System.out.println("return value: " + result);
}

当然也可以借助Thread类而不是ExecutorService来实现:

public static void callable() throws ExecutionException, InterruptedException {
    FutureTask<String> future = new FutureTask<>(() -> {
        System.out.println("child thread start --");
        TimeUnit.SECONDS.sleep(3);
        System.out.println("child thread end --");
        return "hello callable";
    });
    new Thread(future).start();
    String result = future.get();
    System.out.println("return value: " + result);
}

本文参考自:
海子-Java并发编程:Callable、Future和FutureTask
https://docs.oracle.com/javase/8/docs/api/
IBM - 通过实例理解 JDK8 的 CompletableFuture
鸟窝-Java CompletableFuture 详解
《Java并发编程实战》

上一篇下一篇

猜你喜欢

热点阅读