
Section 15: Netty Source Code Analysis, Client Side, Part 02

2019-01-28  勃列日涅夫

Handler invocation

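We already know that adding the ChannelInitializer creates a DefaultChannelHandlerContext and inserts it into the doubly linked list of ChannelHandlerContext objects, just before tail.
Our custom handler, however, is only added inside the ChannelInitializer's initChannel method (via p.addLast). So when is initChannel called to put our handler into the list? That is what we analyze next.
The custom ChannelHandler is added during AbstractUnsafe.register0, which calls pipeline.fireChannelRegistered(). First, recall how init puts the ChannelInitializer into the pipeline: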

@Override
    @SuppressWarnings("unchecked")
    void init(Channel channel) throws Exception {
        ChannelPipeline p = channel.pipeline();
        // config.handler() returns what was passed to bootstrap.handler(), i.e. the original ChannelInitializer; see the analysis above
        p.addLast(config.handler());
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            // Apply the configured ChannelOptions to the channel
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
        }
    }

Next, look at the addLast method:

 @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
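            // Check whether the handler is being added more than once
            checkMultiplicity(handler);
            // Create a DefaultChannelHandlerContext that wraps the handler (note the constructor arguments)
            newCtx = newContext(group, filterName(name, handler), handler);
            // Insert the new newCtx into the handler context linked list
            addLast0(newCtx);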

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

After this runs, the pipeline contains the ChannelInitializer handler.
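To make the two ideas in addLast easier to see (insert before tail, and postpone the handlerAdded callback until the channel is registered), here is a minimal sketch in plain Java. It is a simplified model, not Netty's code: MiniPipeline, Ctx, addedLog and register() are hypothetical names, and handlers are reduced to strings.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a pipeline; NOT Netty's real classes.
class MiniPipeline {
    // One node of the doubly linked list, standing in for a handler context.
    static final class Ctx {
        final String name;
        Ctx prev, next;
        Ctx(String name) { this.name = name; }
    }

    private final Ctx head = new Ctx("head");
    private final Ctx tail = new Ctx("tail");
    private boolean registered;
    // Callbacks postponed until the pipeline is registered.
    private final List<Runnable> pendingCallbacks = new ArrayList<>();
    // Records the order in which the "handlerAdded" callbacks actually ran.
    final List<String> addedLog = new ArrayList<>();

    MiniPipeline() {
        head.next = tail;
        tail.prev = head;
    }

    // Insert the new node immediately before the tail sentinel.
    MiniPipeline addLast(String name) {
        Ctx newCtx = new Ctx(name);
        Ctx prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;

        Runnable callback = () -> addedLog.add(name);
        if (!registered) {
            pendingCallbacks.add(callback); // run later, on registration
        } else {
            callback.run();                 // already registered: run now
        }
        return this;
    }

    // Mark the pipeline as registered and flush the postponed callbacks.
    void register() {
        registered = true;
        for (Runnable r : pendingCallbacks) {
            r.run();
        }
        pendingCallbacks.clear();
    }

    // Handler names between head and tail, in list order.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Ctx c = head.next; c != tail; c = c.next) {
            out.add(c.name);
        }
        return out;
    }
}
```

Calling addLast("initializer") on a fresh MiniPipeline puts the node into names() right away, but addedLog stays empty until register() is called, which mirrors the !registered branch above.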

Next comes the registration call in initAndRegister: ChannelFuture regFuture = config().group().register(channel);
As analyzed in the previous section, the call chain is: register -> SingleThreadEventLoop#register -> AbstractChannel.AbstractUnsafe#register -> AbstractChannel.AbstractUnsafe#register0 -> pipeline.fireChannelRegistered()
The source of fireChannelRegistered:

 @Override
    public final ChannelPipeline fireChannelRegistered() {
        AbstractChannelHandlerContext.invokeChannelRegistered(head);
        return this;
    }

This delegates to the static invokeChannelRegistered method:

 static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            // Called from the method above with next == head, so this invokes head.invokeChannelRegistered()
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
    }
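The inEventLoop check above follows a common pattern: run the task inline when the caller is already on the loop thread, otherwise hand it to the loop's executor. Below is a minimal sketch of that pattern with the JDK's ExecutorService. MiniEventLoop and MiniEventLoopDemo are hypothetical names, and returning a Future is a convenience for this demo only; the Netty method above returns nothing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical single-threaded "event loop"; a sketch, not Netty's EventExecutor.
class MiniEventLoop {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "mini-event-loop");
        t.setDaemon(true);   // do not keep the JVM alive for the demo
        loopThread = t;      // remember which thread is "the loop"
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Run inline if already on the loop thread, otherwise submit to it.
    Future<?> invoke(Runnable task) {
        if (inEventLoop()) {
            task.run();
            return CompletableFuture.completedFuture(null);
        }
        return executor.submit(task);
    }
}

class MiniEventLoopDemo {
    // Returns { name of the thread that ran the outer task,
    //           "inline" or "queued" for the nested task }.
    static String[] run() {
        MiniEventLoop loop = new MiniEventLoop();
        String[] result = new String[2];
        Future<?> done = loop.invoke(() -> {
            result[0] = Thread.currentThread().getName();
            boolean[] nestedRan = {false};
            // Already on the loop thread here, so this call runs inline.
            loop.invoke(() -> nestedRan[0] = true);
            result[1] = nestedRan[0] ? "inline" : "queued";
        });
        try {
            done.get(); // wait for the outer task to finish
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        String[] r = run();
        System.out.println(r[0] + " / nested task ran " + r[1]);
    }
}
```

The outer task is submitted from the calling thread and therefore takes the else branch; the nested call is made from the loop thread itself and takes the inline branch.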

Now look at the instance method invokeChannelRegistered:

 private void invokeChannelRegistered() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRegistered(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRegistered();
        }
    }

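In the method above, handler() returns the handler of the context that fireChannelRegistered passed in, namely head. Calling fireChannelRegistered on head actually invokes AbstractChannelHandlerContext.fireChannelRegistered.
The channelRegistered method in head: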

 @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            invokeHandlerAddedIfNeeded();
            ctx.fireChannelRegistered();
        }

AbstractChannelHandlerContext.fireChannelRegistered:

  @Override
   public ChannelHandlerContext fireChannelRegistered() {
       invokeChannelRegistered(findContextInbound());
       return this;
   }

The findContextInbound method:

private AbstractChannelHandlerContext findContextInbound() {
  AbstractChannelHandlerContext ctx = this;
  do {
      ctx = ctx.next;
  } while (!ctx.inbound);
  return ctx;
}
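The loop in findContextInbound can be tried in isolation. The sketch below is a simplified model, not Netty's code: InboundWalk, Ctx, link and inboundPath are hypothetical names, and it assumes the last node (tail) is inbound so that the loop always terminates.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of inbound traversal; NOT Netty's real classes.
class InboundWalk {
    static final class Ctx {
        final String name;
        final boolean inbound;
        Ctx next;

        Ctx(String name, boolean inbound) {
            this.name = name;
            this.inbound = inbound;
        }

        // Same loop shape as findContextInbound: step forward until an inbound node is found.
        Ctx findContextInbound() {
            Ctx ctx = this;
            do {
                ctx = ctx.next;
            } while (!ctx.inbound);
            return ctx;
        }
    }

    // Link the given nodes in order and return the first one.
    static Ctx link(Ctx... nodes) {
        for (int i = 0; i < nodes.length - 1; i++) {
            nodes[i].next = nodes[i + 1];
        }
        return nodes[0];
    }

    // Names of the contexts an inbound event would visit after head, up to and including tail.
    static List<String> inboundPath(Ctx head, Ctx tail) {
        List<String> visited = new ArrayList<>();
        Ctx ctx = head;
        while (ctx != tail) {
            ctx = ctx.findContextInbound();
            visited.add(ctx.name);
        }
        return visited;
    }
}
```

With the nodes head, a non-inbound "encoder", an inbound "initializer" and tail linked in that order, head.findContextInbound() skips the encoder and returns the initializer node.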

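Starting from the current context (here head), this walks the pipeline's doubly linked list and returns the first ChannelHandlerContext whose inbound field is true. The event then reaches the channelRegistered method of ChannelInitializer: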

   public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
        // the handler.
        if (initChannel(ctx)) {
            // we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
            // miss an event.
            ctx.pipeline().fireChannelRegistered();

            // We are done with init the Channel, removing all the state for the Channel now.
            removeState(ctx);
        } else {
            // Called initChannel(...) before which is the expected behavior, so just forward the event.
            ctx.fireChannelRegistered();
        }
    }
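This matches the earlier analysis. initChannel(ctx) ends up calling the initChannel method implemented by the anonymous inner class that we passed to handler when setting up the Bootstrap: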
   xxx.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline p = ch.pipeline();
        p.addLast(new EchoClientHandler());
    }
});
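Once this method has run, our custom ChannelHandler has been inserted into the pipeline. The ChannelInitializer has done its job at that point and is removed from the pipeline, and removeState(ctx) then clears the state it kept for this channel. Note that, as the comments in the code say, handlerAdded(...) normally calls initChannel(...) first; in that case channelRegistered simply forwards the event.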
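The "run once, add the real handlers, then remove yourself" idea can be sketched without Netty. The code below is a simplified model under stated assumptions: OneShotInitializerDemo, Handler and Initializer are hypothetical names, the pipeline is a plain list, and the boolean guard stands in for the re-entrance protection that the real ChannelInitializer implements differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of a self-removing initializer; NOT Netty's real classes.
class OneShotInitializerDemo {
    // A handler is just a named object here; a pipeline is a plain list.
    static class Handler {
        final String name;
        Handler(String name) { this.name = name; }
        void channelRegistered(List<Handler> pipeline) { /* default: nothing to do */ }
    }

    // Runs a user callback once, then removes itself from the pipeline.
    static final class Initializer extends Handler {
        private final Consumer<List<Handler>> initChannel;
        private boolean initialized;

        Initializer(Consumer<List<Handler>> initChannel) {
            super("initializer");
            this.initChannel = initChannel;
        }

        @Override
        void channelRegistered(List<Handler> pipeline) {
            if (initialized) {
                return;                       // guard against running twice
            }
            initialized = true;
            try {
                initChannel.accept(pipeline); // user code adds its own handlers
            } finally {
                pipeline.remove(this);        // the initializer is no longer needed
            }
        }
    }

    // Returns the handler names left in the pipeline after registration.
    static List<String> run() {
        List<Handler> pipeline = new ArrayList<>();
        pipeline.add(new Initializer(p -> p.add(new Handler("EchoClientHandler"))));
        // Iterate over a copy because the initializer mutates the pipeline while it runs.
        for (Handler h : new ArrayList<>(pipeline)) {
            h.channelRegistered(pipeline);
        }
        List<String> names = new ArrayList<>();
        for (Handler h : pipeline) {
            names.add(h.name);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

After run(), only the handler added by the callback remains in the list; the initializer itself is gone, which is the end state described above.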