CompletableFuture入门

2018-08-02  本文已影响79人  不知名的蛋挞

CompletableFuture介绍

Future 接口可以构建异步应用,但依然有其局限性。它很难直接表述多个Future 结果之间的依赖性。实际开发中,我们经常需要达成以下目的:

等等之类的,新的CompletableFuture将使得这些成为可能。

CompletableFuture实现了Future<T>, CompletionStage<T>两个接口。所以还是可以像以前一样通过阻塞或轮询的方式获得结果,尽管这种方式不推荐使用。

虽说 CompletableFuture 实现了 Future 接口,但它多数方法源自于 CompletionStage。CompletionStage是一个接口,从命名上看得知是一个完成的阶段,它里面的方法也标明是在某个运行阶段得到了结果之后要做的事情。

如下我们使用new的方式创建一个CompletableFuture(这种方式不常用来创建一个CompletableFuture,此处仅为了说明情况),用阻塞的方式得到了结果:

public class CompletableFutureInAction {

    private static Random RAMDOM = new Random(System.currentTimeMillis());

    public static void main(String[] args)
            throws ExecutionException, InterruptedException {
        CompletableFuture<Double> completableFuture = new CompletableFuture();
        new Thread(()->{
            double value = get();
            completableFuture.complete(value);
        }).start();

        System.out.print("===========no==block");
        Optional.ofNullable(completableFuture.get()).ifPresent(System.out::println);
    }

    private static double get(){
        try {
            Thread.sleep(10000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return RAMDOM.nextDouble();
    }
}

运行之后会立刻输出:

===========no==block

过了n秒就会输出一个随机数:

0.39273128973

如果我们想实现方法完成时会收到通知,而不是一直阻塞在结果的获取上面,可以这样写:

public class CompletableFutureInAction {

    private static Random RAMDOM = new Random(System.currentTimeMillis());

    public static void main(String[] args)
            throws ExecutionException, InterruptedException {
        CompletableFuture<Double> completableFuture = new CompletableFuture();
        new Thread(()->{
            double value = get();
            completableFuture.complete(value);
        }).start();

        System.out.print("===========no==block");
        completableFuture.whenComplete((v,t)->{
            Optional.ofNullable(v).ifPresent(System.out::println);
            Optional.ofNullable(t).ifPresent(x->x.printStackTrace());
        });
    }

    private static double get(){
        try {
            Thread.sleep(10000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return RAMDOM.nextDouble();
    }
}

CompletableFuture的基本使用

【创建CompletableFuture对象】

CompletableFuture.completedFuture是一个静态辅助方法,用来返回一个已经计算好的CompletableFuture。

public static <U> CompletableFuture<U> completedFuture(U value)

而以下四个静态方法用来为一段异步执行的代码创建CompletableFuture对象:

public static CompletableFuture<Void>   runAsync(Runnable runnable)
public static CompletableFuture<Void>   runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier, Executor executor)

以Async结尾并且没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。

这四个方法的参数类型都是函数式接口,所以可以使用lambda表达式实现异步任务。

【计算结果完成时的处理】

当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:

public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)

可以看到Action的类型是BiConsumer<? super T,? super Throwable>它可以处理正常的计算结果,或者异常情况。

方法不以Async结尾,意味着Action使用相同的线程执行,而如果以Async结尾且没有指定Executor,则从ForkJoinPool.commonPool()中获取线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)。

如果你用过Future,就会知道糟糕的时候有多糟糕。幸运的是,CompletableFuture有一个漂亮的对应手段,通过使用exceptionally。exceptionally给我们一个机会恢复,通过执行当异步执行的计算抛出exception时备选的方法(alternative method)。

代码示例:

public class CompletableFutureInAction2 {

    private static Random RAMDOM = new Random(System.currentTimeMillis());

    public static void main(String[] args)
            throws ExecutionException, InterruptedException {
        CompletableFuture.supplyAsync(CompletableFutureInAction2::get)
                         .whenComplete((v,t)->{
                             Optional.ofNullable(v).ifPresent(System.out::println);
                             Optional.ofNullable(t).ifPresent(x->x.printStackTrace());
                         });
        System.out.print("===========no==block");
    }

