异步&线程池

三、CompletableFuture

2020-06-20  本文已影响0人  紫荆秋雪_文

一、场景

当在比较复杂的情况下,在需要远程调用,必然需要花费更多的时间

二、创建异步对象

一、runAsync创建异步对象

    // 定义线程池
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {
        System.out.println("main。。。。。start。。。。。");

        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getName());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
        }, executor);

        System.out.println("main。。。。。end。。。。。");
    }

二、supplyAsync创建异步对象

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main。。。。。start。。。。。");

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getName());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        }, executor);

        System.out.println("main。。。。。end。。。。。" + future.get());
    }

三、计算完成时回调方法

一、 whenComplete 获取结果

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main。。。。。start。。。。。");

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getName());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        }, executor).whenCompleteAsync((res, ex) -> {
            System.out.println("异步任务完成。。。。。结果是:" + res);
        }, executor);

        System.out.println("main。。。。。end。。。。。" + future.get());
    }
main。。。。。start。。。。。
当前线程:pool-1-thread-1
运行结果:5
异步任务完成。。。。。结果是:5
main。。。。。end。。。。。5
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main。。。。。start。。。。。");

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getName());
            int i = 10 / 0;
            System.out.println("运行结果:" + i);
            return i;
        }, executor).whenCompleteAsync((res, ex) -> {
            System.out.println("异步任务完成。。。。。结果是:" + res);
            System.out.println("异步任务完成。。。。。异常是:" + ex);
        }, executor);

        System.out.println("main。。。。。end。。。。。" + future.get());
    }
main。。。。。start。。。。。
当前线程:pool-1-thread-1
异步任务完成。。。。。结果是:null
异步任务完成。。。。。异常是:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero

二、 exceptionally感知到异常时,返回默认值

    public static void main(String[] args) throws Exception {
        System.out.println("main。。。。。start。。。。。");

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getName());
            int i = 10 / 0;
            System.out.println("运行结果:" + i);
            return i;
        }, executor).whenCompleteAsync((res, ex) -> {
            System.out.println("异步任务完成。。。。。结果是:" + res);
            System.out.println("异步任务完成。。。。。异常是:" + ex);
        }, executor).exceptionally(ex -> 100);

        System.out.println("main。。。。。end。。。。。" + future.get());
    }
main。。。。。start。。。。。
当前线程:pool-1-thread-1
异步任务完成。。。。。结果是:null
异步任务完成。。。。。异常是:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
main。。。。。end。。。。。100

三、handle方法

    public static void main(String[] args) throws Exception {
        System.out.println("main。。。。。start。。。。。");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getName());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        }, executor).handleAsync((res, ex) -> {
            if (ex != null) {
                return res * 10;
            }
            return 100;
        }, executor);

        System.out.println("main。。。。。end。。。。。" + future.get());
    }
main。。。。。start。。。。。
当前线程:pool-1-thread-1
运行结果:5
main。。。。。end。。。。。100

四、线程串行化方法

1、thenRunAsync


    public static void main(String[] args) throws Exception {
        System.out.println("main。。。。。start。。。。。");
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getName());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        }, executor).thenRunAsync(() -> {
            System.out.println("任务2启动了。。。。。");
        }, executor);

        System.out.println("main。。。。。end。。。。。" + future.get());
    }
main。。。。。start。。。。。
当前线程:pool-1-thread-1
运行结果:5
任务2启动了。。。。。
main。。。。。end。。。。。null

2、thenAcceptAsync

    public static void main(String[] args) throws Exception {
        System.out.println("main。。。。。start。。。。。");
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getName());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        }, executor).thenAcceptAsync((res) -> {
            System.out.println("任务2启动了。。。。。,上一次任务结果:" + res);
        }, executor);

        System.out.println("main。。。。。end。。。。。" + future.get());
    }
main。。。。。start。。。。。
当前线程:pool-1-thread-1
运行结果:5
任务2启动了。。。。。,上一次任务结果:5
main。。。。。end。。。。。null

3、thenApplyAsync

    public static void main(String[] args) throws Exception {
        System.out.println("main。。。。。start。。。。。");
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getName());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        }, executor).thenApplyAsync((res) -> {
            System.out.println("任务2启动了。。。。。,上一次任务结果:" + res);
            return "任务2返回结果结果:" + res;
        }, executor);

        System.out.println("main。。。。。end。。。。。" + future.get());
    }
