Java 杂谈javaJava设计模式

第十二节 netty前传-NIO 实现reactor模式

2018-12-24  本文已影响1人  勃列日涅夫

本节所要了解java nio的reactor模式,参考来源Doug lee java并发的作者。当然作为netty的底层实现,对于nio的reactor模式的实现,对于学习netty也是尤为重要的一步。

首先先作为对比先看下经典BIO模型

图片.png
class Server implements Runnable {
  public void run() {
    try {
//创建socket server套接字
ServerSocket ss = new ServerSocket(PORT);
while (!Thread.interrupted())
//启动额外线程处理socket客户端连接后的业务处理
        new Thread(new Handler(ss.accept())).start();
// or, single-threaded, or a thread pool
    } catch (IOException ex) { /* ... */ }
  }
//业务处理
  static class Handler implements Runnable {
    final Socket socket;
    Handler(Socket s) { socket = s; }
    public void run() {
      try {
byte[] input = new byte[MAX_INPUT];
        socket.getInputStream().read(input);
        byte[] output = process(input);
        socket.getOutputStream().write(output);
      } catch (IOException ex) { /* ... */ }
    }
    private byte[] process(byte[] cmd) { /* ... */ }
  }
}

nio不再详细介绍,可参考前面文章

主要说下reactor模式:简单来说reactor模式用于同时处理一个或多个传递给服务端的请求的事件的处理模式。 然后,服务端处理程序解析输入别的请求,并将它们同步分派给与之关联的请求异步处理程序。不恰当可类比web页面事件,当点击某个按钮时,浏览器收到这个信号(监听),分派给相关的js处理程序处理(handler)。

关于Channels、Buffers、Selectors、SelectionKeys核心前面已介绍,下面实现会用到

基本代码讲解

  1. 对ServerSocketChannel、Selector初始化。 serverSocket.register方法将服务通道注册到选择器,并绑定ACCEPT兴趣事件(初始绑定,当客户端通道连接后会先处理accpet事件)

  2. 初始附加一个Acceptor对象(通过SelectionKey 的attach方法)。目的是用于分发时,借助Acceptor处理相关逻辑.

  3. dispatch方法的同步分发功能

class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;

    Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        serverSocket.configureBlocking(false);
        //注册ServerSocketChannel的兴趣事件为连接OP_ACCEPT
        SelectionKey sk = serverSocket.register(selector,SelectionKey.OP_ACCEPT);
        //附加Acceptor,稍后调用attachment可以取得该对象
        sk.attach(new Acceptor());
    }
    public void run() {  //normally in a new Thread
        try {
            while (!Thread.interrupted()) {
                //阻塞 直到有有一個通道返回
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                //循环检测是否有新事件注册
                while (it.hasNext())
                    //同步分发
                    dispatch((SelectionKey)(it.next()));
                selected.clear();
            }
        } catch (IOException ex) { /* ... */ }
    }
    void dispatch(SelectionKey k) {
        //取得attach附加的对象处理
        Runnable r = (Runnable)(k.attachment());
        if (r != null)
            r.run();
    }

    // class Reactor continued
class Acceptor implements Runnable {
        // inner
        public void run() {
            try {
          //接受到通道套接字的连接
                SocketChannel c = serverSocket.accept();
                if (c != null)
                    new Handler(selector, c);
            }
            catch(IOException ex) { /* ... */ }
        }
    }
}
/**
 * handler用到状态模式,根据当前读写的状态分别处理
 */
final class Handler implements Runnable {
    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(1024);
    ByteBuffer output = ByteBuffer.allocate(1024);
    static final int READING  = 0 ,SENDING = 1;
    int state = READING;
    Handler(Selector sel , SocketChannel c) throws IOException {
        socket = c;
        //设置通道为非阻塞
        c.configureBlocking(false);
// Optionally try first read now
        sk = socket.register( sel, 0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        //select阻塞后,可以用wakeup唤醒;执行wakeup时,如果没有阻塞的select  那么执行完wakeup后下一个执行select就会立即返回。
        sel.wakeup();
    }
    boolean inputIsComplete()  {/* 相关处理略... */ return true; }
    boolean outputIsComplete() { /*相关处理略 ... */return true; }
    void process(){}
    public void run() {
        try {
            if (state == READING)
                read();
            else if (state ==SENDING)
                send();
        } catch (IOException ex) { /* ... */ }
    }
    void read()throws IOException {
        socket.read(input);
        if (inputIsComplete()) {
            process();
            // 处理完成后设置发送状态
            state =SENDING;
            //注册写事件
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }
    void send()throws IOException {
        socket.write(output);
        if (outputIsComplete())
            sk.cancel();
    }
}

参考:
github java设计模式
Doug Lea - java 并发包的作者讲解NIO

上一篇 下一篇

猜你喜欢

热点阅读