Java技术码农的世界程序员

Java CompletableFuture

2017-09-27  本文已影响793人  一字马胡

作者: 一字马胡
转载标志 【2017-11-03】

更新日志

日期 更新内容 备注
2017-11-03 添加转载标志 持续更新

一、java中创建线程的方法

在java中有三种方式创建一个线程。

1、继承Thread,重写run方法


class MyThread extends Thread {
   
    @Override
    public void run() {
        //doSomething....
     }

}

2、实现Runnable接口的run方法

class MyRunnable implements Runnable {

    @Override
    public void run() {
         //doSomething
     }

}

3、使用FutureTask,传递一个Callable参数

下面是FutureTask的类关系图:

FutureTask类继承关系图

FutureTask实现了Future接口、Runnable接口。下面是FutureTask的两个构造函数:

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

从两个构造函数可以看出来,FutureTask需要我们传递一个Callable类型的参数或者一个Runnable对象。FutureTask的优势是相比于Runnable,可以有返回值,而Runnable返回值为void。下面的代码展示了FutureTask的使用方法。

public class FutureTaskDemo {
    
    private static final Callable<Void> voidCallable = () -> {

        System.out.println("void callable.");

        return null;
    };

    private static final Callable<String> stringCallable = () -> "string callable.";

    private static FutureTask<Void> voidFutureTask = new FutureTask<Void>(voidCallable);
    private static FutureTask<String> stringFutureTask = new FutureTask<String>(stringCallable);

