IT@程序员猿媛

netty源码分析(16)- 新连接NioSocketChann

2019-02-26  本文已影响9人  Jorgezhong

上一节最后提到新连接接入过程当中,最后一步是注册Selector,其注册过程和之前的章节《注册selector》类似,但是读事件,却又有区别。

入口:ServerBootstrap.ServerBootstrapAcceptor#channelRead()

 childGroup.register(child)
                        .addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });

实际上调用了AbstractChannel.AbstractUnsafe#register0(),触发了通道激活事件

              if (isActive()) {
                    if (firstRegistration) {
                        //触发通道激活事件,调用HeadContent的
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }

改方法从pipeline的头部开始,即DefaultChannelPipeline.HeadContext#channelActive()从而触发了readIfIsAutoRead()

    @Override
    public final ChannelPipeline fireChannelActive() {
        AbstractChannelHandlerContext.invokeChannelActive(head);
        return this;
    }

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            ctx.fireChannelActive();

            readIfIsAutoRead();
        }

到此。读事件将从尾部的TailContent#read()被触发,从而依次执行ctx.read(),从尾部开始,每个outboundHandlerread()事件都被触发。直到头部。

    @Override
    public final ChannelPipeline read() {
        tail.read();
        return this;
    }


    @Override
    public ChannelHandlerContext read() {
        //获取最近的outboundhandler
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();

        //并依次执行其read方法
        if (executor.inEventLoop()) {
            next.invokeRead();
        } else {
            Tasks tasks = next.invokeTasks;
            if (tasks == null) {
                next.invokeTasks = tasks = new Tasks(next);
            }
            executor.execute(tasks.invokeReadTask);
        }

        return this;
    }

进入头部HeadContext#read(),并且最终更改了selectionKey,向selector注册了读事件

        @Override
        public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
        }
        @Override
        public final void beginRead() {
            assertEventLoop();

            if (!isActive()) {
                return;
            }

            try {
                doBeginRead();
            } catch (final Exception e) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireExceptionCaught(e);
                    }
                });
                close(voidPromise());
            }
        }
    //io.netty.channel.nio.AbstractNioMessageChannel#doBeginRead
    @Override
    protected void doBeginRead() throws Exception {
        if (inputShutdown) {
            return;
        }
        super.doBeginRead();
    }

    @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
上一篇下一篇

猜你喜欢

热点阅读