java学习之路

JavaGuide知识点整理——CompletableFutur

2022-07-28  本文已影响0人  唯有努力不欺人丶

其实CompletableFuture现在使用的比较多。很多开源框架都大量用到了。因此专门写一篇文章来介绍这个java8才被引入的一个非常有用的异步编程的类。

CompletableFuture简介

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>

CompletableFuture同时实现了Future和CompletionStage接口。除了提供了更为好用和强大的Future特性之外,还提供了函数式编程的能力。
先从Future开始说起:
Future是个接口,比较简洁,有五个方法:


Future接口

CompletionStage<T>接口的方法就比较多了,CompletableFuture的函数式能力就是这个接口赋予的。从这个接口的方法参数你就可以发现其中大量使用了java8引入方法函数式编程。


CompletionStage接口

CompletionStage<T>接口中的方法比较多CompletableFuture的函数式能力就是这个接口赋予的从这个接口的方法参数就可以发现大量使用了java8引入的函数式编程。

常见操作

创建CompletableFuture

常见的创建CompletableFuture对象方法有两种:

new关键字
通过new关键字创建CompletableFuture对象这种使用方式可以看做是将CompletableFuture当做Future来使用。
下面看一个简单的案例:
我们通过创建一个结果值类型为List<Integer>的CompletableFuture,可以把result看做是异步运算结果的载体。

        CompletableFuture<List<Integer>> result = new CompletableFuture<>();

假设在某个时刻我们得到了最终结果,这时我们可以调用complete()方法为其传入结果,这表示result已经被完成了(我们可以理解为在这里把结果装进result中)。

        result.complete(ids);//只有第一次调用有效

可以通过idDone()方法来检查是否已经完成。

public boolean isDone() {
    return result != null;
}

获取异步计算的结果也非常简单,直接调用get方法即可

ids = result.get();
complete和get用法

静态工厂方法
这两个方法可以帮助我们封装计算逻辑

static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
// 使用自定义线程池(推荐)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
// 使用自定义线程池(推荐)
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

runAsync()方法接收的参数的Runnable(执行run方法),这是一个函数式接口,不允许返回值。当你需要异步操作且不关心返回结果的时候可以使用runAsync()方法。

supplyAsync()方法接收的参数是Supplier<U>,这也是一个函数式接口,U是返回结果值的类型。当你需要异步操作且关系返回结果的时候,可以使用supplyAsync()方法。
下面是两个方法是使用案例:

    public static void main(String[] args) throws Exception {
        CompletableFuture<List<Integer>> result = CompletableFuture.supplyAsync(()->{
            List<Integer> ids = Arrays.asList(1,2,3);
            return ids;
        });
        System.out.println(result.get());
        
        CompletableFuture noResult = CompletableFuture.runAsync(()-> System.out.println("这个没返回值"));
        System.out.println(noResult.get());

    }

这个控制台打印结果如下:


没有返回值的get是null

处理异步结算的结果

当我们获取到异步计算的结果之后,还可以对其进行下一步的处理。比较常用的方法有下面几个:

![image.png](https://img.haomeiwen.com/i16553345/9aa2cbba947e2f7f.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

这个方法可以接好多个,流式调用一直.就行。
如果不需要从回调函数中获取返回结果,可以使用thenAccept()或者thenRun(),这两个方法的区别在于thenRun()不能访问异步计算的结果。rhenRun()有以下几个方法:

public CompletableFuture<Void> thenRun(Runnable action) {
    return uniRunStage(null, action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action) {
    return uniRunStage(defaultExecutor(), action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action,
                                            Executor executor) {
    return uniRunStage(screenExecutor(executor), action);
}

下面是用法案例:

    public static void main(String[] args) throws Exception {
        CompletableFuture result =
            CompletableFuture.runAsync(()-> System.out.println("执行了一系列计算")).thenRun(()-> System.out.println(
                "执行完了计算通知")).thenRun(()-> System.out.println("附加了一些处理逻辑"));
        System.out.println(result.isDone());
    }

关于thenAccept()方法的参数是Consumer<? super T>.有以下几个方法:

public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
    return uniAcceptStage(null, action);
}

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
    return uniAcceptStage(defaultExecutor(), action);
}

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
                                               Executor executor) {
    return uniAcceptStage(screenExecutor(executor), action);
}

顾名思义,Consumer属于消费型接口,它可以接收1个输入对象然后进行消费,使用案例如下:

    public static void main(String[] args) throws Exception {
        CompletableFuture.supplyAsync(()-> "执行完任务!").thenAccept(s -> {
            System.out.println(s+"!!!");
        });
        CompletableFuture.runAsync(()-> System.out.println("执行计算!")).thenAccept(s-> System.out.println("添加了一些操作"));
    }
java8接口文档

因为这几个方法很容易搞混,所以简单说一下上面三者的区别:

whenComplete()的方法的参数是BiConsumer<? super T,? super Throwable>。有以下几个方法:

public CompletableFuture<T> whenComplete(
    BiConsumer<? super T, ? super Throwable> action) {
    return uniWhenCompleteStage(null, action);
}
public CompletableFuture<T> whenCompleteAsync(
    BiConsumer<? super T, ? super Throwable> action) {
    return uniWhenCompleteStage(defaultExecutor(), action);
}
// 使用自定义线程池(推荐)
public CompletableFuture<T> whenCompleteAsync(
    BiConsumer<? super T, ? super Throwable> action, Executor executor) {
    return uniWhenCompleteStage(screenExecutor(executor), action);
}

