netty

Netty源码之服务端启动流程

2020-04-20  本文已影响0人  loveFXX

Netty服务端的启动整体效果图:


Netty服务端的启动.png

Netty服务端的启动是在NioEventLoop、ServerBootstrap、channel和ChannelPipeline初始化之后,便会走到register(channel)注册channel方法。在register0方法内,可以把ChannelPipeline的队列结构中ChannelInitializer进行转换成ServerBootstrapAccptor对象的队列。如果ServerBootstrap方法的handler含有自定义属性值,也会添加到ChannelPipeline队列中,这个在ServerBootstrapAccptor之前。所以,在NioEventLoop轮询内,便可以接收客户端的连接。其中,在服务端启动过程中,register0方法便是分析的核心。

register

io.netty.channel.AbstractChannel.AbstractUnsafe#register


register.png

这个代码的关键就是什么时候执行register0()方法。在前一遍文章的总结中,NioEventLoop的execute,会存储在任务队列taskQueue调用。
SingleThreadEventExecutor#execute(java.lang.Runnable, boolean)
在这个方法中,首先通过inEventLoop方法判断是否是当前线程。如果不是则会startThread启动一个线程


execute.png
thread属性值,此时为null
inEventLoop.png
startThread.png

使用executor.execute执行器调用则会ThreadPerTaskExecutor#execute方法开启任务


doStartThread.png

NioEventLoop.run()

调用SingleThreadEventExecutor.this.run()方法开启NioEventLoop任务
这个方法执行就是创建的NioEventLoop执行的,使用Reactor模式中的多Reactor线程模式

protected void run() {
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    selectCnt = 0;
                    handleLoopException(e);
                    continue;
                }

                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }

                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {
                // Harmless exception - log anyway
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

其中,关键步骤就是:
这里包含了处理空轮询bug的问题,使用rebuildSelector0();方法重建选择器以及SelectionKey的重新注册获取
1、selectStrategy根据选择器策略执行select方法,轮询IO事件
2、processSelectedKeys处理SelectedKeys,处理网络IO事件
3、runAllTasks()运行所有任务

register0方法调用

1、执行时机

SingleThreadEventExecutor#runAllTasks(long)


runAllTasks.png

首先从任务队列taskQueue获取任务,通过safeExecute(task)执行任务,运行run方法。在这里便会获取所有eventLoop.execute添加的任务,并执行。所以,register0这里便会执行


safeExecute.png
2、register0具体执行逻辑
private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
3、doRegister()

channel注册到selector上,并获取到selectionKey对象


doRegister.png
4、registered =true
5、pipeline.invokeHandlerAddedIfNeeded
invokeHandlerAddedIfNeeded.png

获取pendingHandlerCallbackHead对象的值,并执行.execute()方法。pendingHandlerCallbackHead是从p.addLast添加


callHandlerAddedForAllHandlers.png

io.netty.channel.DefaultChannelPipeline.PendingHandlerAddedTask#execute


image.png
callHandlerAdded0.png
callHandlerAdded.png
执行ChannelHandler的handlerAdded方法,
6、initChannel方法调用

这里会先调用ChannelInitializer#handlerAdded


handlerAdded.png
initChannel.png
7、ServerBootstrapAcceptor添加到pipeline队列中

这里便会调用到 p.addLast(new ChannelInitializer<Channel>() {}匿名内部类的initChannel方法


init.png

如果服务端启动类设置了handler属性值,则会首先把AbstractBootstrap#handler的值添加到pipeline中
把pipeline.addLast添加ServerBootstrapAcceptor添加到NioEventLoop任务队列中

8、ServerBootstrapAcceptor参数
image.png

childAttrs是启动类设置的Attrs参数,childOptions是启动类设置的Options参数
childHandler = ServerInitializer是通过childHandler设置的自定义childHandler
childGroup=NioEventLoopGroup
channel = NioServerSocketChannel
pipeline.fireChannelRegistered()

9、fireChannelRegistered传递调用注册Channel
fireChannelRegistered.png
image.png
invokeChannelRegistered.png

执行ChannelInboundHandler#channelRegistered方法
io.netty.channel.DefaultChannelPipeline.HeadContext#channelRegistered


channelRegistered.png
首先执行invokeHandlerAddedIfNeeded(),然后向下传播fireChannelRegistered
10、findContextInbound

io.netty.channel.AbstractChannelHandlerContext#findContextInbound


findContextInbound.png

找到入站的所有实现类,此时pipeline链已经形成

11、firexxx向下传递方法

如果是isActive就绪状态
则会执行pipeline.fireChannelActive方法,最终会执行ChannelInboundHandler#channelActive方法,这里也会使用findContextInbound方法向下执行


fireChannelActive.png

io.netty.channel.AbstractChannelHandlerContext
fireChannelRegistered、fireChannelUnregistered、fireChannelActive、fireChannelInactive、fireExceptionCaught、fireUserEventTriggered、fireChannelRead、fireChannelReadComplete、fireChannelWritabilityChanged


MASK.png

总结:

服务端启动,便是准备好可接收客户端连接的状态。服务端启动包含NioEventLoop、ServerBootstrap、channel和ChannelPipeline等的初始化,以及ChannelPipeline队列的转换。
其中的关键就是register0调用时机及调用效果:
使用Reactor线程在NioEventLoop.run()方法内运行所有任务runAllTasks方法内执行调用。
效果是把channel注册到selector上,并执行所有实现了AbstractChannelHandlerContext的传递方法。把ServerBootstrapAcceptor添加到pipeline队列

上一篇下一篇

猜你喜欢

热点阅读