java8_CompletableFuture
2016-11-11 本文已影响42人
梓青
标签:java
可完成的Future
private static final Logger LOGGER = Logger.getGlobal();
private ExecutorService threadExecutor = Executors.newSingleThreadExecutor();
//返回结果的任务(耗时)
private String getData(){
//do something time-consuming background
try{
Thread.sleep(1000);
}catch (InterruptedException e){
LOGGER.info("线程中断");
}
return "result data";
}
回顾Future使用
/**
* 回顾Future使用
*/
private void FutureTest() throws InterruptedException, ExecutionException {
Callable<String> callable=()->getData();
FutureTask<String> futureTask = new FutureTask<String>(callable);
//使用1
Thread getDataThread = new Thread(futureTask);
//做一些其他的事
//获取任务结果
//如果调用还未返回结果,则阻塞线程
LOGGER.info(futureTask.get());
//使用2(线程池提交,返回future)
Future<String> resultFuture = threadExecutor.submit(callable);
LOGGER.info(resultFuture.get());
}
CompletableFuture
tip:**Async()方法都有两种版本; 1,在普通的ForkJoinPool运行; 2,提供自定义的线程池
/**
* compeletableFuture
*/
private void compeletableFutureTest() {
Supplier<String> stringSupplier=()->getData();
Function<String,Integer> function=(str)->str.length();
Consumer<Integer> consumer=(length)->LOGGER.info("长度为"+length);
**********创建**********
// 1.静态方法supplyAsync(),接收消费者函数,一步执行返回结果
// 2.静态方法runAsync(),接收Runable(),返回CompletetableFunture<void>对象
CompletableFuture<Void> resultFuture=
CompletableFuture.supplyAsync(stringSupplier)//Step1:获取数据,返回C<String>
.thenApply(function)//Step2:进一步处理数据,得到结果,返回C<Integer>
.thenAccept(consumer);//Step3:处理数据,返回C<Void>
**********操作**********
//thenApply(T->U);为结果提供一个操作,返回另一种结果
//thenCompose(T->Completetable<U>);为结果提供一个操作,返回另一个可完成的Future,用于组合
//thenAccept(T->Void);为结果提供一个操作,不返回结果
//handle((T,Throwable)->U);处理结果或者错误
//whenComplete((T,Throwable)->void);处理结果或者错误,不返回结果
//thenRun(Runable);执行Runable对象
**********组合**********
//CompletableFuture<T>.thenCombine(CompletableFuture<U>,(T,U)->V);执行两个对象,完成后处理结果,返回最终结果
//CompletableFuture<T>.thenAcceptBoth(CompletableFuture<U>,(T,U)->Void);执行两个对象,完成后处理结果,无返回
//CompletableFuture<T>.runAfterBoth(CompletableFuture<U>,Runable);执行两个对象,完成后执行runable
//CompletableFuture<T>.applyToEither(CompletableFuture<U>,T->U);其中一个对象执行完成,应用函数
//CompletableFuture<T>.acceptEither(CompletableFuture<U>,T->Void);同上,无返回
//CompletableFuture<T>.runAfterEither(CompletableFuture<U>,Runable);同上,执行runable
//CompletableFuture.allOf(CompletableFuture<?>...);所有对象完成后结束,无返回
//CompletableFuture.anyOf(CompletableFuture<?>...);任何一个对象完成后结束,无返回
}
实例
private void testCfAllOf() {
Integer[] array = {1, 2, 3, 4};
CompletableFuture[] cfs = Arrays.stream(array)
.map(i -> {
return CompletableFuture.runAsync(() -> {
//睡眠i秒
try {
Thread.sleep(i * 1000);
LOGGER.info("thread_" + i + " done!");
} catch (InterruptedException e) {
LOGGER.info("thread error , id:" + i);
}
});
})
.toArray(CompletableFuture[]::new);
//调用get()/join()都可以阻塞获取结果,等待所有CF完成
CompletableFuture.allOf(cfs).join();
LOGGER.info("allOf done!");
}