第十二节 netty前传-NIO 实现reactor模式
2018-12-24 本文已影响1人
勃列日涅夫
本节所要了解java nio的reactor模式,参考来源Doug lee java并发的作者。当然作为netty的底层实现,对于nio的reactor模式的实现,对于学习netty也是尤为重要的一步。
首先先作为对比先看下经典BIO模型
图片.png- 伪代码片段
class Server implements Runnable {
public void run() {
try {
//创建socket server套接字
ServerSocket ss = new ServerSocket(PORT);
while (!Thread.interrupted())
//启动额外线程处理socket客户端连接后的业务处理
new Thread(new Handler(ss.accept())).start();
// or, single-threaded, or a thread pool
} catch (IOException ex) { /* ... */ }
}
//业务处理
static class Handler implements Runnable {
final Socket socket;
Handler(Socket s) { socket = s; }
public void run() {
try {
byte[] input = new byte[MAX_INPUT];
socket.getInputStream().read(input);
byte[] output = process(input);
socket.getOutputStream().write(output);
} catch (IOException ex) { /* ... */ }
}
private byte[] process(byte[] cmd) { /* ... */ }
}
}
nio不再详细介绍,可参考前面文章
主要说下reactor模式:简单来说reactor模式用于同时处理一个或多个传递给服务端的请求的事件的处理模式。 然后,服务端处理程序解析输入别的请求,并将它们同步分派给与之关联的请求异步处理程序。不恰当可类比web页面事件,当点击某个按钮时,浏览器收到这个信号(监听),分派给相关的js处理程序处理(handler)。
- 注意两点 1、同步分派 2、异步处理程序
而上面两点的分派由Dispatch承担,异步处理由handler处理。
详细可参考reactor wiki介绍
3、 reactor实现 -
借用doug的图 直观的感受下:
图片.png
关于Channels、Buffers、Selectors、SelectionKeys核心前面已介绍,下面实现会用到
基本代码讲解
- Reactor类 作为nio 响应器模式的核心部分。承载了selector选择器、ServerSocketChannel 服务端的通道。三个重要的功能
-
对ServerSocketChannel、Selector初始化。 serverSocket.register方法将服务通道注册到选择器,并绑定ACCEPT兴趣事件(初始绑定,当客户端通道连接后会先处理accpet事件)
-
初始附加一个Acceptor对象(通过SelectionKey 的attach方法)。目的是用于分发时,借助Acceptor处理相关逻辑.
-
dispatch方法的同步分发功能
- Acceptor 类的作用其一是获取客户端与服务端的连接,其二是获取连接后调用handler处理(为了简化,handler使用状态模式来模拟其他事件,所以这里一旦有客户端连接,就会通过初始设置READING = 0 读事件)
class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
//注册ServerSocketChannel的兴趣事件为连接OP_ACCEPT
SelectionKey sk = serverSocket.register(selector,SelectionKey.OP_ACCEPT);
//附加Acceptor,稍后调用attachment可以取得该对象
sk.attach(new Acceptor());
}
public void run() { //normally in a new Thread
try {
while (!Thread.interrupted()) {
//阻塞 直到有有一個通道返回
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
//循环检测是否有新事件注册
while (it.hasNext())
//同步分发
dispatch((SelectionKey)(it.next()));
selected.clear();
}
} catch (IOException ex) { /* ... */ }
}
void dispatch(SelectionKey k) {
//取得attach附加的对象处理
Runnable r = (Runnable)(k.attachment());
if (r != null)
r.run();
}
// class Reactor continued
class Acceptor implements Runnable {
// inner
public void run() {
try {
//接受到通道套接字的连接
SocketChannel c = serverSocket.accept();
if (c != null)
new Handler(selector, c);
}
catch(IOException ex) { /* ... */ }
}
}
}
- Handler 类 1、将自身绑定到选择器、并注册读事件( sk = socket.register( sel, 0); sk.attach(this);)2、 根据READING SENDING 状态判断事件
/**
* handler用到状态模式,根据当前读写的状态分别处理
*/
final class Handler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(1024);
ByteBuffer output = ByteBuffer.allocate(1024);
static final int READING = 0 ,SENDING = 1;
int state = READING;
Handler(Selector sel , SocketChannel c) throws IOException {
socket = c;
//设置通道为非阻塞
c.configureBlocking(false);
// Optionally try first read now
sk = socket.register( sel, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
//select阻塞后,可以用wakeup唤醒;执行wakeup时,如果没有阻塞的select 那么执行完wakeup后下一个执行select就会立即返回。
sel.wakeup();
}
boolean inputIsComplete() {/* 相关处理略... */ return true; }
boolean outputIsComplete() { /*相关处理略 ... */return true; }
void process(){}
public void run() {
try {
if (state == READING)
read();
else if (state ==SENDING)
send();
} catch (IOException ex) { /* ... */ }
}
void read()throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
// 处理完成后设置发送状态
state =SENDING;
//注册写事件
sk.interestOps(SelectionKey.OP_WRITE);
}
}
void send()throws IOException {
socket.write(output);
if (outputIsComplete())
sk.cancel();
}
}