Java异步
1、同步、异步、阻塞、非阻塞
理解Java的异步调用,还需要从这四个基础概念理解;如果要从Java方面来理解这四个概念,需要对Java的IO模型有了解,Java 对于 IO 的封装分为 BIO、NIO 和 AIO,BIO 对应的是阻塞同步 IO,NIO 和 AIO 对应的都是非阻塞同步 IO。下面梳理一下对这几个概念的理解。https://www.jianshu.com/p/0efedb229e98
一般的系统无法避免IO依赖,无论是从本地磁盘或从网络读取,是远程通信和本地文件读写的核心。但是由于IO的速度比CPU慢很多,所以IO成了系统的瓶颈,也就很有必要学习IO模型,避免瓶颈,提升性能。
1.1 Linux IO模型
首先从操作系统层面来说,以Linux为例,Linux内核把所有的外来设备当做一个文件,对本地文件有file descriptor处理,对网络文件有socket file descriptor处理,所以可以把二者一起分析。
文件读取设计两个部分,一个是用户进程,一个是内核,用户进程是没有权利操作系统资源的,所以用户进程需要通过系统调用来调用内核,从而达到文件读取的目的。整个过程分三步:
- 用户进程调用read方法向内核发起读请求
- 内核将要读取的信息复制到缓冲区
- 内核将数据从缓冲区复制到用户进程空间
从这个步骤来看,阻塞非阻塞是针对调用方法来说的,如果用户进程在内核将数据准备完成前一致处于阻塞等待状态,就是阻塞的;如果内核还没有准备好,用户调用后直接返回,并没有阻塞,此时就是非阻塞的。。。同步异步是针对内核数据到用户进程这一过程看的,同步是指读写事件就绪后(一般是采用轮询检查),用户进程再自己负责进行读写的操作,内核向用户进程复制数据的过程仍然是阻塞的;如果是内核将数据复制到用户进程,并在复制完成后通知用户,用户没有主动检查数据准备状态,就是异步的,同步和异步的本质区别是,内核数据复制到用户空间时,用户进程是否进行等待。
Linux中用的比较多的是IO多路复用模型,I/O多路复用可以监视多个描述符,一旦某个描述符读写操作就绪,便可以通知程序进行相应的读写操作,这个模型也是同步IO。
1.2 Java IO模型
下面来看Java中的IO模型Java对于IO的封装分为BIO、NIO和AIO。BIO对应的是阻塞同步IO,NIO和AIO对应的都是非阻塞同步IO。
1.2.1 BIO
Java中的BIO对应的是同步阻塞IO。BIO针对每一个客户端请求都建立一个Socket连接,当客户端没有数据时线程一直处于阻塞状态。这种IO模型的优点是简单,但是服务端的线程个数和客户端并发访问数呈正比,如果访问增多,线程也会增多导致OOM;可以通过线程池来解决,但是线程池可以接受的线程数量也是由系统决定的,并且线程上下文切换会导致CPU使用率不高,所以BIO不适合大规模使用。
BIO请求代码如下:
{
ExecutorService executor = Excutors.newFixedThreadPollExecutor(100);//线程池
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(8088);
while(!Thread.currentThread.isInturrupted()){//主线程死循环等待新连接到来
Socket socket = serverSocket.accept();
executor.submit(new ConnectIOnHandler(socket));//为新的连接创建新的线程
}
class ConnectIOnHandler extends Thread{
private Socket socket;
public ConnectIOnHandler(Socket socket){
this.socket = socket;
}
public void run(){
while(!Thread.currentThread.isInturrupted()&&!socket.isClosed()){死循环处理读写事件
String someThing = socket.read()....//读取数据
if(someThing!=null){
......//处理数据
socket.write()....//写数据
}
}
}
}
1.2.2 NIO
NIO对应的是同步非阻塞IO。NIO有几个关键的概念,selector、channel、buffer。
- buffer是包含需要读取或写入的数据的缓冲区。NIO中所有数据的读写均通过缓冲区进行操作。相对于BIO流的好处是可以重复读取,并且可以双向通信。
- channel是一个双向的数据读写的通道,buffer就是建立在channel的基础上流通的。
- selector是NIO的核心,通过不断轮询注册在其之上的channel,当selector发现某个channel有数据状态有变化时,所以只需要一个线程,就可以解决所有的连接请求。
NIO代码如下:
interface ChannelHandler{
void channelReadable(Channel channel);
void channelWritable(Channel channel);
}
class Channel{
Socket socket;
Event event;//读,写或者连接
}
//IO线程主循环:
class IoThread extends Thread{
public void run(){
Channel channel;
while(channel=Selector.select()){//选择就绪的事件和对应的连接,阻塞的不需要担心cpu空转
if(channel.event==accept){
registerNewChannelHandler(channel);//如果是新连接,则注册一个新的读写处理器
}
if(channel.event==write){
getChannelHandler(channel).channelWritable(channel);//如果可以写,则执行写事件
}
if(channel.event==read){
getChannelHandler(channel).channelReadable(channel);//如果可以读,则执行读事件
}
}
}
Map<Channel,ChannelHandler> handlerMap;//所有channel的对应事件处理器
}
NIO由原来的阻塞读写(占用线程)变成了单线程轮询事件,找到可以进行读写的网络描述符进行读写。除了事件的轮询是阻塞的(没有可干的事情必须要阻塞),剩余的I/O操作都是纯CPU操作,没有必要开启多线程。
并且由于线程的节约,连接数大的时候因为线程切换带来的问题也随之解决,进而为处理海量连接提供了可能。
1.2.3 AIO
还不会。。。。。
2、Java代码中的异步
上面说的是Java IO模型的同步异步,在业务代码中可不可以体现异步呢?答案是肯定的。
2.1 同步代码
public class Main {
public static void main(String[] args) {
long begin = System.currentTimeMillis();
calculate();
long end = System.currentTimeMillis();
System.out.println(String.format("耗时%ss", String.valueOf(end - begin)));
}
public static void calculate(){
// 模拟耗时
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("计算完成");
}
}
代码中调用方必须要等到被调用方执行结束后,才可以继续执行。
2.2 基于Future的异步调用
Future 模式相当于一个占位符,代表一个操作的未来的结果,其简单的概念不在本文中介绍,直接给出总结:Future 模式可以细分为将来式和回调式两种模式。
2.2.1 将来式
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(10);
long begin = System.currentTimeMillis();
Future<String> future = service.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return calculate();
}
});
long end = System.currentTimeMillis();
System.out.println("do otherthing");
System.out.println(future.get());
System.out.println(String.format("耗时%ss", String.valueOf(end - begin)));
}
public static String calculate(){
// 模拟耗时
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "100";
}
}
提交任务到线程池后,主线程可以继续做自己的事情,但是future.get()还是阻塞的,所以如果在提交任务后直接get,并不会提高效率,反而由于线程的开销会比同步调用更慢。除了get,还可以通过isDone方法轮询检查是否计算完成,本质上和get没有什么区别。
所以Future虽然可以实现异步,但是还是有缺点,对比JavaScript的异步,不满足一下几点:
- 结果的获取还是通过get阻塞的方式,更好的方式是采用回调
- 如果多个Future形成依赖,会产生回调地狱,如果可以链式调用就更好了。
这种方式在Future的第二种回调式中声明了。
2.2.2回调式
这种方式就是回调,但是Future 并没有实现 callback,addListener 这样的方法,想要在 JAVA 中体验到 callback 的特性,得引入一些额外的框架。有Netty、Guava第三方框架可以使用, jdk1.8 已经提供了一种更为高级的回调方式:CompletableFuture。
2.2.2.1 Guava-ListeningExecutorService
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
long begin = System.currentTimeMillis();
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
ListenableFuture<Integer> future = service.submit(new Callable<Integer>() {
public Integer call() throws Exception {
System.out.println("执行耗时操作...");
calculate();
return 100;
}
});
Futures.addCallback(future, new FutureCallback<Integer>() {
@Override
public void onSuccess(Integer result) {
System.out.println("计算成功:" + result);
}
@Override
public void onFailure(Throwable t) {
System.out.println("计算失败:" + t.getMessage());
}
});
long end = System.currentTimeMillis();
System.out.println("do otherthing");
System.out.println(String.format("耗时%ss", String.valueOf(end - begin)));
}
public static String calculate(){
// 模拟耗时
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "100";
}
}
这下可以彻底不用管结果了,😌😌😌,但是还是有链式调用的问题,所以还是看看CompletableFuture的用法,这是一个非常好用的异步编程类
2.3 JDK-CompletableFuture
2.3.1初始化
有几个常用的初始化静态工厂方法:
CompletableFuture.runAsync(Runnable runnable);
CompletableFuture.runAsync(Runnable runnable, Executor executor);
CompletableFuture.supplyAsync(Supplier<U> supplier);
CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
- runAsync 方法接收的是 Runnable 的实例,意味着它没有返回值
- supplyAsync 方法对应的是有返回值的情况
- 这两个方法的带 executor,表示让任务在指定的线程池中执行,不指定的话,通常任务是在 ForkJoinPool.commonPool() 线程池中执行的,里面是守护线程。
2.3.2 任务执行
//前三个任务 A 无返回值,所以对应的,第 2 行和第 3 行代码中,resultA 其实是 null。
CompletableFuture.runAsync(() -> {}).thenRun(() -> {});
CompletableFuture.runAsync(() -> {}).thenAccept(resultA -> {});
CompletableFuture.runAsync(() -> {}).thenApply(resultA -> "resultB");
// thenRun(Runnable runnable),任务 A 执行完执行 B,并且 B 不需要 A 的结果。
CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {});
//thenAccept(Consumer action),任务 A 执行完执行 B,B 需要 A 的结果,但是任务 B 不返回值。
CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {});
//thenApply(Function fn),任务 A 执行完执行 B,B 需要 A 的结果,同时任务 B 有返回值。
CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB");
解释在注释里,如果任务 B 后面还有任务 C,往下继续调用 .thenXxx() 即可。
2.3.3 异常处理
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
看一个例子:
CompletableFuture.supplyAsync(() -> "resultA")
.thenApply(resultA -> resultA + " resultB")
.thenApply(resultB -> resultB + " resultC")
.thenApply(resultC -> resultC + " resultD");
如果第一个出现了异常,其他的都不能执行,那么我们怎么处理异常呢?看下面的代码,我们在任务 A 中抛出异常,并对其进行处理:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException();
}).exceptionally(ex -> "errorResultA")
.thenApply(resultA -> resultA + " resultB")
.thenApply(resultB -> resultB + " resultC")
.thenApply(resultC -> resultC + " resultD");
System.out.println(future.join());
上面的代码中,任务 A 抛出异常,然后通过 .exceptionally() 方法处理了异常,并返回新的结果,这个新的结果将传递给任务 B。所以最终的输出结果是:
errorResultA resultB resultC resultD
2.3.4 聚合任务
CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");
CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB");
//thenAcceptBoth 表示后续的处理不需要返回值
cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {});
// thenCombine 表示需要返回值。
cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B");
//不需要result
cfA.runAfterBoth(cfB, () -> {});
2.3.5 取多个任务的结果
// N个Future全部完成才可以
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs){...}
// 只需要满足一个就可以
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {...}
CompletableFuture cfA = CompletableFuture.supplyAsync(() -> "resultA");
CompletableFuture cfB = CompletableFuture.supplyAsync(() -> 123);
CompletableFuture cfC = CompletableFuture.supplyAsync(() -> "resultC");
CompletableFuture<Void> future = CompletableFuture.allOf(cfA, cfB, cfC);
// 所以这里的 join() 将阻塞,直到所有的任务执行结束
future.join();
2.3.6 compose
与thenCombine不同的是,compose的后一个是基于前一个的结果,而thenCombine只是把两个的结果组合在一起。
CompletableFuture参考:https://www.javadoop.com/post/completable-future