Java

Java异步

2020-05-21  本文已影响0人  愤怒的老照

1、同步、异步、阻塞、非阻塞

理解Java的异步调用,还需要从这四个基础概念理解;如果要从Java方面来理解这四个概念,需要对Java的IO模型有了解,Java 对于 IO 的封装分为 BIO、NIO 和 AIO,BIO 对应的是阻塞同步 IO,NIO 和 AIO 对应的都是非阻塞同步 IO。下面梳理一下对这几个概念的理解。https://www.jianshu.com/p/0efedb229e98

一般的系统无法避免IO依赖,无论是从本地磁盘或从网络读取,是远程通信和本地文件读写的核心。但是由于IO的速度比CPU慢很多,所以IO成了系统的瓶颈,也就很有必要学习IO模型,避免瓶颈,提升性能。

1.1 Linux IO模型

首先从操作系统层面来说,以Linux为例,Linux内核把所有的外来设备当做一个文件,对本地文件有file descriptor处理,对网络文件有socket file descriptor处理,所以可以把二者一起分析。

文件读取设计两个部分,一个是用户进程,一个是内核,用户进程是没有权利操作系统资源的,所以用户进程需要通过系统调用来调用内核,从而达到文件读取的目的。整个过程分三步:

从这个步骤来看,阻塞非阻塞是针对调用方法来说的,如果用户进程在内核将数据准备完成前一致处于阻塞等待状态,就是阻塞的;如果内核还没有准备好,用户调用后直接返回,并没有阻塞,此时就是非阻塞的。。。同步异步是针对内核数据到用户进程这一过程看的,同步是指读写事件就绪后(一般是采用轮询检查),用户进程再自己负责进行读写的操作,内核向用户进程复制数据的过程仍然是阻塞的;如果是内核将数据复制到用户进程,并在复制完成后通知用户,用户没有主动检查数据准备状态,就是异步的,同步和异步的本质区别是,内核数据复制到用户空间时,用户进程是否进行等待。

Linux中用的比较多的是IO多路复用模型,I/O多路复用可以监视多个描述符,一旦某个描述符读写操作就绪,便可以通知程序进行相应的读写操作,这个模型也是同步IO。

1.2 Java IO模型

下面来看Java中的IO模型Java对于IO的封装分为BIO、NIO和AIO。BIO对应的是阻塞同步IO,NIO和AIO对应的都是非阻塞同步IO。

1.2.1 BIO

Java中的BIO对应的是同步阻塞IO。BIO针对每一个客户端请求都建立一个Socket连接,当客户端没有数据时线程一直处于阻塞状态。这种IO模型的优点是简单,但是服务端的线程个数和客户端并发访问数呈正比,如果访问增多,线程也会增多导致OOM;可以通过线程池来解决,但是线程池可以接受的线程数量也是由系统决定的,并且线程上下文切换会导致CPU使用率不高,所以BIO不适合大规模使用。
BIO请求代码如下:

