Java基础-IO

2020-02-17  本文已影响0人  16325

概念

BIO

NIO

            Selector selector = Selector.open();
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(8090));
            serverSocketChannel.configureBlocking(false);
            //向通道注册选择器,并且注册接受事件
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                //获取已经准备好的通道数量
                int readyChannels = selector.selectNow();
                //如果没准备好,重试
                if (readyChannels == 0) continue;
                //获取准备好的通道中的事件集合
                Set selectedKeys = selector.selectedKeys();
                Iterator keyIterator = selectedKeys.iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey key = (SelectionKey) keyIterator.next();
                    if (key.isAcceptable()) {
                         这里注册的是accept事件,
                    } else if (key.isConnectable()) {
                    } else if (key.isReadable()) {
                    } else if (key.isWritable()) {
                    }
                    //注意每次迭代末尾的keyIterator.remove()调用。
                    //Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。
                    //下次该通道变成就绪时,Selector会再次将其放入已选择键集中
                    keyIterator.remove();
                }
            }
 

这里第一个while(true)难道不会把进程阻塞起来?
其实这里的阻塞和操作系统调用时候的阻塞并不是同一个含义,NIO的非阻塞指的是客户端链接,客户端数据的读写这些操作是非阻塞的,也就是说我们的java进程不需要在等待客户端链接的时候阻塞,而是有客户端链接之后,我们的进程就会可以拿到这个链接,注意这里是可以拿到,而并不是已经拿到,并没有回调的功能,如果注册了回调,客户端链接上来我们的回调函数执行,这就是AIO的范畴了。
所以,我们可以拿到链接,可以拿到客户端发来的数据,可以写数据。但是这只是可以,具体的拿这个操作,还是需要自己完成的。
这里可以想象一个极端的例子,客户端链接成功能后,五分钟内肯定会发来一条消息,那么我们真的可以不要代码中的while(true),而是等五分钟后,直接从selector中获取数据。

AIO

Reactor模型

1.同步的等待多个事件源到达(采用select()实现)
2.将事件多路分解以及分配相应的事件服务进行处理,这个分派采用server集中处理(dispatch)
3.分解的事件以及对应的事件服务应用从分派服务中分离出去(handler)

public NIOServer(int port) throws Exception {
      selector = Selector.open();
      serverSocket = ServerSocketChannel.open();
      serverSocket.socket().bind(new InetSocketAddress(port));
      serverSocket.configureBlocking(false);
      serverSocket.register(selector, SelectionKey.OP_ACCEPT);
  }
​
  @Override
  public void run() {
      while (!Thread.interrupted()) {
          try {
              //阻塞等待事件
              selector.select();
              // 事件列表
              Set selected = selector.selectedKeys();
              Iterator it = selected.iterator();
              while (it.hasNext()) {
                  it.remove();
                  //分发事件
                  dispatch((SelectionKey) (it.next()));
              }
          } catch (Exception e) {
​
          }
      }
  }
​
  private void dispatch(SelectionKey key) throws Exception {
      if (key.isAcceptable()) {
          register(key);//新链接建立,注册
      } else if (key.isReadable()) {
          read(key);//读事件处理
      } else if (key.isWritable()) {
          wirete(key);//写事件处理
      }
  }
​
  private void register(SelectionKey key) throws Exception {
      ServerSocketChannel server = (ServerSocketChannel) key
              .channel();
      // 获得和客户端连接的通道
      SocketChannel channel = server.accept();
      channel.configureBlocking(false);
      //客户端通道注册到selector 上
      channel.register(this.selector, SelectionKey.OP_READ);
  }

我们可以看到上述的NIO例子已经差不多拥有reactor的影子了:

1.基于事件驱动-> selector(支持对多个socketChannel的监听)
2.统一的事件分派中心-> dispatch
3.事件处理服务-> read & write

1.更少的资源利用,通常不需要一个客户端一个线程
2.更少的开销,更少的上下文切换以及locking
3.能够跟踪服务器状态
4.能够管理handler 对event的绑定

那么好的Reactor Pattern应该是怎样的?
首先我们基于Reactor Pattern 处理模式中,定义以下三种角色:

  • Reactor将I/O事件分派给对应的Handler

