【精】JDK8中CompletableFuture+lambda

2022-01-07  本文已影响0人  小胖学编程

如何“快准狠”的使用线程池呢,JDK给出了一个工具类:CompletableFuture来简化开发。

1. 快速入手

1.1 执行单个任务

任务无非两种,一种没有返回值:Runnable,一种有返回值:Callable。而CompletableFuture可以理解为一个简化操作的工具类,其提供的API目的就是为了更加便利的进行流式操作。

方法 参数 作用
supplyAsync Supplier<U>,executor 带有返回值的任务,自定义的线程池
runAsync Runnable,executor 没有返回值的任务,自定义的线程池
方法 作用
join 阻塞等待结果,但不会抛出uncheck异常
get 阻塞等待结果,但会抛出uncheck异常

join和get区别:

  1. get()方法会throws InterruptedException, ExecutionException(继承了Exception类);
  2. join()方法包装了异常,将异常封装为CompletionException异常(继承了RuntimeException类);

准备代码:

@Slf4j
public class TestBf {
    static ThreadPoolExecutor executor =
            new ThreadPoolExecutor(10, 10, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(20));

    static List<String> sources = Arrays.asList("tom", "liMing", "tony");

    private static User createUser(String name) {
        if (StringUtils.startsWith(name, "t")) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            throw new RuntimeException("eee");
        }
        User user = new User();
        user.setName(name);
        return user;
    }
    @Data
    public static class User {
        private String name;
    }
}

测试代码:

    /**
     * 1. 子线执行supplyAsync内部方法,父线程调用join/get方法等待子线程执行结果。
     * 2. 当不传递executor参数时,默认使用forkJoinPool线程池
     */
    private static void testSupplyAsync() {
        User user = CompletableFuture.supplyAsync(() -> {
            log.info("执行supplyAsyn方法{}");
            return createUser("t");
        }, executor).join();
    }

    /**
     * 1. 子线执行runAsync内部方法,父线程调用join/get方法等待子线程执行结果。
     * 2. 当不传递executor参数时,默认使用forkJoinPool线程池
     */
    private static void testRunAsync() {
        CompletableFuture.runAsync(() -> {
            log.info("执行runAsync方法{}");
        }, executor).join();
    }

1.2 执行多个任务

实际上,我们现实中需求一般要求并发批量处理一组数据,在最慢的子任务执行完毕后,统一的返回结果。

方法 作用
allOf 聚合多个CompletableFuture对象,全部执行完毕后才会返回结果
anyOf 聚合多个CompletableFuture对象,最快的任务执行完毕后返回结果
join/get 利用lambda特性。全部执行完毕才会返回结果

1.2.1 allOf实战

说下为什么allOf()的响应参数为CompletableFuture<Void>对象,因为是将一批CompletableFuture对象(这里称为futures)进行聚合(可能响应对象不同),所以allOf()无法使用一个公共的响应对象,只能使用void。

获取结果时,依旧还得去遍历futures拿到每个子任务的响应对象。

    private static void testAllOf() throws InterruptedException, ExecutionException {

        //提交一批任务
        List<CompletableFuture<User>> futures =
                sources.stream().map(s -> CompletableFuture.supplyAsync(() -> createUser(s), executor))
                        .collect(Collectors.toList());
        log.info("begin allOf");
        //将这一批CompletableFuture对象,使用allOf操作得到一个CompletableFuture对象
        CompletableFuture<Void> allFutures =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[] {}));
        log.info("after allOf");
        //等待所有结果执行完毕
        allFutures.join();
        //拿到所有结果(注意此处逻辑是主线程去进行的map操作,可进行优化)
        List<User> list = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        log.info("执行完毕:{}", list);
    }

优化版2:使用子线程链式的处理后续结果

    private static void testAllOfAndThenApply() throws InterruptedException, ExecutionException {

        //提交一批任务
        List<CompletableFuture<User>> futures =
                sources.stream().map(s -> CompletableFuture.supplyAsync(() -> createUser(s), executor))
                        .collect(Collectors.toList());
        log.info("begin allOf");
        //将这一批CompletableFuture对象,使用allOf操作得到一个CompletableFuture对象
        CompletableFuture<Void> allFutures =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[] {}));
        log.info("after allOf");
        //allFutures(即所有futures)执行完毕后。执行thenApply()内部的逻辑,实现转化。因为此处依旧是交由子线程处理的,所以返回的依旧是CompletableFuture<List<User>>对象
        CompletableFuture<List<User>> listCompletableFuture =
                allFutures.thenApply(s -> futures.stream().map(r -> {
                    User user = r.join();
                    if (user.getName().startsWith("t")) {
                        sleepWithNoException(1000);
                    }
                    log.info("此处是子线程打印:{}", user);
                    return user;
                }).collect(Collectors.toList()));

        log.info("执行完毕:{}", listCompletableFuture.get());
    }