{
 ExecutorService executor = Excutors.newFixedThreadPollExecutor(100);//线程池

 ServerSocket serverSocket = new ServerSocket();
 serverSocket.bind(8088);
 while(!Thread.currentThread.isInturrupted()){//主线程死循环等待新连接到来
 Socket socket = serverSocket.accept();
 executor.submit(new ConnectIOnHandler(socket));//为新的连接创建新的线程
}

class ConnectIOnHandler extends Thread{
    private Socket socket;
    public ConnectIOnHandler(Socket socket){
       this.socket = socket;
    }
    public void run(){
      while(!Thread.currentThread.isInturrupted()&&!socket.isClosed()){死循环处理读写事件
          String someThing = socket.read()....//读取数据
          if(someThing!=null){
             ......//处理数据
             socket.write()....//写数据
          }

      }
    }
}

1.2.2 NIO

NIO对应的是同步非阻塞IO。NIO有几个关键的概念,selector、channel、buffer。

NIO代码如下:

 interface ChannelHandler{
      void channelReadable(Channel channel);
      void channelWritable(Channel channel);
   }
   class Channel{
     Socket socket;
     Event event;//读,写或者连接
   }

   //IO线程主循环:
   class IoThread extends Thread{
   public void run(){
   Channel channel;
   while(channel=Selector.select()){//选择就绪的事件和对应的连接,阻塞的不需要担心cpu空转
      if(channel.event==accept){
         registerNewChannelHandler(channel);//如果是新连接,则注册一个新的读写处理器
      }
      if(channel.event==write){
         getChannelHandler(channel).channelWritable(channel);//如果可以写,则执行写事件
      }
      if(channel.event==read){
          getChannelHandler(channel).channelReadable(channel);//如果可以读,则执行读事件
      }
    }
   }
   Map<Channel,ChannelHandler> handlerMap;//所有channel的对应事件处理器
  }

NIO由原来的阻塞读写(占用线程)变成了单线程轮询事件,找到可以进行读写的网络描述符进行读写。除了事件的轮询是阻塞的(没有可干的事情必须要阻塞),剩余的I/O操作都是纯CPU操作,没有必要开启多线程。

并且由于线程的节约,连接数大的时候因为线程切换带来的问题也随之解决,进而为处理海量连接提供了可能。

1.2.3 AIO

还不会。。。。。

2、Java代码中的异步

上面说的是Java IO模型的同步异步,在业务代码中可不可以体现异步呢?答案是肯定的。

2.1 同步代码

public class Main {                                                                          
    public static void main(String[] args) {                                                 
                                                                                             
        long begin = System.currentTimeMillis();                                             
                                                                                             
        calculate();                                                                         
        long end = System.currentTimeMillis();                                               
        System.out.println(String.format("耗时%ss", String.valueOf(end - begin)));             
    }                                                                                        
                                                                                             
    public static  void calculate(){                                                         
        // 模拟耗时                                                                              
        try {                                                                                
            TimeUnit.SECONDS.sleep(5);                                                       
        } catch (InterruptedException e) {                                                   
            e.printStackTrace();                                                             
        }                                                                                    
                                                                                             
        System.out.println("计算完成");                                                          
    }                                                                                        
}                                                                                            

代码中调用方必须要等到被调用方执行结束后,才可以继续执行。

2.2 基于Future的异步调用

Future 模式相当于一个占位符,代表一个操作的未来的结果,其简单的概念不在本文中介绍,直接给出总结:Future 模式可以细分为将来式和回调式两种模式。

2.2.1 将来式

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ExecutorService service = Executors.newFixedThreadPool(10);
        long begin = System.currentTimeMillis();
        Future<String> future = service.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
               return calculate();
            }
        });
        long end = System.currentTimeMillis();
        System.out.println("do otherthing");
        System.out.println(future.get());
        System.out.println(String.format("耗时%ss", String.valueOf(end - begin)));
    }

    public static  String calculate(){
        // 模拟耗时
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return "100";
    }
}

提交任务到线程池后,主线程可以继续做自己的事情,但是future.get()还是阻塞的,所以如果在提交任务后直接get,并不会提高效率,反而由于线程的开销会比同步调用更慢。除了get,还可以通过isDone方法轮询检查是否计算完成,本质上和get没有什么区别。

所以Future虽然可以实现异步,但是还是有缺点,对比JavaScript的异步,不满足一下几点:

这种方式在Future的第二种回调式中声明了。

2.2.2回调式

这种方式就是回调,但是Future 并没有实现 callback,addListener 这样的方法,想要在 JAVA 中体验到 callback 的特性,得引入一些额外的框架。有Netty、Guava第三方框架可以使用, jdk1.8 已经提供了一种更为高级的回调方式:CompletableFuture。

