jdknew

CompletableFuture异步编排

2021-01-27  本文已影响0人  JBryan

在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。

1、创建异步对象

CompletableFuture提供了四个静态方法来创建一个异步操作。

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
  1. runAsync的两个重载方法都是没有返回值的,而supplyAsync是有返回值的。
  2. 都可以指定线程池或者使用默认的线程池
1.1、runAsync无返回值
public class ThreadTest {

    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    @Test
    void test(){
        System.out.println("test()方法开始了.....");
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
            System.out.println("异步线程开始执行.....");
            System.out.println("执行业务逻辑....");
            System.out.println("异步线程执行结束.....");
        }, executor);
        System.out.println("test()方法结束.....");
    }

}

执行结果:

test()方法开始了.....
test()方法结束.....
异步线程开始执行.....
执行业务逻辑....
异步线程执行结束.....
1.2、supplyAsync有返回值
    @Test
    void test() throws ExecutionException, InterruptedException {
        System.out.println("test()方法开始了.....");
        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步线程开始执行.....");
            System.out.println("执行业务逻辑....");
            System.out.println("异步线程执行结束.....");
            return "我是返回值";
        }, executor);
        System.out.println("test()方法结束....."+stringCompletableFuture.get());
    }

执行结果:

test()方法开始了.....
异步线程开始执行.....
执行业务逻辑....
异步线程执行结束.....
test()方法结束.....我是返回值

stringCompletableFuture.get()会阻塞当前线程,直到异步任务执行结束,才会恢复当前线程执行。

2、完成时回调方法

当异步任务执行完成之后,回调自定义的方法,CompletableFuture提供了以下四种使用方式:

    public CompletableFuture<T> whenComplete(
        BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(null, action);
    }

    public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(asyncPool, action);
    }

    public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action, Executor executor) {
        return uniWhenCompleteStage(screenExecutor(executor), action);
    }
    public CompletableFuture<T> exceptionally(
        Function<Throwable, ? extends T> fn) {
        return uniExceptionallyStage(fn);
    }

whenComplete方法和使用主线程执行任务,而whenCompleteAsync方法则会使用异步线程执行任务。

2.1、whenComplete同步执行回调

whenComplete()需要传一个BiConsumer接口的实现类,BiConsumer中accept(t,u)方法接收两个参数,第一个参数是future的返回结果,第二个参数是抛出的异常

    @Test
    void test() throws ExecutionException, InterruptedException {
        System.out.println("test()方法开始了.....");
        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步线程开始执行.....");
            System.out.println("异步执行线程:"+Thread.currentThread().getName());
            System.out.println("执行业务逻辑....");
            System.out.println("异步线程执行结束.....");
            return "我是返回值";
        }, executor).whenComplete((result,exception)->{
            System.out.println("异步任务执行结束,返回结果:"+result);
            System.out.println("异步任务执行结束,异常:"+exception);
            System.out.println("whenComplete线程:"+Thread.currentThread().getName());
        }).exceptionally((t)->{
            System.out.println("异常:"+t);
            return "异常返回值";
        });
        System.out.println("test()方法结束.....");
    }

whenComplete方法虽然能获得异常信息,但是无法修改CompletableFuture的返回值,因此可以链式调用exceptionally方法,指定一个出现异常以后默认的返回值。执行结果:

test()方法开始了.....
异步线程开始执行.....
异步执行线程:pool-1-thread-1
执行业务逻辑....
异步线程执行结束.....
异步任务执行结束,返回结果:我是返回值
异步任务执行结束,异常:null
whenComplete线程:main
test()方法结束.....
2.2、whenCompleteAsync异步执行回调

whenCompleteAsync使用异步线程来执行回调方法

    @Test
    void test() throws ExecutionException, InterruptedException {
        System.out.println("test()方法开始了.....");
        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步线程开始执行.....");
            System.out.println("异步执行线程:"+Thread.currentThread().getName());
            System.out.println("执行业务逻辑....");
            System.out.println("异步线程执行结束.....");
            return "我是返回值";
        }, executor).whenCompleteAsync((result,exception)->{
            System.out.println("异步任务执行结束,返回结果:"+result);
            System.out.println("异步任务执行结束,异常:"+exception);
            System.out.println("whenComplete线程:"+Thread.currentThread().getName());
        },executor).exceptionally((t)->{
            System.out.println("异常:"+t);
            return "异常返回值";
        });
        System.out.println("test()方法结束.....");
    }

执行结果:

test()方法开始了.....
异步线程开始执行.....
异步执行线程:pool-1-thread-1
执行业务逻辑....
异步线程执行结束.....
异步任务执行结束,返回结果:我是返回值
异步任务执行结束,异常:null
whenComplete线程:pool-1-thread-2
test()方法结束.....

3、handle

handle是方法执行结束后的处理,无论是成功结束,还是失败结束。complete系列方法只能感知异常,若要返回异常信息,只能在后面加exceptionally()方法。

@Test
    void test() throws ExecutionException, InterruptedException {
        System.out.println("test()方法开始了.....");
        CompletableFuture.supplyAsync(() -> {
            System.out.println("异步线程开始执行.....");
            System.out.println("异步执行线程:"+Thread.currentThread().getName());
            System.out.println("执行业务逻辑....");
            System.out.println("异步线程执行结束.....");
            return "我是返回值";
        }, executor).handle((result,exception)->{
            System.out.println("handle执行线程:"+Thread.currentThread().getName());
            if(result != null){
                return result;
            }
            if(exception != null){
                return "handle exception 返回";
            }
            return "handle 返回";
        });


        System.out.println("test()方法结束.....");
    }

