JAVA 多线程与高并发学习笔记(十八)——Completabl

2022-10-29  本文已影响0人  简单一点点

本部分介绍Java 8 中提供的具备异步回调能力的工具类——CompletableFuture,该类实现了Future接口,还具备函数式编程能力。

CompletableFuture详解

CompletableFuture类实现了Future和CompletionStage两个接口,该类的实例作为一个异步任务。

CompletionStage接口

CompletionStage代表某个同步或者异步计算的一个阶段,或者一些列异步任务中的一个子任务。

每个CompletionStage子任务所包装的可以是一个Function、Consumer或者Runnable函数式接口实例。这三个函数式接口特点如下:

  1. Function接口的特点是有输入、有输出。
  2. Runnable接口的特点是无输入、无输出。
  3. Consumer接口的特点是有输入、无输出。

多个CompletionStage构成了一条任务流水线,多个子任务之间可以使用链式调用,下面是个简单的例子:

oneStage.thenApply(x -> square(x))
            .thenAccept(y -> System.out.println(y))
            .thenRun(() -> System.out.println())

上例子说明如下:

  1. oneStage是一个CompletionStage子任务。
  2. x -> square(x) 是一个Function类型的Lambda表达式,被thenApply方法包装成了CompletionStage子任务,它又包含输入和输出。
  3. y -> System.out.println(y)是一个Consumer类型的Lambda表达式,被thenAccept包装成了一个CompletionStage子任务,它只有输入(即上个任务的输出)。
  4. () -> System.out.println()是一个Runnable类型的Lambda表达式,被thenRun方法包装成了一个CompletionStage子任务,它没有输入输出。

使用runAsync和supplyAsync创建子任务

CompletableFuture定义了一组用于创建CompletionStage子任务的方法。

// 子任务包装一个Runnable实例,并调用ForkJoinPool。commonPool线程池来执行
public static CompletableFuture<Void> runAsync(Runnable runnable)

// 子任务包装一个Runnable实例,并调用指定的executor线程池来执行
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

// 子任务包装一个Supplier实例,并调用ForkJoinPool。commonPool线程池来执行
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

// 子任务包装一个Supplier实例,并调用指定的executor线程池来执行
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

执行子任务时,如果没有执行Executor线程池,默认情况下会使用公共的ForkJoinPool线程池。

设置子任务回调钩子

可以为CompletionStage子任务设置特定的回调钩子,当计算结果完成或者抛出异常的时候,执行这些特定的钩子。

设置子任务回调钩子的主要函数如下:

// 设置子任务完成时的回调钩子
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throeable> action)

// 设置子任务完成时的回调钩子,可能不在同一线程执行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)

// 设置子任务完成时的回调钩子,提交给线程池executor执行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)

// 设置异常处理的回调钩子
public CompletableFuture<T> exceptionally(Fuction<Throwable, ? extends T> fn)

下面看个简单例子:


public class CompletableFutureDemo {

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("抛出异常");
            throw new RuntimeException("发生异常");
        });

        // 设置异步任务执行完成后的回调钩子
        future.whenComplete(new BiConsumer<Void, Throwable>() {
            @Override
            public void accept(Void aVoid, Throwable throwable) {
                System.out.println("执行完成");
            }
        });

        future.exceptionally(new Function<Throwable, Void>() {
            @Override
            public Void apply(Throwable t) {
                System.out.println("执行失败:" + t.getMessage());
                return null;
            }
        });

        future.get();
    }
}

运行结果:

抛出异常
执行完成
执行失败:java.lang.RuntimeException: 发生异常

调用cancel方法取消CompletableFuture时,任务被视为异常完成,completeExceptionally方法所设置的异常回调钩子也会被执行。

如果没有设置异常回调钩子,发生内部异常时会有两种情况发生:

  1. 在调用get方法启动任务时,如果遇到内部异常,get方法就会抛出ExecutionException。
  2. 在调用join和getNow启动任务时(大多数情况下都是如此),如果遇到内部异常,会抛出CompletionException。