相比于Consumer,Biconsumer可以接收两个输入对象向后进行消费。多了一个异常定义。用法类似。

    public static void main(String[] args) throws Exception {
        CompletableFuture.supplyAsync(()-> "执行完任务!").whenComplete((str,ex)->{
            System.out.println(str);
            System.out.println(ex == null);});

    }

这里ex == null的结果是会是true。

异常处理

可以通过handle()方法来处理任务执行过程中可能出现的抛出异常的情况。下面是使用案例:

    public static void main(String[] args) throws Exception {
        System.out.println(CompletableFuture.supplyAsync(()-> {
            int i = 1/0;
            return "返回结果";
        }).handle((res,ex)->{
            if(ex != null){
                return "报错了";
            }
            return res;
        }).get());
    }

还可以通过exceptionally()方法来处理异常情况。注意exceptionally()是有异常的时候才会处理,否则正常返回。

    public static void main(String[] args) throws Exception {
        System.out.println(CompletableFuture.supplyAsync(()-> {
            int i = 1/0;
            return "返回结果";
        }).exceptionally(ex->"处理异常了!").get());
        System.out.println(CompletableFuture.supplyAsync(()-> {
            return "返回结果";
        }).exceptionally(ex->"处理异常了!").get());
    }

如上代码的执行结果是这样的:


有异常才处理

当然了如果想要CompletableFuture的结果就是异常的话,可以使用completeExceptionally()方法为其赋值:

    public static void main(String[] args) throws Exception {
        CompletableFuture completableFuture = CompletableFuture.supplyAsync(()-> {
            return "返回结果";
        });
        completableFuture.completeExceptionally(new Exception("手动加入一个异常!"));
        System.out.println(completableFuture.get());
    }

这个比较麻烦的一点是不能直接流式编程,要分开写。反正用法也比较简单。

组合CompletableFuture

我们可以使用thenCompose()按顺序连接两个CompletableFuture对象。thenCompose方法有如下几个:

public <U> CompletableFuture<U> thenCompose(
    Function<? super T, ? extends CompletionStage<U>> fn) {
    return uniComposeStage(null, fn);
}

public <U> CompletableFuture<U> thenComposeAsync(
    Function<? super T, ? extends CompletionStage<U>> fn) {
    return uniComposeStage(defaultExecutor(), fn);
}

public <U> CompletableFuture<U> thenComposeAsync(
    Function<? super T, ? extends CompletionStage<U>> fn,
    Executor executor) {
    return uniComposeStage(screenExecutor(executor), fn);
}

使用示例如下:

    public static void main(String[] args) throws Exception {
        System.out.println(CompletableFuture.supplyAsync(()->"任务一执行完毕").thenCompose(s->CompletableFuture.supplyAsync(()->s+" \n任务二执行完毕")).get());
    }

在实际开发中这个方法其实还挺有用的,比如说我们要先获取用户信息然后再利用用户信息去做别的事。和thernCompose()方法类似的还有thenCombine()方法,thenCombine()同样可以组合两个CompletableFuture对象。代码使用如下:

    public static void main(String[] args) throws Exception {
        CompletableFuture completableFuture =
            CompletableFuture.supplyAsync(()->"任务一执行完毕").thenCompose(s->CompletableFuture.supplyAsync(()->s+" " +
                "\n任务二执行完毕")).thenCombine(CompletableFuture.supplyAsync(()->"任务三执行完毕"),(s, s2) -> s+"\n"+s2);
        System.out.println(completableFuture.get());
    }
执行结果

那么thenCombine()和thenCompose()有什么区别呢?

并行运行多个CompletableFuture

可以通过CompletableFuture的allOf()这个静态方法来并行运行多个CompletableFuture。
实际项目中,我们经常需要并行运行多个互不相关的任务,这些任务之间没有依赖关系,可以互相独立运行。

比如说我们要读取6个文件,这6个任务都是没有执行顺序依赖的任务。但是我们需要返回给用户的时候将这几个文件 的处理的结果进行统计整理,像这种情况我们就可以使用并行运行多个CompletableFuture来处理。示例代码如下:

    public static void main(String[] args) throws Exception {
        CompletableFuture task1 = CompletableFuture.supplyAsync(()->{
            try {
                Thread.sleep(3000l);
            }catch (Exception e){

            }
            return "执行任务1";
        });
        CompletableFuture task2 = CompletableFuture.supplyAsync(()->"执行任务2");
        CompletableFuture task3 = CompletableFuture.supplyAsync(()->"执行任务3");
        CompletableFuture task4 = CompletableFuture.supplyAsync(()->"执行任务4");
        CompletableFuture task5 = CompletableFuture.supplyAsync(()->"执行任务5");
        CompletableFuture task6 = CompletableFuture.supplyAsync(()->"执行任务6");
        System.out.println(DateUtils.format(new Date(),"HH:mm:ss"));
        System.out.println(CompletableFuture.allOf(task1,task2,task3,task3,task4,task5,task6).get()+DateUtils.format(new Date(),"HH:mm:ss"));
    }
运行结果
注意看这个运行结果,在所有的task都执行完了才会返回。
allOf()方法会等到所有的CompletableFuture都运行完成之后再返回。
当然了也可以调用.join()方法让程序等所有任务都运行完了再继续执行。
而anyOf()方法不会等待所有的CompletableFuture都运行完成,而是只要有一个执行完成即可。
案例如下:
有一个执行完了就返回了
代码和上面的差不多,但是换成anyOf以后,立刻返回了,明明task1睡了3秒也没影响。

这篇文章只是简单介绍了CompletableFuture常用的api,如果想深入学习可以自己去找资料看,如果本文对你有所帮助,记得点个喜欢点个关注~也祝大家工作顺顺利利,每天进步一点点!

上一篇 下一篇

猜你喜欢

热点阅读