1.2.2 anyOf实战

获取执行最快的子任务,某些场景下毕竟有用,响应对象是CompletableFuture<Object>,还是一样的原因,因为聚合的子任务的响应对象可以是不同的。

获取结果时,需要强制。

    private static void testAnyOf() throws InterruptedException, ExecutionException {
        //提交一批任务
        List<CompletableFuture<User>> userFutures =
                sources.stream().map(s -> CompletableFuture.supplyAsync(() -> createUser(s), executor))
                        .collect(Collectors.toList());
        //使用anyOf聚合
        CompletableFuture<Object> anyFuture =
                CompletableFuture.anyOf(userFutures.toArray(new CompletableFuture[] {}));
        log.info("anyOf操作...");
        //获取执行最快的任务
        User user = (User) anyFuture.get();
        log.info("最终输出{}", user);
    }

1.2.3 join/get操作

    private static void testBatchJoin() {
        //当createUser抛出异常时,该方法也会抛出异常
        List<User> users = sources.stream()
                .map(s -> CompletableFuture.supplyAsync(() -> createUser(s)))  //提交任务  
                .map(CompletableFuture::join)     //等待结果
                .collect(Collectors.toList());    //转化列表
        log.info("打印最终参数:{}", users);
    }

这个方法的也是可以并发处理多个任务,但是不如allOf()的就是map(CompletableFuture::join)的操作实际上是主线程执行的。

2. 链式操作

2.1 子线程链式处理

CompletableFuture提供了丰富的链式操作逻辑。

方法 入参 作用
thenApply Function<? super T,? extends U> 将传入的对象转换为另一个对象,响应对象依旧是CompletableFuture
thenCompose Function<? super T, ? extends CompletionStage<U>> 连接两个CompletableFuture对象,响应对象依旧是CompletableFuture
thenAccept Consumer<? super T> action 消费传入的对象,响应对象void
thenRun Runnable 执行下一个任务

thenApply和thenCompose的区别

thenApply的案例:

    private static void testAllOfAndThenApply() throws InterruptedException, ExecutionException {

        //提交一批任务
        List<CompletableFuture<User>> futures =
                sources.stream().map(s -> CompletableFuture.supplyAsync(() -> createUser(s), executor))
                        .collect(Collectors.toList());
        log.info("begin allOf");
        //将这一批CompletableFuture对象,使用allOf操作得到一个CompletableFuture对象
        CompletableFuture<Void> allFutures =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[] {}));
        log.info("after allOf");
        //allFutures(即所有futures)执行完毕后。执行thenApply()内部的逻辑,实现转化。因为此处依旧是交由子线程处理的,所以依旧是Com
        CompletableFuture<List<User>> listCompletableFuture =
                allFutures.thenApply(s -> futures.stream().map(r -> {
                    User user = r.join();
                    if (user.getName().startsWith("t")) {
                        sleepWithNoException(1000);
                    }
                    log.info("run thenApply:{}", user);
                    return user;
                }).collect(Collectors.toList()));

        log.info("执行完毕:{}", listCompletableFuture.get());
    }

结果:可以看到thenApply使用的是同一个CompletableFuture,thenApply中的逻辑也是使用子线程进行处理。

17:34:50.827 [main] INFO com.tellme.obj.TestBf - begin allOf
17:34:50.831 [main] INFO com.tellme.obj.TestBf - after allOf
17:34:51.848 [pool-1-thread-1] INFO com.tellme.obj.TestBf - run createUser,TestBf.User(name=tom)
17:34:51.848 [pool-1-thread-3] INFO com.tellme.obj.TestBf - run createUser,TestBf.User(name=tony)
17:34:52.848 [pool-1-thread-2] INFO com.tellme.obj.TestBf - run createUser,TestBf.User(name=liMing)
17:34:53.852 [pool-1-thread-2] INFO com.tellme.obj.TestBf - run thenApply:TestBf.User(name=tom)
17:34:53.852 [pool-1-thread-2] INFO com.tellme.obj.TestBf - run thenApply:TestBf.User(name=liMing)
17:34:54.857 [pool-1-thread-2] INFO com.tellme.obj.TestBf - run thenApply:TestBf.User(name=tony)
17:34:54.857 [main] INFO com.tellme.obj.TestBf - 执行完毕:[TestBf.User(name=tom), TestBf.User(name=liMing), TestBf.User(name=tony)]