单Reactor单线程模型

image.png
/**
    * 等待事件到来,分发事件处理
    */
  class Reactor implements Runnable {
​
      private Reactor() throws Exception {
​
          SelectionKey sk =
                  serverSocket.register(selector,
                          SelectionKey.OP_ACCEPT);
          // attach Acceptor 处理新连接
          sk.attach(new Acceptor());
      }
​
      public void run() {
          try {
              while (!Thread.interrupted()) {
                  selector.select();
                  Set selected = selector.selectedKeys();
                  Iterator it = selected.iterator();
                  while (it.hasNext()) {
                      it.remove();
                      //分发事件处理
                      dispatch((SelectionKey) (it.next()));
                  }
              }
          } catch (IOException ex) {
              //do something
          }
      }
​
      void dispatch(SelectionKey k) {
          // 若是连接事件获取是acceptor
          // 若是IO读写事件获取是handler
          Runnable runnable = (Runnable) (k.attachment());
          if (runnable != null) {
              runnable.run();
          }
      }
​
  }
  /**
    * 连接事件就绪,处理连接事件
    */
  class Acceptor implements Runnable {
      @Override
      public void run() {
          try {
              SocketChannel c = serverSocket.accept();
              if (c != null) {// 注册读写
                  new Handler(c, selector);
              }
          } catch (Exception e) {
​
          }
      }
  }
  /**
    * 处理读写业务逻辑
    */
  class Handler implements Runnable {
      public static final int READING = 0, WRITING = 1;
      int state;
      final SocketChannel socket;
      final SelectionKey sk;
​
      public Handler(SocketChannel socket, Selector sl) throws Exception {
          this.state = READING;
          this.socket = socket;
          sk = socket.register(selector, SelectionKey.OP_READ);
          sk.attach(this);
          socket.configureBlocking(false);
      }
​
      @Override
      public void run() {
          if (state == READING) {
              read();
          } else if (state == WRITING) {
              write();
          }
      }
​
      private void read() {
          process();
          //下一步处理写事件
          sk.interestOps(SelectionKey.OP_WRITE);
          this.state = WRITING;
      }
​
      private void write() {
          process();
          //下一步处理读事件
          sk.interestOps(SelectionKey.OP_READ);
          this.state = READING;
      }
​
      /**
        * task 业务处理
        */
      public void process() {
          //do something
      }
  }

这是最基本的单Reactor单线程模型。其中Reactor线程,负责多路分离套接字,有新连接到来触发connect 事件之后,交由Acceptor进行处理,有IO读写事件之后交给hanlder 处理。Acceptor主要任务就是构建handler ,在获取到和client相关的SocketChannel之后 ,绑定到相应的hanlder上,对应的SocketChannel有读写事件之后,基于reactor 分发,hanlder就可以处理了(所有的IO事件都绑定到selector上,有Reactor分发)。该模型 适用于处理器链中业务处理组件能快速完成的场景。不过,这种单线程模型不能充分利用多核资源,所以实际使用的不多。

单Reactor多线程模型

image.png
/**
    * 多线程处理读写业务逻辑
    */
  class MultiThreadHandler implements Runnable {
      public static final int READING = 0, WRITING = 1;
      int state;
      final SocketChannel socket;
      final SelectionKey sk;
​
      //多线程处理业务逻辑
      ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
​
​
      public MultiThreadHandler(SocketChannel socket, Selector sl) throws Exception {
          this.state = READING;
          this.socket = socket;
          sk = socket.register(selector, SelectionKey.OP_READ);
          sk.attach(this);
          socket.configureBlocking(false);
      }
​
      @Override
      public void run() {
          if (state == READING) {
              read();
          } else if (state == WRITING) {
              write();
          }
      }
​
      private void read() {
          //任务异步处理
          executorService.submit(() -> process());
​
          //下一步处理写事件
          sk.interestOps(SelectionKey.OP_WRITE);
          this.state = WRITING;
      }
​
      private void write() {
          //任务异步处理
          executorService.submit(() -> process());
​
          //下一步处理读事件
          sk.interestOps(SelectionKey.OP_READ);
          this.state = READING;
      }
​
      /**
        * task 业务处理
        */
      public void process() {
          //do IO ,task,queue something
      }
  }

