CompletableFuture异步编程

2019-10-16  本文已影响0人  刘彦青

CompletableFuture 有什么用

创建CompletableFuture对象

方式一:使用默认线程池

/**
 * 创建一个不带返回值得任务。
 */
CompletableFuture<Void> f1 = CompletableFuture.runAsync(new Runnable() {
     @Override
     public void run() {
      //业务逻辑          
     }
 });
 /**
  * 创建一个带返回值的任务。
  */
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
        //业务逻辑
        return null;
    }
});

方式二:使用自定义线程池(建议使用)

 //创建线程池
ExecutorService executor =  Executors.newFixedThreadPool(10); 
        
//创建一个不带返回值得任务。
CompletableFuture<Void> f1 = CompletableFuture.runAsync(new Runnable() {
    @Override
    public void run() {

    }
},executor);
//创建一个带返回值的任务。
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
        //业务逻辑
        return null;
    }
},executor);    
  1. 默认情况下CompletableFuture会使用公共的ForkJoinPool线程池,这个线程池默认创建的线程数是CPU的 核数(也可以通过JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism来设置ForkJoinPool 线程池的线程数)。如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的I/O操 作,就会导致线程池中所有线程都阻塞在I/O操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。
  2. 创建完CompletableFuture对象之后,会自动地异步执行runnable.run()方法或者supplier.get()方法。因为CompletableFuture类实现了Future接口,所以这两个问题你都可以通过Future接口来解决。另外,CompletableFuture类还实现了CompletionStage接口。

常用API

理解CompletionStage接口

CompletionStage接口可以清晰地描述任务之间的这种时序关系,时序关系:串行,并行,汇聚。

串行

线程与线程之间的执行顺序是串行的。

//Async代表的是异步执行fn
CompletionStage<R>  thenApply(fn); 
CompletionStage<R>  thenApplyAsync(fn); 
CompletionStage<Void>   thenAccept(consumer); 
CompletionStage<Void>   thenAcceptAsync(consumer); 
CompletionStage<Void>   thenRun(action); 
CompletionStage<Void>   thenRunAsync(action); 
CompletionStage<R>  thenCompose(fn); 
CompletionStage<R>  thenComposeAsync(fn);

演示串行

//supplyAsync()启动一个异步 流程
CompletableFuture<String> f0 = CompletableFuture.supplyAsync(
    () -> "Hello World")                        //①     
    .thenApply(s -> s + "girl")     //②
    .thenApply(String::toUpperCase);//③
System.out.println(f0.join());
//输出结果 HELLO WORLD  girl

虽然这是一个异步流程,但任务①②③却是 串行执行的,②依赖①的执行结果,③依赖②的执行结果。

AND汇聚

CompletionStage接口里面描述AND汇聚关系,主要是thenCombine、thenAcceptBoth和runAfterBoth系列的接口,这些接口的区别也是源自fn、consumer、action这三个核心参数不同。

CompletionStage<R>  thenCombine(other,fn); 
CompletionStage<R>  thenCombineAsync(other,fn); 
CompletionStage<Void>   thenAcceptBoth(other,consumer); CompletionStage<Void>   thenAcceptBothAsync(other,consumer); CompletionStage<Void>  runAfterBoth(other, action); 
CompletionStage<Void>   runAfterBothAsync(other,    action);

演示:

        // 启动一个异步流程f1
        CompletableFuture<Void> f1 = CompletableFuture.runAsync(()->{
            System.out.println("异步任务-1");

        });
        // 启动一个异步流程f2
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{
            String s = "异步任务-2";
            System.out.println(s);
            return  s;

        });
        //启动一个汇聚流程f3
        CompletableFuture<String> f3 = f1.thenCombine(f2,(a,tf)->{
            return tf;//tf是f2的值
        });

        //等待任务3执行结果
        System.out.println(f3.join());
OR汇聚

CompletionStage接口里面描述OR汇聚关系,主要是applyToEither、acceptEither和runAfterEither系列的 接口,这些接口的区别也是源自fn、consumer、action这三个核心参数不同。

CompletionStage applyToEither(other,    fn);
CompletionStage applyToEitherAsync(other,   fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other,    consumer);
CompletionStage runAfterEither(other,   action); 
CompletionStage runAfterEitherAsync(other,  action);

功能演示:

         // 启动一个异步流程f1
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(()->{
            int t   =    new Random().nextInt(10);
            System.out.println("f1-t = "+t);
            sleep(t,    TimeUnit.SECONDS);
            return  String.valueOf(t);
        });
     // 启动一个异步流程f2
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{
            int t   =    new Random().nextInt(10);
            System.out.println("f2-t = "+t);
            sleep(t,    TimeUnit.SECONDS);
            return  String.valueOf(t);
        });
        //将f1和f2的值汇总到f3
        CompletableFuture<String> f3 = f1.applyToEither(f2,s    ->
                    Integer.parseInt(f2.join())+Integer.parseInt(s)+""
        );
        System.out.println(f3.join());

**** 码字不易如果对你有帮助请给个关注****

**** 爱技术爱生活 QQ群: 894109590****

上一篇 下一篇

猜你喜欢

热点阅读