    public static void main(String ... args) {
        
        new Thread(voidFutureTask).start();
        new Thread(stringFutureTask).start();

        try {
            String result = stringFutureTask.get();
            System.out.println("->get result :" + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

二、Future

在了解CompletableFuture之前,需要了解Future,Future代表一个将来某个时刻会完成的内容,Future提供了一些方法可以获取有用的信息,比如获取我们提交的任务是否已经完成或者被取消,如果完成了我们可以使用get获取内容。下面是Future接口的内容:


    boolean isCancelled(); //是否已经被取消

    boolean isDone(); //是否完成了

    V get() throws InterruptedException, ExecutionException; //阻塞获取结果

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException; //可以设置最多等待的时间,会在时间允许方位内正常返回或者抛出超时异常

Future的优势是不需要阻塞线程等待结果,我们可以让线程做一些其他的事情,然后在合适的时候来获取结果,但是Future的缺陷也是很明显的,比如我们在需要得到结果的时候必须要阻塞线程等待结果,所以在java8中加入了CompletableFuture这个类,这个类是很强大的,下文中会主要介绍CompletableFuture这个类。

三、CompletableFuture

首先看一下CompletableFuture的类关系图:

CompletableFuture的类关系图

从类图中可以看出,CompletableFuture更像是在增强Future,事实上也是这样,CompletableFuture在实现了Future的同时还实现了一个接口:CompletionStage,这个类就是所有CompletableFuture与Future不一样的地方,CompletionStage代表的是阶段,代表某个阶段完成之后要做什么,都在这个接口中定义好了,下面需要好好的研究一下这个接口,事实上,本文主要的内容便是了解CompletionStage,以及学习使用CompletableFuture(实际上是CompletionStage接口的功能),关于CompletionStage的实现方面的内容将不在本文的讨论范围内。

首先一个问题是:如何生成一个CompletableFuture。下面几个静态方法都可以产生CompletableFuture:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor);

public static CompletableFuture<Void> runAsync(Runnable runnable);

public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor)

下面的代码展示了如何产生CompletableFuture的方法:

/**
 * Created by hujian06 on 2017/9/27.
 */
public class CompletableFutureDemo {

    enum TYPE {
        SUPPLY_ASYNC_0,
        SUPPLY_ASYNC_1,
        RUN_ASYNC_0,
        RUN_ASYNC_1
    }

    /**
     * CompletableFuture generator
     * @param type type
     * @param supplier supplier
     * @param runnable runnable
     * @param executor executor
     * @param <U> type
     * @return the CompletableFuture
     */
    private static synchronized  <U> CompletableFuture<U> createCompletableFuture(TYPE type, Supplier<U> supplier, Runnable runnable,
                                                                    Executor executor) {
        CompletableFuture<U> completableFuture = null;
        switch (type) {
            case SUPPLY_ASYNC_0:
                completableFuture = CompletableFuture.supplyAsync(supplier);
            case SUPPLY_ASYNC_1:
                if (supplier == null) {
                    System.out.println("supplier is null:SUPPLY_ASYNC_1");
                    break;
                }

                if (executor == null) {
                    System.out.println("executor is null:SUPPLY_ASYNC_1");
                    executor = Executors.newSingleThreadExecutor();
                }
                completableFuture = CompletableFuture.supplyAsync(supplier, executor);
                break;
            case RUN_ASYNC_0:
                CompletableFuture.runAsync(runnable);
                break;
            case RUN_ASYNC_1:
                if (runnable == null) {
                    System.out.println("runnable is null:RUN_ASYNC_1");
                    break;
                }

                if (executor == null) {
                    System.out.println("executor is null:RUN_ASYNC_1");
                    break;
                }
                CompletableFuture.runAsync(runnable, executor);
                break;
        }

        return completableFuture;
    }

    private static Supplier<String> stringSupplier = () -> "supply of string";

    private static Runnable runnable = () -> System.out.println("void runnable");

    private static Executor executor = Executors.newFixedThreadPool(4);

    public static void main(String ... args) {


        CompletableFuture<Void> voidCompletableFutureV1
                = createCompletableFuture(TYPE.RUN_ASYNC_1, null, runnable, executor);
        CompletableFuture<Void> voidCompletableFuture
                = createCompletableFuture(TYPE.RUN_ASYNC_0, null, runnable, null);

        CompletableFuture<String> stringCompletableFutureV1
                = createCompletableFuture(TYPE.SUPPLY_ASYNC_1, stringSupplier, null, executor);
        CompletableFuture<String> stringCompletableFuture
                = createCompletableFuture(TYPE.SUPPLY_ASYNC_0, stringSupplier, null, null);

        String result0 = stringCompletableFuture.getNow("Empty V0");
        String result1 = stringCompletableFutureV1.getNow("Empty V1");

        System.out.println("Result:" + result0 + " , " + result1);
    }

}

CompletionStage接口提供的方法

上图中详细列出了CompletionStage接口提供的方法,大体上分为几类,每个类都会有三个类似的方法,一个是使用当前线程执行,一个使用共享的线程池来执行,一个使用自定义的(参数传递)Executor来执行,类似下面的代码所示。


thenXXX() :使用当前线程继续执行接下来的内容
thenXXXAsync() :使用共享线程池执行接下来的内容
thenXXXAsync(with Executor) :使用参数中提供的Executor来执行接下来的内容

下面分不同的类别列举出CompletionStage提供的接口,如果你想了解CompletableFuture这个类的所以情况,那么你应该阅读源码,或者研读文档

1、Apply类

你可以通过此类操作将CompletableFuture的结果转换成例外一种结果,下面是一个将String类型的结果转换为Integer类型的操作示例:

        stringCompletableFuture.thenApply(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return Integer.parseInt(s);
            }
        });

2、Accept类

你只是想消耗CompletableFuture返回的结果,那么就可以使用该类型的操作,下面是使用示例,该示例等待CompletableFuture的结果,然后使用相同的线程打印在标准输出上:

        stringCompletableFuture.thenAccept(new Consumer<String>() {
            @Override
            public void accept(String s) {
                System.out.println("->" + s);
            }
        });

你需要关注的是,一点使用了Accept类型的操作,CompletableFuture的结果将被消耗。

3、Run类

如果你想将一个Runnable安排在一个CompletableFuture完成之后,可以使用该操作,使用该操作之后,一旦CompletableFuture完成,就会执行指定的Runnable任务,下面是使用示例:

        stringCompletableFuture.thenRun(new Runnable() {
            @Override
            public void run() {
                System.out.println("Another job here");
            }
        });

4、Combine类

