Java8并发编程(使用CompletableFuture)
java8异步处理
非阻塞IO/异步/并行
非阻塞指线程处理异步任务时,当异步任务获取到数据时使用回调函数处理数据,而不是CPU空闲等待数据返回后再处理。
使用场景
Restful服务往往部署在不同的物理机器上,通过Http协议进行调用,如果直接在主线程中进行IO操作,主线程将阻塞以等待请求完成。而Java原生多线程编程繁琐而且容易出错,线程池也无法完美解决主线程阻塞的问题,Future以同步代码风格编写异步调用,提供主线程无需阻塞的方法,对于微服务架构是一个很好的并发解决方案。
在实际的web项目中,传统的服务器后端,往往使用线程池做请求处理,每一个请求到来独占一个线程,当涉及io操作时,线程阻塞以等待IO完成,当IO耗时太长或者IO操作太频繁时,线程长时间无法释放,导致线程池可用线程不足,后续的请求被迫排队,服务器吞吐量受到严重影响。
另一个场景不太常见,一段代码中包含多个计算密集型任务(如超大矩阵计算)时,在同一个线程中线性执行使得整段代码运行时间过长,这时使用异步执行可以适当加速可并行执行的代码。
scala事件驱动
Scala的Future和Promise使用 ExecutionContext
管理异步计算任务,宏观上控制并行的粒度。你可以把它看成是一个线程池,它给任务分配新的线程或者在本线程下执行,虽然不推荐这种降级为线性执行的方式。
ExecutionContext
用 ForkJoinPool
实现线程管理,有特殊需求时需要特别配置 ForkJoinPool
的最大线程数。最大线程数设置一般参照CPU核数。虽然线程很轻量,但是当CPU线程数远小于活跃的线程数量时,线程切换频繁发生时的开销还是挺可观的。这时候并行粒度太小,并行太多反而会降低性能。
Future
有两种状态
- 未完成
- 完成
完成状态有两种方式
- 成功,带有返回值
- 失败,带有异常
Java CompletableFuture Demo
使用前提:java8
Java8吸收了Scala的Future和其他基于事件驱动的异步调用框架,比如Netty的ChannelFuture等的优点,在原先的Future接口之上实现了CompletableFuture。
CompletableFuture 通过回调函数,实现非阻塞的IO的异步调用。
使用场景:Restful服务往往部署在不同的物理机器上,通过Http协议进行调用,如果直接在主线程中进行IO操作,主线程将阻塞以等待请求完成。而Java原生多线程编程繁琐而且容易出错,线程池也无法完美解决主线程阻塞的问题,Future以同步代码风格编写异步调用,提供主线程无需阻塞的方法,对于微服务架构是一个很好的并发解决方案。
全部Demo代码
Main.java
import java.io.IOException;
importjava.util.concurrent.CompletableFuture;
public class Main {
public static void main(String[] args throws Exception {
HttpTask task = new HttpTask();
System.out.println("main threadstarts in thread id: "+Thread.currentThread().getId());
CompletableFuturefutureNonBlocking =CompletableFuture.supplyAsync(()-> {
try {
return task.doHtt("https://guazi.com");
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedExceptione) {
e.printStackTrace();
}
return "error";
});
CompletableFuture futureBlocking= CompletableFuture.supplyAsync(( -> {
try {
return task.doHtt("https://guazi.com");
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedExceptione) {
e.printStackTrace();
}
return "error";
});
// 非阻塞
System.out.printl("-----------------非阻塞位1----------------------\n");
// future.thenAcceptAsync() 方法阻塞本线程,http请求成功后执行回调函数
futureNonBlocking.thenAcceptAsyn(result -> System.out.printl("\nfrom non blocking future:\n+result+"\n"));
System.out.printl("-----------------非阻塞位2----------------------\n");
// 阻塞
System.out.println("\nfromblocking future:\n+futureBlocking.get()+"\n");
// future.get() 方法阻塞本线程,直http请求成功
System.out.printl("-----------------阻塞位1----------------------\n");
System.out.println("main threadends");
}
}
HttpTask.java
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import java.io.IOException;
public class HttpTask {
public String doHttp(String url)throws IOException,InterruptedException {
long threadId =Thread.currentThread().getId();
OkHttpClient client = newOkHttpClient();
System.out.println("http iostarts in thread id: "+ threadId);
Request request = newRequest.Builder()
.url(url)
.build();
Response response = client.newCal(request).execute();
String result = response.headers(.toString();
// mock delay
for (int i=1; i<=3; ++i){
// 阻塞本线程1秒
Thread.sleep(1000);
System.out.println("delayed + i + " seconds in thread id:"+ threadId);
}
return "******** headers inthread id "+threadId+"***********\n"+resul+"***************** headers******************";
}
}
输出结果
终端输出本Demo中有三个线程执行,已通过线程ID标识,非阻塞线程(这里线程id为10)异步执行,结果通过回调函数处理。线程id为11的同样异步执行,但是Future接口的get()方法使主线程(id为1)阻塞以等待结果到达。
总结
线程10 → 异步 + 非阻塞IO
线程11 → 异步 + 阻塞IO
推荐使用 异步 + 非阻塞IO 的方式,这也是 Java1.8
的 CompletableFuture
和 Java1.5
的 Future
的不同之处。
可以看到:
阻塞位置1 总是在 blocking future
的 get()
方法之后执行。
使用姿势
不关心返回结果
这种情况毋庸置疑: 异步 + 非阻塞
必须获取到返回结果程序才能继续执行
这种情况下有两种选择
- 异步 + 阻塞 (前文已述) 说明一点,这种方式并不是完全失去了性能上的提升,因为在另一进程在未执行完成时,本进程不是必须在空等,而是可以做自己的数据处理,两个进程并发执行,虽然有时候快的那个要等待慢的那个,但是这比非异步也是有很大的性能提升的.如果实际场景中返回结果耗时太长,比如下载批量图片,请使用方法2
- 使用回调函数,类似javascript对异步的处理. 这种方法可以实现 异步+非阻塞的效果,但是复杂逻辑场景中会出现回调函数嵌套层数增加的混乱. trade-offs :-) :-)