执行结果:

test()方法开始了.....
异步线程开始执行.....
异步执行线程:pool-1-thread-1
执行业务逻辑....
异步线程执行结束.....
handle执行线程:main
test()方法结束.....

handle()方法也是在主线程中执行的。

4、线程串行化

假设B线程需要A线程执行完成之后才能继续执行,则需要线程串行化。

  1. thenRun系列方法,没有返回值,不需要依赖A返回的结果。
  2. thenAccept系列方法,没有返回值,需要依赖A返回的结果。
  3. thenApply系列方法,有返回值,依赖A返回的结果。
  4. 不带Async的,是在主线程中执行,带Async的,异步执行
    public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) 
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

    public CompletableFuture<Void> thenAccept(Consumer<? super T> action) 
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) 
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor) 
    
    public CompletableFuture<Void> thenRun(Runnable action)
    public CompletableFuture<Void> thenRunAsync(Runnable action) 
    public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor) 
4.1、thenRun系列方法
     @Test
    void testThen(){
        System.out.println("test()开始.....");
        CompletableFuture.supplyAsync(() -> {
            System.out.println("异步线程开始执行.....");
            System.out.println("异步执行线程:"+Thread.currentThread().getName());
            System.out.println("执行业务逻辑....");
            System.out.println("异步线程执行结束.....");
            return "我是返回值";
        }, executor).thenRun(()->{
            System.out.println("thenRun执行线程:"+Thread.currentThread().getName());
        });
        System.out.println("test()结束.....");
    }

执行结果:

test()开始.....
异步线程开始执行.....
异步执行线程:pool-1-thread-1
执行业务逻辑....
异步线程执行结束.....
thenRun执行线程:main
test()结束.....

将thenRun()换成thenRunAsync(),则会异步执行。

    @Test
    void testThen(){
        System.out.println("test()开始.....");
        CompletableFuture.supplyAsync(() -> {
            System.out.println("异步线程开始执行.....");
            System.out.println("异步执行线程:"+Thread.currentThread().getName());
            System.out.println("执行业务逻辑....");
            System.out.println("异步线程执行结束.....");
            return "我是返回值";
        }, executor).thenRunAsync(()->{
            System.out.println("thenRun执行线程:"+Thread.currentThread().getName());
        });
        System.out.println("test()结束.....");
    }

执行结果:

test()开始.....
异步线程开始执行.....
异步执行线程:pool-1-thread-1
执行业务逻辑....
异步线程执行结束.....
test()结束.....
thenRun执行线程:ForkJoinPool.commonPool-worker-1
4.2、thenAccept系列方法
    @Test
    void testThen(){
        System.out.println("test()开始.....");
        CompletableFuture.supplyAsync(() -> {
            System.out.println("异步线程开始执行.....");
            System.out.println("异步执行线程:"+Thread.currentThread().getName());
            System.out.println("执行业务逻辑....");
            System.out.println("异步线程执行结束.....");
            return "我是返回值";
        }, executor).thenAcceptAsync((result)->{
            System.out.println("thenAcceptAsync执行线程:"+Thread.currentThread().getName());
            System.out.println("thenAcceptAsync()接收到返回值:"+result);
        });
        System.out.println("test()结束.....");
    }

执行结果:

test()开始.....
异步线程开始执行.....
异步执行线程:pool-1-thread-1
执行业务逻辑....
异步线程执行结束.....
test()结束.....
thenAcceptAsync执行线程:ForkJoinPool.commonPool-worker-1
thenAcceptAsync()接收到返回值:我是返回值
4.3、thenApply系列方法
    @Test
    void testThen() throws ExecutionException, InterruptedException {
        System.out.println("test()开始.....");
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步线程开始执行.....");
            System.out.println("异步执行线程:" + Thread.currentThread().getName());
            System.out.println("执行业务逻辑....");
            System.out.println("异步线程执行结束.....");
            return "我是返回值";
        }, executor).thenApplyAsync((result) -> {
            System.out.println("thenApplyAsync执行线程:" + Thread.currentThread().getName());
            return "thenApplyAsync的返回结果";
        });
        System.out.println("test()结束....." + future.get() );
    }

执行结果:

test()开始.....
异步线程开始执行.....
异步执行线程:pool-1-thread-1
执行业务逻辑....
异步线程执行结束.....
thenApplyAsync执行线程:ForkJoinPool.commonPool-worker-1
test()结束.....thenApplyAsync的返回结果

5、多任务组合

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

allOf,等待所有任务都执行完成。

    @Test
    void testAll() throws ExecutionException, InterruptedException {
        System.out.println("test()开始.....");
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future1执行结束");
            return "future1结果";
        }, executor);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future2执行结束");
            return "future2结果";
        }, executor);
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future3执行结束"); 
            return "future3结果";
        }, executor);
        CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
        allOf.get();
        System.out.println("test()结束....." );
    }

执行结果:

test()开始.....
future1执行结束
future2执行结束
future3执行结束
test()结束.....

anyOf,等待任何一个任务执行完成。

    @Test
    void testAll() throws ExecutionException, InterruptedException {
        System.out.println("test()开始.....");
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future1执行结束");
            return "future1结果";
        }, executor);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future2执行结束");
            return "future2结果";
        }, executor);
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future3执行结束");
            return "future3结果";
        }, executor);
        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3);
        System.out.println("test()结束....."+anyOf.get());
    }

执行结果:

test()开始.....
future1执行结束
future2执行结束
test()结束.....future1结果
future3执行结束
上一篇下一篇

猜你喜欢

热点阅读