NIO的Reactor模型
《同步非阻塞IO》里面已经用“餐馆服务员”的例子解释了NIO的同步非阻塞模型,如果讲的不够明白。。。
那我也没什么办法啊,毕竟我也没给别人讲过,不知道痛点在哪里
这次会通过一些demo来解释一下Reactor模型,内容的话,基本照搬Doug Lea的Scalable IO in Java
个人觉得这个Reactor模型的发展史,基本上是一个餐馆压榨劳动力的发展史,所以这次还是会基于餐馆模型来讲
传统IO
public static void main(String[] args) throws IOException{
ServerSocket serverSocket = new ServerSocket(9999);
while (true){
// 给一桌客人服务完之后站在门口傻等着
Socket socket = serverSocket.accept();
// 在桌子前傻等着客人点菜
InputStream in = socket.getInputStream();
byte[] b = new byte[in.available()];
int index = in.read(b);
StringBuffer sb = new StringBuffer();
while (index != -1) {
sb.append(new String(b));
}
OutputStream out = socket.getOutputStream();
out.write("server speaking".getBytes());
socket.close();
}
}
即使有其它客人需要服务,服务员也不会去,而是在傻等着。那如果我是老板,让自己的员工的工作这么不饱和的,于是就有了NIO...
NIO
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);
// 最开始店里没人,肯定只关心有新客人来的情况
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true){
// 这时候没啥事,在这等人叫
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {
it.remove();
SelectionKey key = it.next();
if (key.isAcceptable()) {
// accept
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
// read
String msg = readFromChannel((SocketChannel) key.channel());
System.out.println(msg);
}
}
}
}
这种情况就好多了,服务员永远处于忙碌的状态,只有当没有任何客人需要被服务的时候才能闲下来。
现在服务员的工作已经很饱和了,但是作为老板我会想,服务员做的事情有多又杂,招个人得要啥都会,是不是不太好招人啊,于是我打算给服务员分配一下工作内容,比如站在门口接客的那个叫接待,在里面点餐擦桌子端菜送水的叫跑堂...
单线程Reactor模型
- 反应器Reactor(我是老板。。。)
public class Reactor implements Runnable {
private final Selector selector;
private final ServerSocketChannel serverSocketChannel;
public Reactor(int port) throws IOException {
// 创建选择器
selector = Selector.open();
// 开启通道
serverSocketChannel = ServerSocketChannel.open();
// 绑定端口
serverSocketChannel.socket().bind(new InetSocketAddress(port));
// 非阻塞模式
serverSocketChannel.configureBlocking(false);
}
public static void main(String[] args){
try {
Reactor reactor = new Reactor(9999);
reactor.run();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void run() {
try {
// 告诉服务员你现在叫“接待”,有客人来了就把他们带接进来
SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
it.remove();
key = it.next();
dispatch(key);
}
keys.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}
private static void dispatch(SelectionKey k) throws IOException {
if (!k.isValid()){
return;
}
if (k.isAcceptable()) {
// 是外面来客人了,所以我现在是"接待"
Acceptor acceptor = (Acceptor) k.attachment();
acceptor.handler();
}
if (k.isReadable()) {
// 客人要点菜了,所以现在我是"点餐员"
ReadEventHandler handler = (ReadEventHandler) k.attachment();
handler.handler();
}
}
- Acceptor(接待员)
public class Acceptor extends NioEventHandler {
public Acceptor(Selector selector, SelectionKey key) {
super(selector, key);
}
@Override
public void handler() throws IOException {
// 把客人接到座位上
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
// 然后告诉客人,如果要点菜,就呼叫"点餐员"
SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
key.attach(new ReadEventHandler(selector, key));
}
}
- Handler(点餐员、传菜员什么的。。)
public class ReadEventHandler extends NioEventHandler {
public ReadEventHandler(Selector selector, SelectionKey key) {
super(selector, key);
}
@Override
public void handler() throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
// 点菜
String msg = readFromChannel(socketChannel);
// 你也可以在这里招呼厨房开始做菜
// TODO mainServer
// 然后告诉客人菜做好了就会上
// socketChannel.register(selector, SelectionKey.OP_WRITE);
}
什嘛?少了个NioEventHandler类?别急,后面会有的
现在分工已经很明确了,我们店里虽然只有一个服务员,但是接待员、点菜员、传菜员这些,只要你想要的,都可以有!是不是很有B格
最关键的是,我可以按不同的角色去招人,这样就很方便,比如接待员得是漂亮小姐姐,而传菜员走路要快而稳...
Reactor多线程模型
- 上面缺的NioEventHandler
@Slf4j
public abstract class NioEventHandler implements Runnable {
protected final Selector selector;
protected final SelectionKey selectionKey;
public NioEventHandler(Selector selector, SelectionKey key){
this.selector = selector;
this.selectionKey = key;
}
protected abstract void handler() throws IOException;
@Override
public void run() {
try {
handler();
} catch (IOException e) {
log.error("handler field with io Exception ", e);
}
}
}
- 稍微改一下Reactor的dispatch
private static void dispatch(SelectionKey k) throws IOException {
if (!k.isValid()){
return;
}
if (k.isAcceptable()) {
// 是外面来客人了,所以我现在是"接待"
Acceptor acceptor = (Acceptor) k.attachment();
acceptor.handler();
}
if (k.isReadable()) {
// 客人要点菜了,所以叫一个点餐员来服务
ReadEventHandler handler = (ReadEventHandler) k.attachment();
new Thread(handler).start();
}
}
店里的客人变多了,一个服务员有点忙不过来了,于是我招了一个漂亮的小姐姐专门做接待,之前的服务员继续让他干苦力,然后再招几个苦力,完美!
Reactor主从多线程模型
对Reactor的启动方法做点小小的改动
public static void main(String[] args){
try {
// 启动3个reactor,分别监听9999,10000,10001端口
for (int i = 0; i < 3; i ++) {
Reactor reactor = new Reactor(9999 + i);
new Thread(reactor).start();
}
} catch (Exception e) {
e.printStackTrace();
}
}
我一直尝试把这个模型和餐馆模型匹配起来。我觉得应该可以这么看。因为有我这么机智的老板,店里客人越来越多,扩招服务员已经不能满足顾客需求了,于是我选择开分店...其实这不是一个好的比喻,但是理解起来会容易一些
关于主从多线程模型,在Doug Lea的Scalable IO in Java中,是通过增加Selector线程池来实现的,这是个很优雅的方式。我为什么直接多线程启动了几个Acceptor呢。因为这样更贴近原文中“Multiple Reactor Threads”或者说“主从多线程模型”这个标题(嗯,其实主要是因为代码改起来简单)
我在Reactor模型中开到的吸引
NIO提供了多路复用的可能,让线程从IO等待的牢笼中解脱出来,Reactor模式“don‘t call us, we‘ll call you”的思想毫无疑问是相较于Socket编程的一次巨大的变革。但是Reactor模型的演进所最吸引我的是将一次完整的IO链路拆分成了不同的事件,拆分到更细的粒度总是更方便分治,就好像服务员的角色被细分成不同的工种之后,就更容易招到更专业的服务员。
Reactor模型其实更优雅
- 相比于我上面那一堆直接new一个Thread的操作,使用线程池明显更加优雅,也更加方便进行资源管理(只是我一个萌新并不会写线程池)
- 服务端来讲,除了accept和read事件,当然还有write事件,还有核心业务处理逻辑,这些其实都可以抽象成Handler来进行处理