(等效)thenCompose的案例:

    private static void testAllOfAndThenCompose() throws InterruptedException, ExecutionException {

        //提交一批任务
        List<CompletableFuture<User>> futures =
                sources.stream()
                        .map(s -> CompletableFuture.supplyAsync(() -> createUser(s), executor))  //此时的线程为executor
                        .collect(Collectors.toList());
        log.info("begin allOf");
        //将这一批CompletableFuture对象,使用allOf操作得到一个CompletableFuture对象
        CompletableFuture<Void> allFutures =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[] {}));
        log.info("after allOf");

        CompletableFuture<List<User>> listCompletableFuture = allFutures
                .thenCompose(s -> CompletableFuture.supplyAsync(
                        () -> futures.stream().map(future -> {
                            log.info("print thenCompose request:{}", s);
                            User user = future.join();
                            if (user.getName().startsWith("t")) {
                                sleepWithNoException(1000);
                            }
                            log.info("print thenCompose response:{}", user);  //此时的线程为ForkJoinPool
                            return user;
                        }).collect(Collectors.toList())));

        log.info("执行完毕:{}", listCompletableFuture.get());
    }

结果:可以看到thenCompose是将两个CompletableFuture组合起来,然后返回结果,使用的子线程也并不是一个。

20:16:00.602 [main] INFO com.tellme.obj.TestBf - begin allOf
20:16:00.606 [main] INFO com.tellme.obj.TestBf - after allOf
20:16:01.636 [pool-1-thread-3] INFO com.tellme.obj.TestBf - run createUser,TestBf.User(name=tony)
20:16:01.627 [pool-1-thread-1] INFO com.tellme.obj.TestBf - run createUser,TestBf.User(name=tom)
20:16:02.628 [pool-1-thread-2] INFO com.tellme.obj.TestBf - run createUser,TestBf.User(name=liMing)
20:16:02.633 [ForkJoinPool.commonPool-worker-19] INFO com.tellme.obj.TestBf - print thenCompose request:null
20:16:03.636 [ForkJoinPool.commonPool-worker-19] INFO com.tellme.obj.TestBf - print thenCompose response:TestBf.User(name=tom)
20:16:03.636 [ForkJoinPool.commonPool-worker-19] INFO com.tellme.obj.TestBf - print thenCompose request:null
20:16:03.636 [ForkJoinPool.commonPool-worker-19] INFO com.tellme.obj.TestBf - print thenCompose response:TestBf.User(name=liMing)
20:16:03.637 [ForkJoinPool.commonPool-worker-19] INFO com.tellme.obj.TestBf - print thenCompose request:null
20:16:04.642 [ForkJoinPool.commonPool-worker-19] INFO com.tellme.obj.TestBf - print thenCompose response:TestBf.User(name=tony)
20:16:04.642 [main] INFO com.tellme.obj.TestBf - 执行完毕:[TestBf.User(name=tom), TestBf.User(name=liMing), TestBf.User(name=tony)]

2.2 结果合并

方法 作用
thenCombine 两个CompletableFuture结合,有返回结果
thenAcceptBoth 两个CompletableFuture结合,无返回结果
    /**
     * 无返回值的结合
     */
    private static void testThenAcceptBoth() {
        CompletableFuture<User> userCompletableFuture1 = CompletableFuture.supplyAsync(() -> createUser("1"));
        CompletableFuture<User> userCompletableFuture2 = CompletableFuture.supplyAsync(() -> createUser("2"));
        userCompletableFuture1.thenAcceptBoth(userCompletableFuture2, (r1, r2) -> {
            log.info("打印数据{},{}", r1, r2);
        }).join();
    }

    /**
     * 有返回值的结合
     */
    private static void testTheCombine() {
        CompletableFuture<User> userCompletableFuture1 = CompletableFuture.supplyAsync(() -> createUser("1"));
        CompletableFuture<User> userCompletableFuture2 = CompletableFuture.supplyAsync(() -> createUser("2"));
        //两个结果结合
        CompletableFuture<String> resFuture =
                userCompletableFuture1.thenCombine(userCompletableFuture2, (r1, r2) -> r1.getName() + r2.getName());
        log.info("两个结果的结合:{}" + resFuture.join());
    }