调用handle方法统一处理异常和结果

除了分别通过whenComplete、exceptionally设置完成钩子、异常钩子之外,还可以调用handle方法统一处理结果和异常。

handle方法有3个重载版本:

// 在执行任务的同一个线程中处理异常和结果
public<U> CompletionStage<U> handle (BiFunction<? super T, Throwable, ? extends U> fn);

// 可能不再执行任务的同一个线程中处理异常和结果
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);

// 在指定线程池executor中处理异常和结果
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor);

将前面的例子改成handle版本。


public class CompletableFutureDemo {

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("抛出异常");
            throw new RuntimeException("发生异常");
        });

        future.handle(new BiFunction<Void, Throwable, Void>() {
            @Override
            public Void apply(Void input, Throwable throwable) {
                if(throwable == null) {
                    System.out.println("没有发生异常");
                } else {
                    System.out.println("发生了异常");
                }
                return null;
            }
        });

        future.get();
    }
}

线程池的使用

默认情况下,通过静态方法runAsync和supplyAsync创建的CompletableFuture任务会使用公共的ForkJoinPool线程池。默认的线程数是CPU的核数。

如果所有CompletableFuture任务共享一个线程池,那么一旦有任务执行一些很慢的IO操作,会导致所有线程阻塞,造成线程饥饿。所以建议大家根据不同的业务类型创建不同的线程池。

异步任务的串行执行

如果两个异步任务需要串行,可以通过CompletionStage接口的thenApply、thenAccept、thenRun和thenCompose方法实现。

theApply方法

theApply方法有三个重载版本,声明如下:

// 后一个任务与前一个任务在同一个线程中执行
public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)

// 后一个任务与前一个任务不在同一个线程中执行
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn)

// 后一个任务在指定的executor线程池中执行
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor)

参数fn表示要串行执行的第二个异步任务,泛型参数T是上一个任务所返回结果的类型,泛型参数U是当前任务的返回值类型。

看个简单例子。

public class ThenApplyDemo {

    public static void main(String[] args) throws Exception {
        CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
            @Override
            public Long get() {
                long firstStep = 10L + 10L;
                System.out.println("first step outcome is " + firstStep);

                return firstStep;
            }
        }).thenApplyAsync(new Function<Long, Long>() {
            @Override
            public Long apply(Long aLong) { // 参数是第一步的结果
                long secondStep = aLong * 2;
                System.out.println("Second outcome is " + secondStep);
                return secondStep;
            }
        });

        long result = future.get();
        System.out.println("outcome is " + result);
    }
}

thenRun方法

thenRun方法不关心任务的处理结果,只需要前一个任务执行完成,就开始执行后一个串行任务。

thenApply方法也有三个重载版本,声明如下:

// 后一个任务与前一个任务在同一个线程中执行
public CompletionStage<Void> thenRun(Runnable action);

// 后一个任务与前一个任务不再同一个线程中执行
public CompletionStage<Void> thenRunAsync(Runnable action);

// 后一个任务在executor线程池中执行
public CompletionStage<Void> thenRunAsync(Runnable action, Executor executor);

thenAccept方法

thenAccept方法接收前一个任务的处理结果,但是没有输出。

thenAccept方法有三个重载版本,声明如下:

// 后一个任务与前一个任务在同一个线程中执行
public CompletionStage<Void> thenAccept(Consumer<? super T> action);

// 后一个任务与前一个任务不再同一个线程中执行
public CompletionStage<Void> thenRunAsync(Consumer<? super T> action);

// 后一个任务在executor线程池中执行
public CompletionStage<Void> thenRunAsync(Consumer<? super T> action, Executor executor);

thenCompose方法

thenCompose方法在第一个任务操作完成时,将它的结果作为参数传递给第二个任务。

