netty accept事件处理
源码分析基于netty 4
前面已经说了netty启动过程。接着讲讲netty对accept事件的处理
前面说过,NioEventLoop的run方法正是loop的核心
run()比较复杂, 涉及wakenUp,ioRatio等内容, 抽取主要流程如下:
for (;;) {
try {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} catch (Throwable t) {
handleLoopException(t);
}
}
processSelectedKeys处理io事件
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
一般会走processSelectedKeysOptimized逻辑
NioEventLoop中的selectedKeys是netty自定义的SelectedSelectionKeySet,继承自AbstractSet,里面有两个SelectionKey数组, add时会添加key到其中一个数组, flip方法会取出当前存储的数组, 并修改存储标示使add添加到另一数组。
NioEventLoop中聚合了java.nio.channels.Selector。注意openSelector方法,通过反射, 将SelectedSelectionKeySet设置到Selector的selectedKeys属性, 这样jdk底层产生的selectedKey会add到SelectedSelectionKeySet中
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
selectedKeysField.setAccessible(true);
publicSelectedKeysField.setAccessible(true);
selectedKeysField.set(selector, selectedKeySet);
publicSelectedKeysField.set(selector, selectedKeySet);
回到processSelectedKeysOptimized方法
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
// 网络事件
processSelectedKey(k, (AbstractNioChannel) a);
} else {
//
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
}
}
网络事件会触发processSelectedKey(SelectionKey k, AbstractNioChannel ch)
:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
return;
}
}
}
可以看到这里netty对各种网络事件做了处理
这里只关注OP_READ和OP_ACCEPT事件,来看对应的处理方法,主要是unsafe.read()
方法(虽然方法名为read,但netty将它作为基本逻辑处理抽象,它也可以处理其他事件,如accept),该方法是抽象方法,有两个实现类:NioByteUnsafe和AbstractNioMessageChannel
回顾前面说得的启动过程,channel注册时,NioServerSocketChannel注册了eventloop.selector的OP_READ事件, 所以当OP_READ发生时,会调用到NioMessageUnsafe.read(NioServerSocketChannel继承自AbstractNioMessageChannel)
public void read() {
...
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 触发read事件
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
// 触发readComplete事件
pipeline.fireChannelReadComplete();
}
这里主要是通过模板方法doReadMessages, 将readBuf交给子类处理, 所以来看看NioServerSocketChannel实现的doReadMessages方法:
SocketChannel ch = javaChannel().accept();
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
比较简单, 就是把accpet的java.nio.channels.SocketChannel封装为netty的NioSocketChannel对象
值得注意的是, NioSocketChannel的构造过程中, 会调用到父类AbstractNioByteChannel如下构造方法:
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
主要之前说过的readInterestOp属性,这里NioSocketChannel关注的事件是OP_READ
接下来,会触发pipeline事件,
这里的pipeline.fireChannelRead事件比较重要(pipeline机制后面会讲), 他会触发ServerBootstrapAcceptor.channelRead方法(前面说过了, channl注册时会把ServerBootstrapAcceptor添加到pipeline中):
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
// 添加用户定义的Handler到pipeline
child.pipeline().addLast(childHandler);
// 注册NioSocketChannel到childGroup
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
}
这里把NioSocketChannel注册到EventLoop中, 注册过程前面说过了。
要注意的是,这里注册的loop是childGroup,注册的事件是OP_READ。到这里,childGroup总算启动了。