2.3 结果(异常)回调

方法 参数 作用
exceptionally Function<Throwable, ? extends T> 当子线程抛出异常时,将回调该方法,完成降级,该方法有返回值
whenComplete BiConsumer<? super T, ? super Throwable> action 当子线程执行完后/抛出异常,将调用该方法,该方法无返回值
handle BiFunction<? super T, Throwable, ? extends U> 当子线程执行完后/抛出异常,将调用该方法,该方法有返回值

exceptionally使用场景:

  1. 并发处理多个任务时,若一个任务抛出异常时,不希望去终止所有的任务时,可以使用该方法。
    private static void testExceptionally() {
        List<User> users = sources.stream()
                .map(s -> CompletableFuture
                        .supplyAsync(() -> createUser(s), executor)   //执行此方法,当此方法抛出异常,可能影响导致并发执行失败
                        .exceptionally(ex -> new User()))        //当出现异常时,将回调exceptionally方法。完成降级
                .map(CompletableFuture::join).collect(Collectors.toList());
        System.out.println(users);
    }
  1. thenXxx链式调用:防止雪崩效应(即前面流程出现异常,导致theXx内部逻辑不执行)
    /**
     * 防止雪崩效应。
     */
    private static void testExceptionally() {
        String res = CompletableFuture.supplyAsync(() -> {
                    log.info("a1 请求");
                    return "a1";
                })
                .thenApply(s -> {
                    int i = 1 / 0;
                    log.info("a2 请求参数{}", s);
                    return "a2";
                })
                .exceptionally(ex -> {
                    return "a2 error";      //此处是降级逻辑
                }).thenApply(s -> {
                    log.info("a3 请求参数:{}", s);    //此时s的值为a2 error
                    return "a3";
                })
                .join();
        log.info("最终结果:{}", res);
    }

结果:

21:14:50.631 [ForkJoinPool.commonPool-worker-19] INFO com.tellme.obj.TestBf - a1 请求
21:14:50.633 [ForkJoinPool.commonPool-worker-19] INFO com.tellme.obj.TestBf - a3 请求参数:a2 error
21:14:50.634 [main] INFO com.tellme.obj.TestBf - 最终结果:a3

whenComplete使用场景:

无论是否出现异常,肯定会回调whenComplete方法,但是出现异常后,main方法会被中断。

whenComplete参数是BiConsumer<? super T, ? super Throwable>当触发方法后,并不会返回降级结果。

    private static void testWhenComplete() {

        String res = CompletableFuture.supplyAsync(() -> {
            log.info("run supplyAsync ");
            int i = 1 / 0;  //出现异常
            return createUser("1");
        }, executor).thenApply(u -> {
            log.info("run thenApply ");
            return u.getName();
        }).whenComplete((s, ex) -> {
            log.info("run whenComplete {}", s);
        }).join();
        log.info("main {}:", res);
    }

结果:

21:19:12.180 [pool-1-thread-1] INFO com.tellme.obj.TestBf - run supplyAsync 
21:19:12.184 [pool-1-thread-1] INFO com.tellme.obj.TestBf - run whenComplete null
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ArithmeticException: / by zero
    at com.tellme.obj.TestBf.lambda$testWhenComplete$5(TestBf.java:106)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
    ... 3 more

handle使用场景:

方法执行完毕,或者出现异常时,都会调用这个方法,都结果进行最终的后处理。

    private static void testHandle() {

        String join = CompletableFuture.supplyAsync(() -> {
                    log.info("run supplyAsync ");
                    return createUser("1");
                }, executor).thenApply(u -> {
                    log.info("run thenApply ");
                    return u.getName();
                }).handle((s, ex) -> s + "123")  // 方法执行完毕,或者出现异常时,都会调用这个方法,都结果进行最终的后处理。
                .join();

        System.out.println(join);
    }

结果:

21:22:33.607 [pool-1-thread-1] INFO com.tellme.obj.TestBf - run supplyAsync 
null123

好文阅读

Java8的CompletableFuture进阶之道
Java8 CompletableFuture(6) thenCompose和thenCombine的区别

历史文章

多线程——线程池ThreadPoolExecutor
SpringBoot2.x整合线程池(ThreadPoolTaskExecutor)

上一篇 下一篇

猜你喜欢

热点阅读