Netty源码之服务端启动流程
Netty服务端的启动整体效果图:
Netty服务端的启动.png
Netty服务端的启动是在NioEventLoop、ServerBootstrap、channel和ChannelPipeline初始化之后,便会走到register(channel)注册channel方法。在register0方法内,可以把ChannelPipeline的队列结构中ChannelInitializer进行转换成ServerBootstrapAccptor对象的队列。如果ServerBootstrap方法的handler含有自定义属性值,也会添加到ChannelPipeline队列中,这个在ServerBootstrapAccptor之前。所以,在NioEventLoop轮询内,便可以接收客户端的连接。其中,在服务端启动过程中,register0方法便是分析的核心。
register
io.netty.channel.AbstractChannel.AbstractUnsafe#register
register.png
这个代码的关键就是什么时候执行register0()方法。在前一遍文章的总结中,NioEventLoop的execute,会存储在任务队列taskQueue调用。
SingleThreadEventExecutor#execute(java.lang.Runnable, boolean)
在这个方法中,首先通过inEventLoop方法判断是否是当前线程。如果不是则会startThread启动一个线程
execute.png
thread属性值,此时为null
inEventLoop.png
startThread.png
使用executor.execute执行器调用则会ThreadPerTaskExecutor#execute方法开启任务
doStartThread.png
NioEventLoop.run()
调用SingleThreadEventExecutor.this.run()方法开启NioEventLoop任务
这个方法执行就是创建的NioEventLoop执行的,使用Reactor模式中的多Reactor线程模式
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
其中,关键步骤就是:
这里包含了处理空轮询bug的问题,使用rebuildSelector0();方法重建选择器以及SelectionKey的重新注册获取
1、selectStrategy根据选择器策略执行select方法,轮询IO事件
2、processSelectedKeys处理SelectedKeys,处理网络IO事件
3、runAllTasks()运行所有任务
register0方法调用
1、执行时机
SingleThreadEventExecutor#runAllTasks(long)
runAllTasks.png
首先从任务队列taskQueue获取任务,通过safeExecute(task)执行任务,运行run方法。在这里便会获取所有eventLoop.execute添加的任务,并执行。所以,register0这里便会执行
safeExecute.png
2、register0具体执行逻辑
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
3、doRegister()
channel注册到selector上,并获取到selectionKey对象
doRegister.png
4、registered =true
5、pipeline.invokeHandlerAddedIfNeeded
invokeHandlerAddedIfNeeded.png获取pendingHandlerCallbackHead对象的值,并执行.execute()方法。pendingHandlerCallbackHead是从p.addLast添加
callHandlerAddedForAllHandlers.png
io.netty.channel.DefaultChannelPipeline.PendingHandlerAddedTask#execute
image.png
callHandlerAdded0.png
callHandlerAdded.png
执行ChannelHandler的handlerAdded方法,
6、initChannel方法调用
这里会先调用ChannelInitializer#handlerAdded
handlerAdded.png
initChannel.png
7、ServerBootstrapAcceptor添加到pipeline队列中
这里便会调用到 p.addLast(new ChannelInitializer<Channel>() {}匿名内部类的initChannel方法
init.png
如果服务端启动类设置了handler属性值,则会首先把AbstractBootstrap#handler的值添加到pipeline中
把pipeline.addLast添加ServerBootstrapAcceptor添加到NioEventLoop任务队列中
8、ServerBootstrapAcceptor参数
image.pngchildAttrs是启动类设置的Attrs参数,childOptions是启动类设置的Options参数
childHandler = ServerInitializer是通过childHandler设置的自定义childHandler
childGroup=NioEventLoopGroup
channel = NioServerSocketChannel
pipeline.fireChannelRegistered()
9、fireChannelRegistered传递调用注册Channel
fireChannelRegistered.pngimage.png
invokeChannelRegistered.png
执行ChannelInboundHandler#channelRegistered方法
io.netty.channel.DefaultChannelPipeline.HeadContext#channelRegistered
channelRegistered.png
首先执行invokeHandlerAddedIfNeeded(),然后向下传播fireChannelRegistered
10、findContextInbound
io.netty.channel.AbstractChannelHandlerContext#findContextInbound
findContextInbound.png
找到入站的所有实现类,此时pipeline链已经形成
11、firexxx向下传递方法
如果是isActive就绪状态
则会执行pipeline.fireChannelActive方法,最终会执行ChannelInboundHandler#channelActive方法,这里也会使用findContextInbound方法向下执行
fireChannelActive.png
io.netty.channel.AbstractChannelHandlerContext
fireChannelRegistered、fireChannelUnregistered、fireChannelActive、fireChannelInactive、fireExceptionCaught、fireUserEventTriggered、fireChannelRead、fireChannelReadComplete、fireChannelWritabilityChanged
MASK.png
总结:
服务端启动,便是准备好可接收客户端连接的状态。服务端启动包含NioEventLoop、ServerBootstrap、channel和ChannelPipeline等的初始化,以及ChannelPipeline队列的转换。
其中的关键就是register0调用时机及调用效果:
使用Reactor线程在NioEventLoop.run()方法内运行所有任务runAllTasks方法内执行调用。
效果是把channel注册到selector上,并执行所有实现了AbstractChannelHandlerContext的传递方法。把ServerBootstrapAcceptor添加到pipeline队列