    private static double get(){
        try {
            Thread.sleep(10000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return RAMDOM.nextDouble();
    }
}

输出结果:

===========no==block

为什么没有结果输出呢?因为whenComplete里面的线程是守护线程,所有的“非后台线程”结束时,程序也就终止了,同时会杀死进程中所有后台线程:main就是一个非后台线程。所以运行之后会直接输出===========no==block,但方法没有执行完那当然没有任何结果输出。如果我们想得到结果可以在 System.out.print("===========no==block");后面加上一句:

Thread.currentThread().join();

这样main线程会等待CompletableFuture的线程结束之后才能继续运行。

对于线程池中的所有线程默认都为非后台线程,这样主线程退出时不会直接退出JVM,而是等待线程池结束。所以线程池的生命周期要比创建它们的程序生命周期要长,我们必须使用shutdown()方法手动退出。我们知道Executors可以用来创建一个线程池,如果我们想让Executors创建的线程池中的线程自动结束,可以使用如下方法:

 public static void main(String[] args) {
        // ExecutorService 默认不是守护线程 默认false setDaemon(false)
        ExecutorService executor = Executors.newFixedThreadPool(2,r->{
            // 对线程池里面的线程进行初始化设置
            Thread t = new Thread();
            t.setDaemon(true);  // 设置线程为守护线程
            return t;
        });
        
        executor.execute(()->System.out.print("test"));

        // 不用执行shutdown,线程池中的线程随着main()方法执行完也随之退出
    }

执行supplyAsync的线程是守护线程,所以main()函数执行完了这个线程也随之被杀死。而如果我们把线程池中的所有线程默认都转换为非后台线程,然后用这个线程池中的县城去执行supplyAsync,我们就不用依赖join(),这样主线程退出时不会直接退出JVM,我们就可以等到方法输出结果。

public class CompletableFutureInAction2 {
    private static Random RAMDOM = new Random(System.currentTimeMillis());

    public static void main(String[] args)
            throws ExecutionException, InterruptedException {
        AtomicBoolean finished = new AtomicBoolean(false);

        ExecutorService executor = Executors.newFixedThreadPool(2,r->{
            Thread t = new Thread();
            t.setDaemon(false);
            return t;
        });
        
        // CompletableFuture.supplyAsync(executor)  whenComplete
        // 上面两步都是 [pool-1-thread-1]执行 即两个步骤是同一个线程执行
        CompletableFuture.supplyAsync(CompletableFutureInAction2::get,executor)
                .whenComplete((v,t)->{
                    Optional.ofNullable(v).ifPresent(System.out::println);
                    finished.set(true);
                });
        System.out.print("===========no==block");

       /* while(!finished.get()){
            Thread.sleep(1);
        }*/
    }

    private static double get(){
        try {
            Thread.sleep(10000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return RAMDOM.nextDouble();
    }
}

做好了上面的准备,下面我们来看一下方法使用Async结尾和不使用有什么区别:

public class CompletableFutureInAction2 {
    private static Random RAMDOM = new Random(System.currentTimeMillis());

    public static void main(String[] args) {
        ExecutorService executorPool = Executors.newFixedThreadPool(2, run -> {
            Thread t = new Thread(run);
            t.setDaemon(false);
            return t;
        });

        //###1.0 CompletableFuture.supplyAsync(executorPool)  thenApply  whenComplete
        CompletableFuture.supplyAsync(CompletableFutureInAction2::get,executorPool)
                .thenApply(t->{
                    System.out.println("##1.0## [thenApplyThreadName]=["+Thread.currentThread().getName()+"]");
                    return t*10;
                })
                .whenComplete((v,t)->{
                    System.out.println("##1.0## [whenCompleteThreadName]=["+Thread.currentThread().getName()+"]");
                    System.out.println(v);
                    t.printStackTrace();
                });
        //### 上面三步都是 [pool-1-thread-1]执行 即三个步骤是同一个线程执行

        //###2.0 CompletableFuture.supplyAsync(executorPool)  thenApplyAsync  whenCompleteAsync
        CompletableFuture.supplyAsync(CompletableFutureInAction2::get,executorPool)
                .thenApplyAsync(t->{
                    System.out.println("##2.0## [thenApplyThreadName]=["+Thread.currentThread().getName()+"]");
                    return t*10;
                }).
                whenCompleteAsync((v,t)->{
                   System.out.println("##2.0## [whenCompleteThreadName]=["+Thread.currentThread().getName()+"]");
                   System.out.println(v);
                   t.printStackTrace();
                });
        //### 上面三步supplyAsync=[pool-1-thread-2]  后面两个都是[ForkJoinPool.commonPool-worker-1]
        //### 原因由于调用这个时thenApplyAsync,没有指定Executor executor,然后又是因为异步,默认采用ForkJoin的连接池
        //### 由于工作量不大,ForkJoinPool并没有分配两个线程,ForkJoinPool.commonPool-worker-1承担了两份工作

        //###3.0 CompletableFuture.supplyAsync(executorPool)  thenApplyAsync  whenCompleteAsync(executorpool)
        CompletableFuture.supplyAsync(CompletableFutureInAction2::get,executorPool)
                .thenApply(t->{
                    System.out.println("##3.0## [thenApplyThreadName]=["+Thread.currentThread().getName()+"]");
                    return t*10;
                }).
                whenCompleteAsync((v,t)->{
                    System.out.println("##3.0## [whenCompleteThreadName]=["+Thread.currentThread().getName()+"]");
                    System.out.println(v);
                    t.printStackTrace();},executorPool);
        //### 上面三步supplyAsync和thenApply 都是[pool-1-thread-3]  whenCompleteAsync[pool-1-thread-4]
        //### 原因由于调用这个时thenApply和前者同一线程  whenCompleteAsync指定了线程
    }

    private static double get(){
        try {
            Thread.sleep(10000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return RAMDOM.nextDouble();
    }
}
上一篇下一篇

猜你喜欢

热点阅读