Combine如同他的含义一样用于combine两个CompletableFuture的结果,你可以将两个CompletableFuture的结果进行转换然后返回,比如下面的代码展示了如何等待另外一个CompletableFuture,然后将当前CompletableFuture的结果和等待的CompletableFuture的结果进行一些组合然后返回:

      stringCompletableFuture.thenCombine(stringCompletableFutureV1, new 
           BiFunction<String, String, String>() {
            @Override
            public String apply(String s, String s2) {
                return  "Combine:" + s + " : " + s2;
            }
        });

5、AcceptBoth类

如果你想等待两个CompletableFuture结束,然后再消费他们,那你可以使用该操作,需要注意的是,你指定的消费者会在两个CompletableFuture都完成之后再执行,因为你指定的消费者需要等待两个CompletableFuture完成的结果作为参数来运行,经过思考,只有当两个CompletableFuture都结束之后,该消费者才能正常运行,下面展示了简单使用方法:

        stringCompletableFuture.thenAcceptBoth(stringCompletableFutureV1, new BiConsumer<String, String>() {
            @Override
            public void accept(String s, String s2) {
                System.out.println("Consume:" + s + ":" + s2);
            }
        });

6、runAfterBoth类

相比于AcceptBoth,runAfterBoth虽然会等待两个CompletableFuture都结束再运行,但是不再依赖两个CompletableFuture的运行结果,所以你需要选择合适的类型里为你的业务工作:

        stringCompletableFuture.runAfterBoth(stringCompletableFutureV1, new Runnable() {
            @Override
            public void run() {
                System.out.println("do some jobs here");
            }
        });

7、applyToEither类

使用该类型可以达到的效果是:哪个CompletableFuture先结束,就会被消费者消费,所以有竞争关系,先运行完成就可以被消费者消费,第二个CompletableFuture的结果将不会被消费,等待一个CompletableFuture运行完成之后,你可以对结果进行转换输出,下面是一个简单的示例:

        stringCompletableFuture.applyToEither(stringCompletableFutureV1, new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return Integer.parseInt(s);
            }
        });

8、acceptEither类

这个类型的操作也只会等待一个CompletableFuture结束,然后就会被消费者消费,而且该类型的操作仅仅能进行对CompletableFuture结果的消费,无法转换输出,下面是示例:

        stringCompletableFuture.acceptEither(stringCompletableFutureV1, new Consumer<String>() {
            @Override
            public void accept(String s) {
                System.out.println("->" + s);
            }
        });

9、runAfterEither类

等待一个CompletableFuture结束,然后运行指定的Runnable,下面是示例:

        stringCompletableFuture.runAfterEither(stringCompletableFutureV1, new Runnable() {
            @Override
            public void run() {
                System.out.println("other job");
            }
        });

10、thenCompose类

使用该类型的操作,你可以使用当前CompletableFuture的结果来产生一个全新的CompletableFuture,then的含义便是等待当前的CompletableFuture运行结束,然后再运行我们指定的Supplier,下面是一个简单的实例,展示了如何依赖当前的CompletableFuture运行结果来组装出一个新的CompletableFuture:

        stringCompletableFuture.thenCompose(new Function<String, CompletionStage<String>>() {
            @Override
            public CompletionStage<String> apply(String s) {
                return createCompletableFuture(TYPE.SUPPLY_ASYNC_0, new Supplier<String>() {
                    @Override
                    public String get() {
                        return s + ":" + s;
                    }
                }, null, null);
            }
        });

11、exceptionally类

这是单独的一个类型,使用这个方法可以对当前的CompletableFuture抛出的异常进行一些处理,类似于Fallback,你可以在这里返回一些兜底的数据来屏蔽当前CompletableFuture运行抛出的异常,当然你也可以在该方法里面直接抛出当前CompletableFuture抛出的异常,下面是一个简单的示例:

        stringCompletableFuture.exceptionally(new Function<Throwable, String>() {
            @Override
            public String apply(Throwable throwable) {
                return "fallback";
            }
        });

12、whenComplete类