main。。。。。start。。。。。
当前线程:pool-1-thread-1
运行结果:5
任务2启动了。。。。。,上一次任务结果:5
main。。。。。end。。。。。任务2返回结果结果:5

五、两任务组合

runAfterBothAsync

    public static void main(String[] args) throws Exception {
        System.out.println("main。。。。。start。。。。。");

        CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1线程。。。" + Thread.currentThread().getName());
            int i = 10 / 2;
            System.out.println("任务1线程结束。。。");
            return i;
        }, executor);

        CompletableFuture<String> future02= CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2线程。。。" + Thread.currentThread().getName());
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务2线程结束。。。");
            return "Raven";
        }, executor);

        future01.runAfterBothAsync(future02, () -> {
            System.out.println("任务3线程开始。。。");
        }, executor);

        System.out.println("main。。。。。end。。。。。");
    }
main。。。。。start。。。。。
任务1线程。。。pool-1-thread-1
任务1线程结束。。。
任务2线程。。。pool-1-thread-2
main。。。。。end。。。。。
任务2线程结束。。。
任务3线程开始。。。

thenAcceptBothAsync

    public static void main(String[] args) throws Exception {
        System.out.println("main。。。。。start。。。。。");

        CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1线程。。。" + Thread.currentThread().getName());
            int i = 10 / 2;
            System.out.println("任务1线程结束。。。");
            return i;
        }, executor);

        CompletableFuture<String> future02= CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2线程。。。" + Thread.currentThread().getName());
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务2线程结束。。。");
            return "Raven";
        }, executor);

        future01.thenAcceptBothAsync(future02, (f1, f2) -> {
            System.out.println("任务3线程开始。。。" + f1 + " ---- " + f2);
        }, executor);

        System.out.println("main。。。。。end。。。。。");
    }
main。。。。。start。。。。。
任务1线程。。。pool-1-thread-1
任务1线程结束。。。
任务2线程。。。pool-1-thread-2
main。。。。。end。。。。。
任务2线程结束。。。
任务3线程开始。。。5 ---- Raven

thenCombineAsync

    public static void main(String[] args) throws Exception {
        System.out.println("main。。。。。start。。。。。");

        CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1线程。。。" + Thread.currentThread().getName());
            int i = 10 / 2;
            System.out.println("任务1线程结束。。。");
            return i;
        }, executor);

        CompletableFuture<String> future02= CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2线程。。。" + Thread.currentThread().getName());
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务2线程结束。。。");
            return "Raven";
        }, executor);

        CompletableFuture<String> future = future01.thenCombineAsync(future02, (f1, f2) -> {
            System.out.println("任务3线程开始。。。" + f1 + " ---- " + f2);
            return f1 + ":----" + f2 + "--->Haha";
        }, executor);

        System.out.println("main。。。。。end。。。。。" + future.get());
    }
main。。。。。start。。。。。
任务1线程。。。pool-1-thread-1
任务1线程结束。。。
任务2线程。。。pool-1-thread-2
任务2线程结束。。。
任务3线程开始。。。5 ---- Raven
main。。。。。end。。。。。5:----Raven--->Haha

六、两任务组合-任何一个完成

runAfterEitherAsync

    public static void main(String[] args) throws Exception {
        System.out.println("main。。。。。start。。。。。");

        CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1线程。。。" + Thread.currentThread().getName());
            int i = 10 / 2;
            System.out.println("任务1线程结束。。。");
            return i;
        }, executor);

        CompletableFuture<String> future02= CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2线程。。。" + Thread.currentThread().getName());
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务2线程结束。。。");
            return "Raven";
        }, executor);

        CompletableFuture<Void> future = future01.runAfterEitherAsync(future02, () -> {
            System.out.println("任务3线程开始。。。");
        }, executor);


        System.out.println("main。。。。。end。。。。。" + future.get());
    }

main。。。。。start。。。。。
任务1线程。。。pool-1-thread-1
任务1线程结束。。。
任务2线程。。。pool-1-thread-2
任务3线程开始。。。
main。。。。。end。。。。。null
任务2线程结束。。。