thenCompose方法有3个重载版本,声明如下:

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U> > fn);

public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U> > fn);

public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U> > fn, Executor executor);

thenCompose方法要求第二个任务的返回值是一个CompletionStage异步实例。

将前面的例子改成theCompose版本:


public class ThenComposeDemo {

    public static void main(String[] args) throws Exception {
        CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
            @Override
            public Long get() {
                long firstStep = 10L + 10L;
                System.out.println("first step outcome is " + firstStep);

                return firstStep;
            }
        }).thenCompose(new Function<Long, CompletionStage<Long>>() {
            @Override
            public CompletionStage<Long> apply(Long firstStepOutcome) {
                return CompletableFuture.supplyAsync(new Supplier<Long>() {
                    @Override
                    public Long get() {
                        long secondStep = firstStepOutcome * 2;
                        System.out.println("Second outcome is " + secondStep);
                        return secondStep;
                    }
                });
            }
        });
        long result = future.get();
        System.out.println("outcome is " + result);

    }
}

异步任务的合并执行

对两个异步任务合并可以通过CompletionStage接口的thenCombine、runAfterBoth和thenAcceptBoth三个方法来实现。

thenCombine方法

thenCombine会在两个任务都执行完成后,把两个任务的结果一起交给thenCombine来处理。

public <U, V> CompletionStage<V> thenCombine(
    CompletionStage<? extends U> other, // 待合并实例
    BiFunction<? super T, ? super U, ? extends V> fn); 

public <U, V> CompletionStage<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T, ? super U, ? extends V> fn
);

public <U, V> CompletionStage<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T, ? super U, ? extends V> fn,
    Executor executor
);

下面看一个使用thenCombine分三步计算(10+10)*(10+10)的例子:


public class ThenCombineDemo {

    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future1 =
                CompletableFuture.supplyAsync(new Supplier<Integer>() {
                    @Override
                    public Integer get() {
                        Integer firstStep = 10 + 10;
                        System.out.println("firstStep outcome is " + firstStep);
                        return firstStep;
                    }
                });
        CompletableFuture<Integer> future2 =
                CompletableFuture.supplyAsync(new Supplier<Integer>() {
                    @Override
                    public Integer get() {
                        Integer secondStep = 10 + 10;
                        System.out.println("secondStep outcome is " + secondStep);
                        return secondStep;
                    }
                });
        CompletableFuture<Integer> future3 = future1.thenCombine(future2,
                new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer, Integer integer2) {
                        return integer * integer2;
                    }
                });
        Integer result = future3.get();
        System.out.println(" outcome is " + result);
    }
}

runAfterBoth方法

runAfterBoth方法不关心没异步任务的输入参数和处理结果。

public CompletionStage<Void> runAfterBoth(
    CompletionStage<?> other, Runnable action
);

public CompletionStage<Void> runAfterBothAsync(
    CompletionStage<?> other, Runnable action
);

public CompletionStage<Void> runAfterBothAsync(
    CompletionStage<?> other, Runnable action, Executor executor
);

thenAcceptBoth方法

thenAcceptBoth方法可以接收前两个任务的处理结果,但是第三个任务却不返回结果。

public <U> CompletionStage<Void> thenAcceptBoth(
    CompletionStage<? extends U> other,
    BiConusmer<? super T, ? super U> action
);

public <U> CompletionStage<Void> thenAcceptBothAsync(
    CompletionStage<? extends U> other,
    BiConusmer<? super T, ? super U> action
);

public <U> CompletionStage<Void> thenAcceptBothAsync(
    CompletionStage<? extends U> other,
    BiConusmer<? super T, ? super U> action,
    Executor executor
);

allOf等待所有的任务结束

allOf会等待所有任务结束,以合并所有任务。

异步任务的选择执行

对有两个异步任务的选择可以通过CompletionStage接口的applyToEither、runAfterEither和acceptEither三个方法来实现。

applyToEither方法

