Java CompletableFuture
作者: 一字马胡
转载标志 【2017-11-03】
更新日志
日期 | 更新内容 | 备注 |
---|---|---|
2017-11-03 | 添加转载标志 | 持续更新 |
一、java中创建线程的方法
在java中有三种方式创建一个线程。
1、继承Thread,重写run方法
class MyThread extends Thread {
@Override
public void run() {
//doSomething....
}
}
2、实现Runnable接口的run方法
class MyRunnable implements Runnable {
@Override
public void run() {
//doSomething
}
}
3、使用FutureTask,传递一个Callable参数
下面是FutureTask的类关系图:
FutureTask类继承关系图FutureTask实现了Future接口、Runnable接口。下面是FutureTask的两个构造函数:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
从两个构造函数可以看出来,FutureTask需要我们传递一个Callable类型的参数或者一个Runnable对象。FutureTask的优势是相比于Runnable,可以有返回值,而Runnable返回值为void。下面的代码展示了FutureTask的使用方法。
public class FutureTaskDemo {
private static final Callable<Void> voidCallable = () -> {
System.out.println("void callable.");
return null;
};
private static final Callable<String> stringCallable = () -> "string callable.";
private static FutureTask<Void> voidFutureTask = new FutureTask<Void>(voidCallable);
private static FutureTask<String> stringFutureTask = new FutureTask<String>(stringCallable);
public static void main(String ... args) {
new Thread(voidFutureTask).start();
new Thread(stringFutureTask).start();
try {
String result = stringFutureTask.get();
System.out.println("->get result :" + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
二、Future
在了解CompletableFuture之前,需要了解Future,Future代表一个将来某个时刻会完成的内容,Future提供了一些方法可以获取有用的信息,比如获取我们提交的任务是否已经完成或者被取消,如果完成了我们可以使用get获取内容。下面是Future接口的内容:
boolean isCancelled(); //是否已经被取消
boolean isDone(); //是否完成了
V get() throws InterruptedException, ExecutionException; //阻塞获取结果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException; //可以设置最多等待的时间,会在时间允许方位内正常返回或者抛出超时异常
Future的优势是不需要阻塞线程等待结果,我们可以让线程做一些其他的事情,然后在合适的时候来获取结果,但是Future的缺陷也是很明显的,比如我们在需要得到结果的时候必须要阻塞线程等待结果,所以在java8中加入了CompletableFuture这个类,这个类是很强大的,下文中会主要介绍CompletableFuture这个类。
三、CompletableFuture
首先看一下CompletableFuture的类关系图:
CompletableFuture的类关系图从类图中可以看出,CompletableFuture更像是在增强Future,事实上也是这样,CompletableFuture在实现了Future的同时还实现了一个接口:CompletionStage,这个类就是所有CompletableFuture与Future不一样的地方,CompletionStage代表的是阶段,代表某个阶段完成之后要做什么,都在这个接口中定义好了,下面需要好好的研究一下这个接口,事实上,本文主要的内容便是了解CompletionStage,以及学习使用CompletableFuture(实际上是CompletionStage接口的功能),关于CompletionStage的实现方面的内容将不在本文的讨论范围内。
首先一个问题是:如何生成一个CompletableFuture。下面几个静态方法都可以产生CompletableFuture:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor);
public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor)
下面的代码展示了如何产生CompletableFuture的方法:
/**
* Created by hujian06 on 2017/9/27.
*/
public class CompletableFutureDemo {
enum TYPE {
SUPPLY_ASYNC_0,
SUPPLY_ASYNC_1,
RUN_ASYNC_0,
RUN_ASYNC_1
}
/**
* CompletableFuture generator
* @param type type
* @param supplier supplier
* @param runnable runnable
* @param executor executor
* @param <U> type
* @return the CompletableFuture
*/
private static synchronized <U> CompletableFuture<U> createCompletableFuture(TYPE type, Supplier<U> supplier, Runnable runnable,
Executor executor) {
CompletableFuture<U> completableFuture = null;
switch (type) {
case SUPPLY_ASYNC_0:
completableFuture = CompletableFuture.supplyAsync(supplier);
case SUPPLY_ASYNC_1:
if (supplier == null) {
System.out.println("supplier is null:SUPPLY_ASYNC_1");
break;
}
if (executor == null) {
System.out.println("executor is null:SUPPLY_ASYNC_1");
executor = Executors.newSingleThreadExecutor();
}
completableFuture = CompletableFuture.supplyAsync(supplier, executor);
break;
case RUN_ASYNC_0:
CompletableFuture.runAsync(runnable);
break;
case RUN_ASYNC_1:
if (runnable == null) {
System.out.println("runnable is null:RUN_ASYNC_1");
break;
}
if (executor == null) {
System.out.println("executor is null:RUN_ASYNC_1");
break;
}
CompletableFuture.runAsync(runnable, executor);
break;
}
return completableFuture;
}
private static Supplier<String> stringSupplier = () -> "supply of string";
private static Runnable runnable = () -> System.out.println("void runnable");
private static Executor executor = Executors.newFixedThreadPool(4);
public static void main(String ... args) {
CompletableFuture<Void> voidCompletableFutureV1
= createCompletableFuture(TYPE.RUN_ASYNC_1, null, runnable, executor);
CompletableFuture<Void> voidCompletableFuture
= createCompletableFuture(TYPE.RUN_ASYNC_0, null, runnable, null);
CompletableFuture<String> stringCompletableFutureV1
= createCompletableFuture(TYPE.SUPPLY_ASYNC_1, stringSupplier, null, executor);
CompletableFuture<String> stringCompletableFuture
= createCompletableFuture(TYPE.SUPPLY_ASYNC_0, stringSupplier, null, null);
String result0 = stringCompletableFuture.getNow("Empty V0");
String result1 = stringCompletableFutureV1.getNow("Empty V1");
System.out.println("Result:" + result0 + " , " + result1);
}
}
CompletionStage接口提供的方法
上图中详细列出了CompletionStage接口提供的方法,大体上分为几类,每个类都会有三个类似的方法,一个是使用当前线程执行,一个使用共享的线程池来执行,一个使用自定义的(参数传递)Executor来执行,类似下面的代码所示。
thenXXX() :使用当前线程继续执行接下来的内容
thenXXXAsync() :使用共享线程池执行接下来的内容
thenXXXAsync(with Executor) :使用参数中提供的Executor来执行接下来的内容
下面分不同的类别列举出CompletionStage提供的接口,如果你想了解CompletableFuture这个类的所以情况,那么你应该阅读源码,或者研读文档。
1、Apply类
你可以通过此类操作将CompletableFuture的结果转换成例外一种结果,下面是一个将String类型的结果转换为Integer类型的操作示例:
stringCompletableFuture.thenApply(new Function<String, Integer>() {
@Override
public Integer apply(String s) {
return Integer.parseInt(s);
}
});
2、Accept类
你只是想消耗CompletableFuture返回的结果,那么就可以使用该类型的操作,下面是使用示例,该示例等待CompletableFuture的结果,然后使用相同的线程打印在标准输出上:
stringCompletableFuture.thenAccept(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println("->" + s);
}
});
你需要关注的是,一点使用了Accept类型的操作,CompletableFuture的结果将被消耗。
3、Run类
如果你想将一个Runnable安排在一个CompletableFuture完成之后,可以使用该操作,使用该操作之后,一旦CompletableFuture完成,就会执行指定的Runnable任务,下面是使用示例:
stringCompletableFuture.thenRun(new Runnable() {
@Override
public void run() {
System.out.println("Another job here");
}
});
4、Combine类
Combine如同他的含义一样用于combine两个CompletableFuture的结果,你可以将两个CompletableFuture的结果进行转换然后返回,比如下面的代码展示了如何等待另外一个CompletableFuture,然后将当前CompletableFuture的结果和等待的CompletableFuture的结果进行一些组合然后返回:
stringCompletableFuture.thenCombine(stringCompletableFutureV1, new
BiFunction<String, String, String>() {
@Override
public String apply(String s, String s2) {
return "Combine:" + s + " : " + s2;
}
});
5、AcceptBoth类
如果你想等待两个CompletableFuture结束,然后再消费他们,那你可以使用该操作,需要注意的是,你指定的消费者会在两个CompletableFuture都完成之后再执行,因为你指定的消费者需要等待两个CompletableFuture完成的结果作为参数来运行,经过思考,只有当两个CompletableFuture都结束之后,该消费者才能正常运行,下面展示了简单使用方法:
stringCompletableFuture.thenAcceptBoth(stringCompletableFutureV1, new BiConsumer<String, String>() {
@Override
public void accept(String s, String s2) {
System.out.println("Consume:" + s + ":" + s2);
}
});
6、runAfterBoth类
相比于AcceptBoth,runAfterBoth虽然会等待两个CompletableFuture都结束再运行,但是不再依赖两个CompletableFuture的运行结果,所以你需要选择合适的类型里为你的业务工作:
stringCompletableFuture.runAfterBoth(stringCompletableFutureV1, new Runnable() {
@Override
public void run() {
System.out.println("do some jobs here");
}
});
7、applyToEither类
使用该类型可以达到的效果是:哪个CompletableFuture先结束,就会被消费者消费,所以有竞争关系,先运行完成就可以被消费者消费,第二个CompletableFuture的结果将不会被消费,等待一个CompletableFuture运行完成之后,你可以对结果进行转换输出,下面是一个简单的示例:
stringCompletableFuture.applyToEither(stringCompletableFutureV1, new Function<String, Integer>() {
@Override
public Integer apply(String s) {
return Integer.parseInt(s);
}
});
8、acceptEither类
这个类型的操作也只会等待一个CompletableFuture结束,然后就会被消费者消费,而且该类型的操作仅仅能进行对CompletableFuture结果的消费,无法转换输出,下面是示例:
stringCompletableFuture.acceptEither(stringCompletableFutureV1, new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println("->" + s);
}
});
9、runAfterEither类
等待一个CompletableFuture结束,然后运行指定的Runnable,下面是示例:
stringCompletableFuture.runAfterEither(stringCompletableFutureV1, new Runnable() {
@Override
public void run() {
System.out.println("other job");
}
});
10、thenCompose类
使用该类型的操作,你可以使用当前CompletableFuture的结果来产生一个全新的CompletableFuture,then的含义便是等待当前的CompletableFuture运行结束,然后再运行我们指定的Supplier,下面是一个简单的实例,展示了如何依赖当前的CompletableFuture运行结果来组装出一个新的CompletableFuture:
stringCompletableFuture.thenCompose(new Function<String, CompletionStage<String>>() {
@Override
public CompletionStage<String> apply(String s) {
return createCompletableFuture(TYPE.SUPPLY_ASYNC_0, new Supplier<String>() {
@Override
public String get() {
return s + ":" + s;
}
}, null, null);
}
});
11、exceptionally类
这是单独的一个类型,使用这个方法可以对当前的CompletableFuture抛出的异常进行一些处理,类似于Fallback,你可以在这里返回一些兜底的数据来屏蔽当前CompletableFuture运行抛出的异常,当然你也可以在该方法里面直接抛出当前CompletableFuture抛出的异常,下面是一个简单的示例:
stringCompletableFuture.exceptionally(new Function<Throwable, String>() {
@Override
public String apply(Throwable throwable) {
return "fallback";
}
});
12、whenComplete类
当前CompletableFuture将会把运行运行结果作为参数传递给指定的function,你可以通过参数来判断当前CompletableFuture是否成功完成还是抛出了异常,然后根据你的逻辑消费异常或者正常的结果,下面是一个简单的示例:
stringCompletableFuture.whenComplete(new BiConsumer<String, Throwable>() {
@Override
public void accept(String s, Throwable throwable) {
if (throwable == null) {
System.out.println(s);
} else {
throwable.printStackTrace();
}
}
});
13、handle类
比起whenComplete类,该类型不仅可以获取到当前CompletableFuture是成功了还是抛出了异常之外,还可以对结果进行转换(如果当前CompletableFuture成功完成了),下面是简单示例:
stringCompletableFuture.handle(new BiFunction<String, Throwable, Integer>() {
@Override
public Integer apply(String s, Throwable throwable) {
if (throwable != null) {
throwable.printStackTrace();
} else {
return Integer.parseInt(s);
}
return -1;
}
});
四、结尾
在CompletableFuture类中,我们在上面的分析中都是要么组合两个CompletableFuture,要么就是对一个CompletableFuture的操作,下面介绍的两个静态方法可以组合多个CompletableFuture,然后执行执行操作。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
return orTree(cfs, 0, cfs.length - 1);
}
首先是allOff方法,你可以传递任意多的CompletableFuture给allOff,然后可以将其结果看成是单个CompletableFuture一样继续可以施加操作,效果是allOff的语义决定了其表现行为,allOff方法将等待所有的CompletableFuture都结束之后才会继续之后的操作,下面是简单示例:
CompletableFuture
.allOf(stringCompletableFuture, stringCompletableFutureV1, voidCompletableFuture, voidCompletableFutureV1)
.thenCombine(stringCompletableFuture, new BiFunction<Void, String, String>() {
@Override
public String apply(Void aVoid, String s) {
return s;
}
})
.thenAccept(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
});
其次是anyOf方法,这个方法和allOf方法一样,但是anyOf方法发现有一个CompletableFuture完成之后便立刻执行之后的操作,使用方法和allOf事一样的,知识语义不一样,需要根据不同的场景选择使用allOf或者anyOf。
下面是一个可以等待所有CompletableFuture都结束的工具类,这个工具类的特点是可以设定等待时间,然后在超时的时候会检测出到底是哪个CompletableFuture超时,本文到此也就结束了:
/**
* This method will detect which services are timed out
* @param className the caller class name
* @param timeout timeout to return
* @param futureMap the map : key is service desc, value is the future.
*/
public static void awaitFuturesCompletedOrTimeoutWithReportTimeoutService
(String className, int timeout, Map<String, CompletableFuture<?>> futureMap)
throws ExecutionException, InterruptedException {
if (futureMap == null || futureMap.isEmpty()) {
return;
}
CompletableFuture<?>[] futures = new CompletableFuture[futureMap.size()];
futureMap.values().toArray(futures);
try {
CompletableFuture.allOf(futures).get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
//detect which services are timed out
futureMap.forEach((service, cf) -> {
try {
//do not assign timeout = 0 NANOSECONDS since the get(timeout, unit) will throw
//a TimeoutException exception immediately,but we want to try to get the result
//with a small timeout value firstly, it's worth taking 1/100000 MILLISECONDS to
//detect which services are timed out.
cf.get(1, TimeUnit.NANOSECONDS);
} catch (InterruptedException | ExecutionException exception) {
LOGGER.warn("Failed to detect which services are timed out for class:{}, at detect service: {}, "
+ "error:", className, service, exception);
} catch (TimeoutException timeoutException) {
LOGGER.warn("Timed out for waiting futures completed:className={}, timeoutMillis={}, Service:{}",
className, timeout, service);
}
});
}
}