acceptEitherAsync

    public static void main(String[] args) throws Exception {
        System.out.println("main。。。。。start。。。。。");

        CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1线程。。。" + Thread.currentThread().getName());
            int i = 10 / 2;
            System.out.println("任务1线程结束。。。");
            return i;
        }, executor);

        CompletableFuture<Integer> future02= CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2线程。。。" + Thread.currentThread().getName());
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务2线程结束。。。");
            return 22;
        }, executor);

        CompletableFuture<Void> future = future01.acceptEitherAsync(future02, (res) -> {
            System.out.println("任务3线程开始。。。" + res);
        }, executor);


        System.out.println("main。。。。。end。。。。。" + future.get());
    }
main。。。。。start。。。。。
任务1线程。。。pool-1-thread-1
任务1线程结束。。。
任务2线程。。。pool-1-thread-2
任务3线程开始。。。5
main。。。。。end。。。。。null
任务2线程结束。。。

applyToEitherAsync

    public static void main(String[] args) throws Exception {
        System.out.println("main。。。。。start。。。。。");

        CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1线程。。。" + Thread.currentThread().getName());
            int i = 10 / 2;
            System.out.println("任务1线程结束。。。");
            return i;
        }, executor);

        CompletableFuture<Integer> future02= CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2线程。。。" + Thread.currentThread().getName());
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务2线程结束。。。");
            return 22;
        }, executor);

        CompletableFuture<String> future = future01.applyToEitherAsync(future02, (res) -> {
            System.out.println("任务3线程开始。。。" + res);
            return "任务3线程开始。。。" + res;
        }, executor);


        System.out.println("main。。。。。end。。。。。" + future.get());
    }
main。。。。。start。。。。。
任务1线程。。。pool-1-thread-1
任务1线程结束。。。
任务2线程。。。pool-1-thread-2
任务3线程开始。。。5
main。。。。。end。。。。。任务3线程开始。。。5
任务2线程结束。。。

七、多任务组合

allOf:等待所有任务完成

   public static void main(String[] args) throws Exception {
        System.out.println("main。。。。。start。。。。。");

        CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1线程。。。" + Thread.currentThread().getName());
            int i = 10 / 2;
            System.out.println("任务1线程结束。。。");
            return i;
        }, executor);

        CompletableFuture<Integer> future02= CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2线程。。。" + Thread.currentThread().getName());
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务2线程结束。。。");
            return 22;
        }, executor);

        CompletableFuture<Integer> future03= CompletableFuture.supplyAsync(() -> {
            System.out.println("任务3线程。。。" + Thread.currentThread().getName());
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务3线程结束。。。");
            return 23;
        }, executor);

        CompletableFuture<Void> future = CompletableFuture.allOf(future01, future02, future03);
        // 等待所有结果完成
        future.get();
        System.out.println("main。。。。。end。。。。。");
    }
main。。。。。start。。。。。
任务1线程。。。pool-1-thread-1
任务1线程结束。。。
任务2线程。。。pool-1-thread-2
任务3线程。。。pool-1-thread-3
任务2线程结束。。。
任务3线程结束。。。
main。。。。。end。。。。。

anyOf:只要有一个任务完成

    public static void main(String[] args) throws Exception {
        System.out.println("main。。。。。start。。。。。");

        CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1线程。。。" + Thread.currentThread().getName());
            int i = 10 / 2;
            System.out.println("任务1线程结束。。。");
            return i;
        }, executor);

        CompletableFuture<Integer> future02= CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2线程。。。" + Thread.currentThread().getName());
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务2线程结束。。。");
            return 22;
        }, executor);

        CompletableFuture<Integer> future03= CompletableFuture.supplyAsync(() -> {
            System.out.println("任务3线程。。。" + Thread.currentThread().getName());
            System.out.println("任务3线程结束。。。");
            return 23;
        }, executor);

        CompletableFuture<Object> future = CompletableFuture.anyOf(future01, future02, future03);
        // 等待所有结果完成
        Object obj = future.get();
        System.out.println("main。。。。。end。。。。。" + obj);
    }
main。。。。。start。。。。。
任务1线程。。。pool-1-thread-1
任务1线程结束。。。
任务2线程。。。pool-1-thread-2
main。。。。。end。。。。。5
任务3线程。。。pool-1-thread-3
任务3线程结束。。。
任务2线程结束。。。

上一篇下一篇

猜你喜欢

热点阅读