Netty框架源码分析

5.Netty框架-网络IO编程模板(NIO实现Reactor线

2020-08-07  本文已影响0人  还算年轻

一、单线程Reactor线程模型

1.单线程Reactor线程模型:新连接的接入、数据的读写都是用一个线程:
   public class SingleReactor implements Runnable{

   //单线程reactor 模型:1.一个线程绑定一个selector 和一个serverSocketChannel 
   ServerSocketChannel serverSocketChannel;
   
   Selector selector;
   
   //OPEN&注册accepet
   public void open() throws IOException{
       selector = Selector.open();
       serverSocketChannel = ServerSocketChannel.open();
       //设置非阻塞
       serverSocketChannel.configureBlocking(false);
       //绑定ip&端口
       serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 8080));
       //将channel注册到selector并拿到选择键(channel注册的标识)
       SelectionKey register = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
       register.attach(new AcceptHandle());
   }

   //轮训注册的事件&分发
   @Override
   public void run() {
       try {
           open();
       } catch (IOException e1) {
            e1.printStackTrace();
       }
       try {
           //循环查询感兴趣的时间
           while (!Thread.interrupted()) {
               try {
                   //1.阻塞去查
                   selector.select();
                   //2.拿到查询结果(注册的标识)
                   Set<SelectionKey> keys = selector.selectedKeys();
                   //3.迭代感兴趣的key
                   Iterator<SelectionKey> iterator = keys.iterator();
                   while (iterator.hasNext()) {
                       SelectionKey next = iterator.next();
                       dispatch(next);
                   }
                   keys.clear();
               } catch (IOException e) {
                   e.printStackTrace();
               }
           }
       } catch (Exception e) {
           e.printStackTrace();
       }
   }   
   
   //分发&拿到绑定的处理器
   void dispatch(SelectionKey next){
       //1.拿出SelectionKey
       Runnable attachment = (Runnable) next.attachment();
       //2.调用handle处理器
       if (attachment != null) {
           attachment.run();
       }   
   }
   
   
   //接受连接处理器
   //连接处理器&为新连接创造一个输入输出的Handle处理器
   class AcceptHandle implements Runnable{
       @Override
       public void run() {
           try {
               //1.接受新连接&调用下一个处理器注册读取事件
               SocketChannel accept = serverSocketChannel.accept();
               new IOEchoHandler(selector,accept);
           } catch (IOException e) {
               e.printStackTrace();
           }
       }   
   }
   
   //处理具体的IO时间处理器
   class IOEchoHandler implements Runnable{

       SocketChannel channel;//读写channel
       SelectionKey register;//channel注册结果返回的标识
       
       static final int RECIEVING = 0, SENDING = 1;
       int state = RECIEVING;
       
       //读写数据的缓存区
       final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
          
       IOEchoHandler(Selector selector, SocketChannel c) throws IOException {
           channel = c;
           c.configureBlocking(false);//设置阻塞
           //仅仅取得选择键,后设置感兴趣的IO事件
           register = channel.register(selector, 0);
           //将Handler作为选择键的附件
           register.attach(this);
           //第二步,注册Read就绪事件
           register.interestOps(SelectionKey.OP_READ);
           selector.wakeup();
       }
        
       
       @Override
       public void run() {
           try {
               if (state == SENDING) {
                   //写入通道
                   channel.write(byteBuffer);
                   //写完后,准备开始从通道读,byteBuffer切换成写模式
                   byteBuffer.clear();
                   //写完后,注册read就绪事件
                   register.interestOps(SelectionKey.OP_READ);
                   //写完后,进入接收的状态
                   state = RECIEVING;
               } else if (state == RECIEVING) {//一开始是接受情况
                   //从通道读到byteBuffer
                   int length = 0;
                   while ((length = channel.read(byteBuffer)) > 0) {
                       System.out.println(new String(byteBuffer.array(), 0, length));
                   }
                   //读完后,准备开始写入通道,byteBuffer切换成读模式
                   byteBuffer.flip();
                   //读完后,注册write就绪事件
                   register.interestOps(SelectionKey.OP_WRITE);
                   //读完后,进入发送的状态
                   state = SENDING;
               }
               //处理结束了, 这里不能关闭select key,需要重复使用
               //sk.cancel();
           } catch (IOException ex) {
               ex.printStackTrace();
           }
       }
   }
   
   
   public static void main(String[] args) throws IOException {
       new Thread(new SingleReactor()).start();
   }
}

二、多线程Reactor线程模型

1.多线程Reactor线程模型:一个线程用于新连接的接入、数据的读写用一个线程池:
class MultiThreadEchoHandler implements Runnable {
    final SocketChannel channel;
    final SelectionKey sk;
    final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    static final int RECIEVING = 0, SENDING = 1;
    int state = RECIEVING;
    //引入线程池
    static ExecutorService pool = Executors.newFixedThreadPool(4);

    MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {
        channel = c;
        c.configureBlocking(false);
        //仅仅取得选择键,后设置感兴趣的IO事件
        sk = channel.register(selector, 0);
        //将本Handler作为sk选择键的附件,方便事件dispatch
        sk.attach(this);
        //向sk选择键注册Read就绪事件
        sk.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    //线程处理handle
    public void run() {
        //异步任务,在独立的线程池中执行
        pool.execute(new AsyncTask());
    }

    //异步任务,不在Reactor线程中执行
    public synchronized void asyncRun() {
        try {
            if (state == SENDING) {
                //写入通道
                channel.write(byteBuffer);
                //写完后,准备开始从通道读,byteBuffer切换成写模式
                byteBuffer.clear();
                //写完后,注册read就绪事件
                sk.interestOps(SelectionKey.OP_READ);
                //写完后,进入接收的状态
                state = RECIEVING;
            } else if (state == RECIEVING) {
                //从通道读
                int length = 0;
                while ((length = channel.read(byteBuffer)) > 0) {
                   System.out.println(new String(byteBuffer.array(), 0, length));
                }
                //读完后,准备开始写入通道,byteBuffer切换成读模式
                byteBuffer.flip();
                //读完后,注册write就绪事件
                sk.interestOps(SelectionKey.OP_WRITE);
                //读完后,进入发送的状态
                state = SENDING;
            }
            //处理结束了, 这里不能关闭select key,需要重复使用
            //sk.cancel();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    //异步任务的内部类
    class AsyncTask implements Runnable {
        public void run() {
            MultiThreadEchoHandler.this.asyncRun();
        }
    }

}

三、主从线程Reactor线程模型

1.主从线程Reactor线程模型:一个连接池用于新连接的接入、数据的读写用另一个线程池:
class MultiThreadEchoServerReactor {
    
    
    ServerSocketChannel serverSocket;
    AtomicInteger next = new AtomicInteger(0);
    //selectors集合,引入多个selector选择器
    Selector[] selectors = new Selector[2];
    //引入多个子反应器
    SubReactor[] subReactors = null;


    MultiThreadEchoServerReactor() throws IOException {
        //初始化多个selector选择器
        selectors[0] = Selector.open();
        selectors[1] = Selector.open();
        serverSocket = ServerSocketChannel.open();

        InetSocketAddress address =
                new InetSocketAddress("127.0.0.1",8080);
        serverSocket.socket().bind(address);
        //非阻塞
        serverSocket.configureBlocking(false);

        //第一个selector,负责监控新连接事件
        SelectionKey sk = serverSocket.register(selectors[0], SelectionKey.OP_ACCEPT);
        //附加新连接处理handler处理器到SelectionKey(选择键)
        sk.attach(new AcceptorHandler());

        //构建两个反应器
        //第一个子反应器,一子反应器负责一个选择器
        SubReactor subReactor1 = new SubReactor(selectors[0]);
        //第二个子反应器,一子反应器负责一个选择器
        SubReactor subReactor2 = new SubReactor(selectors[1]);
        subReactors = new SubReactor[]{subReactor1, subReactor2};
    }

    //开启两个反应器线程
    private void startService() {
        // 一子反应器对应一条线程
        new Thread(subReactors[0]).start();
        new Thread(subReactors[1]).start();
    }

    //反应器&分发
    class SubReactor implements Runnable {
        //每条线程负责一个选择器的查询
        final Selector selector;

        public SubReactor(Selector selector) {
            this.selector = selector;
        }
   
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    selector.select();
                    Set<SelectionKey> keySet = selector.selectedKeys();
                    Iterator<SelectionKey> it = keySet.iterator();
                    while (it.hasNext()) {
                        //Reactor负责dispatch收到的事件
                        SelectionKey sk = it.next();
                        dispatch(sk);
                    }
                    keySet.clear();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }


        void dispatch(SelectionKey sk) {
            Runnable handler = (Runnable) sk.attachment();
            //调用之前attach绑定到选择键的handler处理器对象
            if (handler != null) {
                handler.run();
            }
        }
    }


    // Handler:新连接处理器
    class AcceptorHandler implements Runnable {
        public void run() {
            try {
                SocketChannel channel = serverSocket.accept();
                if (channel != null)
                    new MultiThreadEchoHandler(selectors[next.get()], channel);
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (next.incrementAndGet() == selectors.length) {
                next.set(0);
            }
        }
    }


    public static void main(String[] args) throws IOException {
        MultiThreadEchoServerReactor server =
                new MultiThreadEchoServerReactor();
        server.startService();
    }

}
上一篇 下一篇

猜你喜欢

热点阅读