2.2.2.1 Guava-ListeningExecutorService
public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        long begin = System.currentTimeMillis();
        ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
        ListenableFuture<Integer> future = service.submit(new Callable<Integer>() {
            public Integer call() throws Exception {
                System.out.println("执行耗时操作...");
                calculate();
                return 100;
            }
        });

        Futures.addCallback(future, new FutureCallback<Integer>() {

            @Override
            public void onSuccess(Integer result) {
                System.out.println("计算成功:" + result);
            }

            @Override
            public void onFailure(Throwable t) {
                System.out.println("计算失败:" + t.getMessage());
            }
        });

        long end = System.currentTimeMillis();
        System.out.println("do otherthing");
        System.out.println(String.format("耗时%ss", String.valueOf(end - begin)));


    }

    public static  String calculate(){
        // 模拟耗时
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return "100";
    }
}

这下可以彻底不用管结果了,😌😌😌,但是还是有链式调用的问题,所以还是看看CompletableFuture的用法,这是一个非常好用的异步编程类

2.3 JDK-CompletableFuture

2.3.1初始化

有几个常用的初始化静态工厂方法:

CompletableFuture.runAsync(Runnable runnable);
CompletableFuture.runAsync(Runnable runnable, Executor executor);

CompletableFuture.supplyAsync(Supplier<U> supplier);
CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)

2.3.2 任务执行

//前三个任务 A 无返回值,所以对应的,第 2 行和第 3 行代码中,resultA 其实是 null。
CompletableFuture.runAsync(() -> {}).thenRun(() -> {}); 
CompletableFuture.runAsync(() -> {}).thenAccept(resultA -> {}); 
CompletableFuture.runAsync(() -> {}).thenApply(resultA -> "resultB");

// thenRun(Runnable runnable),任务 A 执行完执行 B,并且 B 不需要 A 的结果。
CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {});
//thenAccept(Consumer action),任务 A 执行完执行 B,B 需要 A 的结果,但是任务 B 不返回值。
CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {});
//thenApply(Function fn),任务 A 执行完执行 B,B 需要 A 的结果,同时任务 B 有返回值。
CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB");

解释在注释里,如果任务 B 后面还有任务 C,往下继续调用 .thenXxx() 即可。

2.3.3 异常处理

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);

看一个例子:

CompletableFuture.supplyAsync(() -> "resultA")
    .thenApply(resultA -> resultA + " resultB")
    .thenApply(resultB -> resultB + " resultC")
    .thenApply(resultC -> resultC + " resultD");

如果第一个出现了异常,其他的都不能执行,那么我们怎么处理异常呢?看下面的代码,我们在任务 A 中抛出异常,并对其进行处理:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
   throw new RuntimeException();
}).exceptionally(ex -> "errorResultA")
  .thenApply(resultA -> resultA + " resultB")
  .thenApply(resultB -> resultB + " resultC")
  .thenApply(resultC -> resultC + " resultD");

System.out.println(future.join());

上面的代码中,任务 A 抛出异常,然后通过 .exceptionally() 方法处理了异常,并返回新的结果,这个新的结果将传递给任务 B。所以最终的输出结果是:

errorResultA resultB resultC resultD

2.3.4 聚合任务

CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");
CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB");

//thenAcceptBoth 表示后续的处理不需要返回值
cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {});
// thenCombine 表示需要返回值。
cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B");
//不需要result
cfA.runAfterBoth(cfB, () -> {});

2.3.5 取多个任务的结果

// N个Future全部完成才可以
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs){...}
// 只需要满足一个就可以
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {...}
CompletableFuture cfA = CompletableFuture.supplyAsync(() -> "resultA");
CompletableFuture cfB = CompletableFuture.supplyAsync(() -> 123);
CompletableFuture cfC = CompletableFuture.supplyAsync(() -> "resultC");

CompletableFuture<Void> future = CompletableFuture.allOf(cfA, cfB, cfC);
// 所以这里的 join() 将阻塞,直到所有的任务执行结束
future.join();

2.3.6 compose

与thenCombine不同的是,compose的后一个是基于前一个的结果,而thenCombine只是把两个的结果组合在一起。

CompletableFuture参考:https://www.javadoop.com/post/completable-future

上一篇 下一篇

猜你喜欢

热点阅读