两个CompletionStage谁返回结果的速度快,applyToEither方法就用这个结果进行下一步操作。

// 和other任务返回较快的结果用于执行fn回调函数
public <U> CompletionStage<U> applyToEither(
    CompletionStage<? extends T> other, Function<? super T, U> fn
);

// 功能与上一个相同,但是不一定在同一个线程执行fn
public <U> CompletionStage<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn
);

// 功能与上一个相同,在指定线程执行fn
public <U> CompletionStage<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor
);

看个例子。


public class ApplyToEitherDemo {

    public static void main(String[] args) throws Exception{
        CompletableFuture<Integer> future1 =
                CompletableFuture.supplyAsync(new Supplier<Integer>() {
                    @Override
                    public Integer get() {
                        Integer firstStep = 10 + 10;
                        System.out.println("firstStep outcome is " + firstStep);
                        return firstStep;
                    }
                });
        CompletableFuture<Integer> future2 =
                CompletableFuture.supplyAsync(new Supplier<Integer>() {
                    @Override
                    public Integer get() {
                        Integer secondStep = 100 + 100;
                        System.out.println("secondStep outcome is " + secondStep);
                        return secondStep;
                    }
                });

        CompletableFuture<Integer> future3 =
                future1.applyToEither(future2,
                        new Function<Integer, Integer>() {
                            @Override
                            public Integer apply(Integer integer) {
                                return integer;
                            }
                        });
        Integer result = future3.get();
        System.out.println("outcome is " + result);
    }
}

runAfterEither方法

runAfterEither方法的功能为前面两个CompletionStage实例,任何一个执行完成都会执行第三部回调。

// 和other任务返回较快的结果用于执行fn回调函数
public CompletionStage<Void> runAfterEither(
    CompletionStage<?> other, Runnable fn
);

// 功能与上一个相同,但是不一定在同一个线程执行fn
public CompletionStage<Void> runAfterEitherAsync(
    CompletionStage<?> other, Runnable fn
);

// 功能与上一个相同,在指定线程执行fn
public CompletionStage<Void> runAfterEitherAsync(
    CompletionStage<?> other, Runnable fn, Executor executor
);

acceptEither方法

acceptEither用哪个最快的CompletionStage的结果作为下一步的输入,但是第三步没有输出。

// 和other任务返回较快的结果用于执行fn回调函数
public CompletionStage<Void> acceptEither(
    CompletionStage<?> other, Consumer<? super T> fn
);

public CompletionStage<Void> acceptEitherAsync(
    CompletionStage<?> other, Consumer<? super T> fn
);

public CompletionStage<Void> acceptEitherAsync(
    CompletionStage<?> other, Consumer<? super T> fn, Executor executor
);

CompletableFuture的综合案例

使用CompletableFuture实现喝茶案例。


public class CompletableFutureDemo2 {

    private static final int SLEEP_GAP = 3;

    public static void main(String[] args) {
        // 洗水壶->烧开水
        CompletableFuture<Boolean> hotJob =
                CompletableFuture.supplyAsync(() -> {
                    System.out.println("洗好水壶");
                    System.out.println("烧开水");
                    try {
                        Thread.sleep(SLEEP_GAP);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("水开了");
                    return true;
                });
        // 洗茶杯->那茶叶
        CompletableFuture<Boolean> washJob =
                CompletableFuture.supplyAsync(() -> {
                    System.out.println("洗茶杯");
                    try {
                        Thread.sleep(SLEEP_GAP);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("洗完了");

                    return true;
                });
        CompletableFuture<String> drinkJob =
                hotJob.thenCombine(washJob, (hotOK, washOk) -> {
                    if(hotOK && washOk) {
                        System.out.println("泡茶喝,茶喝完");
                        return "茶喝完了";
                    }
                    return "没有喝到差";
                });
        System.out.println(drinkJob.join());
    }
}
上一篇 下一篇

猜你喜欢

热点阅读