相对于第一种单线程的模式来说,在处理业务逻辑,也就是获取到IO的读写事件之后,交由线程池来处理,这样可以减小主reactor的性能开销,从而更专注的做事件分发工作了,从而提升整个应用的吞吐。

多Reactor多线程模型

image.png
/**
    * 多work 连接事件Acceptor,处理连接事件
    */
  class MultiWorkThreadAcceptor implements Runnable {
​
      // cpu线程数相同多work线程
      int workCount =Runtime.getRuntime().availableProcessors();
      SubReactor[] workThreadHandlers = new SubReactor[workCount];
      volatile int nextHandler = 0;
​
      public MultiWorkThreadAcceptor() {
          this.init();
      }
​
      public void init() {
          nextHandler = 0;
          for (int i = 0; i < workThreadHandlers.length; i++) {
              try {
                  workThreadHandlers[i] = new SubReactor();
              } catch (Exception e) {
              }
​
          }
      }
​
      @Override
      public void run() {
          try {
              SocketChannel c = serverSocket.accept();
              if (c != null) {// 注册读写
                  synchronized (c) {
                      // 顺序获取SubReactor,然后注册channel 
                      SubReactor work = workThreadHandlers[nextHandler];
                      work.registerChannel(c);
                      nextHandler++;
                      if (nextHandler >= workThreadHandlers.length) {
                          nextHandler = 0;
                      }
                  }
              }
          } catch (Exception e) {
          }
      }
  }
  /**
    * 多work线程处理读写业务逻辑
    */
  class SubReactor implements Runnable {
      final Selector mySelector;
​
      //多线程处理业务逻辑
      int workCount =Runtime.getRuntime().availableProcessors();
      ExecutorService executorService = Executors.newFixedThreadPool(workCount);
​
​
      public SubReactor() throws Exception {
          // 每个SubReactor 一个selector 
          this.mySelector = SelectorProvider.provider().openSelector();
      }
​
      /**
        * 注册chanel
        *
        * @param sc
        * @throws Exception
        */
      public void registerChannel(SocketChannel sc) throws Exception {
          sc.register(mySelector, SelectionKey.OP_READ | SelectionKey.OP_CONNECT);
      }
​
      @Override
      public void run() {
          while (true) {
              try {
              //每个SubReactor 自己做事件分派处理读写事件
                  selector.select();
                  Set<SelectionKey> keys = selector.selectedKeys();
                  Iterator<SelectionKey> iterator = keys.iterator();
                  while (iterator.hasNext()) {
                      SelectionKey key = iterator.next();
                      iterator.remove();
                      if (key.isReadable()) {
                          read();
                      } else if (key.isWritable()) {
                          write();
                      }
                  }
​
              } catch (Exception e) {
​
              }
          }
      }
​
      private void read() {
          //任务异步处理
          executorService.submit(() -> process());
      }
​
      private void write() {
          //任务异步处理
          executorService.submit(() -> process());
      }
​
      /**
        * task 业务处理
        */
      public void process() {
          //do IO ,task,queue something
      }
  }
​

第三种模型比起第二种模型,是将Reactor分成两部分:

  1. mainReactor负责监听server socket,用来处理新连接的建立,将建立的socketChannel指定注册给subReactor。
  2. subReactor维护自己的selector, 基于mainReactor 注册的socketChannel多路分离IO读写事件,读写网 络数据,对业务处理的功能,另其扔给worker线程池来完成。

第三种模型中,我们可以看到,mainReactor 主要是用来处理网络IO 连接建立操作,通常一个线程就可以处理,而subReactor主要做和建立起来的socket做数据交互和事件业务处理操作,它的个数上一般是和CPU个数等同,每个subReactor一个县城来处理。此种模型中,每个模块的工作更加专一,耦合度更低,性能和稳定性也大量的提升,支持的可并发客户端数量可达到上百万级别。关于此种模型的应用,目前有很多优秀的矿建已经在应用了,比如mina 和netty 等。上述中去掉线程池的第三种形式的变种,也 是Netty NIO的默认模式。下一节我们将着重讲解netty的架构模式。

上一篇下一篇

猜你喜欢

热点阅读