当前CompletableFuture将会把运行运行结果作为参数传递给指定的function,你可以通过参数来判断当前CompletableFuture是否成功完成还是抛出了异常,然后根据你的逻辑消费异常或者正常的结果,下面是一个简单的示例:

        stringCompletableFuture.whenComplete(new BiConsumer<String, Throwable>() {
            @Override
            public void accept(String s, Throwable throwable) {
                if (throwable == null) {
                    System.out.println(s);
                } else {
                    throwable.printStackTrace();
                }
            }
        });

13、handle类

比起whenComplete类,该类型不仅可以获取到当前CompletableFuture是成功了还是抛出了异常之外,还可以对结果进行转换(如果当前CompletableFuture成功完成了),下面是简单示例:


        stringCompletableFuture.handle(new BiFunction<String, Throwable, Integer>() {
            @Override
            public Integer apply(String s, Throwable throwable) {
                if (throwable != null) {
                    throwable.printStackTrace();
                } else {
                    return Integer.parseInt(s);
                }
                
                return -1;
            }
        });

四、结尾

在CompletableFuture类中,我们在上面的分析中都是要么组合两个CompletableFuture,要么就是对一个CompletableFuture的操作,下面介绍的两个静态方法可以组合多个CompletableFuture,然后执行执行操作。


    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        return andTree(cfs, 0, cfs.length - 1);
    }

    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
        return orTree(cfs, 0, cfs.length - 1);
    }

首先是allOff方法,你可以传递任意多的CompletableFuture给allOff,然后可以将其结果看成是单个CompletableFuture一样继续可以施加操作,效果是allOff的语义决定了其表现行为,allOff方法将等待所有的CompletableFuture都结束之后才会继续之后的操作,下面是简单示例:

        CompletableFuture
                .allOf(stringCompletableFuture, stringCompletableFutureV1, voidCompletableFuture, voidCompletableFutureV1)
                .thenCombine(stringCompletableFuture, new BiFunction<Void, String, String>() {
                    @Override
                    public String apply(Void aVoid, String s) {
                        return s;
                    }
                })
                .thenAccept(new Consumer<String>() {
                    @Override
                    public void accept(String s) {
                        System.out.println(s);
                    }
                });

其次是anyOf方法,这个方法和allOf方法一样,但是anyOf方法发现有一个CompletableFuture完成之后便立刻执行之后的操作,使用方法和allOf事一样的,知识语义不一样,需要根据不同的场景选择使用allOf或者anyOf。

下面是一个可以等待所有CompletableFuture都结束的工具类,这个工具类的特点是可以设定等待时间,然后在超时的时候会检测出到底是哪个CompletableFuture超时,本文到此也就结束了:

 /**
     * This method will detect which services are timed out
     * @param className the caller class name
     * @param timeout timeout to return
     * @param futureMap the map : key is service desc, value is the future.
     */
    public static void awaitFuturesCompletedOrTimeoutWithReportTimeoutService
    (String className, int timeout, Map<String, CompletableFuture<?>> futureMap)
            throws ExecutionException, InterruptedException {
        if (futureMap == null || futureMap.isEmpty()) {
            return;
        }

        CompletableFuture<?>[] futures = new CompletableFuture[futureMap.size()];

        futureMap.values().toArray(futures);

        try {
            CompletableFuture.allOf(futures).get(timeout, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            //detect which services are timed out
            futureMap.forEach((service, cf) -> {
                try {
                    //do not assign timeout = 0 NANOSECONDS since the get(timeout, unit) will throw
                    //a TimeoutException exception immediately,but we want to try to get the result
                    //with a small timeout value firstly, it's worth taking 1/100000 MILLISECONDS to
                    //detect which services are timed out.
                    cf.get(1, TimeUnit.NANOSECONDS);
                } catch (InterruptedException | ExecutionException exception) {
                    LOGGER.warn("Failed to detect which services are timed out for class:{}, at detect service: {}, "
                            + "error:", className, service, exception);
                } catch (TimeoutException timeoutException) {

                    LOGGER.warn("Timed out for waiting futures completed:className={}, timeoutMillis={}, Service:{}",
                            className, timeout, service);
                }
            });
        }
    }

上一篇下一篇

猜你喜欢

热点阅读