一文聊透 Netty IO 事件的编排利器 pipeline |

2022-05-15  本文已影响0人  bin的技术小屋

我们接着上篇文章《一文聊透 Netty IO 事件的编排利器 pipeline (上)》 的内容继续~~

4. 向pipeline添加channelHandler

在我们详细介绍了全部的 inbound 类事件和 outbound 类事件的掩码表示以及事件的触发和传播路径后,相信大家现在可以通过 ChannelInboundHandler 和 ChannelOutboundHandler 来根据具体的业务场景选择合适的 ChannelHandler 类型以及监听合适的事件来完成业务需求了。

本小节就该介绍一下自定义的 ChannelHandler 是如何添加到 pipeline 中的,netty 在这个过程中帮我们作了哪些工作?

           final EchoServerHandler serverHandler = new EchoServerHandler();

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)

             .............

             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();          
                     p.addLast(serverHandler);

                     ......可添加多个channelHandler......
                 }
             });

以上是笔者简化的一个 netty 服务端配置 ServerBootstrap 启动类的一段示例代码。我们可以看到再向 channel 对应的 pipeline 中添加 ChannelHandler 是通过 ChannelPipeline#addLast 方法将指定 ChannelHandler 添加到 pipeline 的末尾处。

public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {

    //向pipeline的末尾处批量添加多个channelHandler
    ChannelPipeline addLast(ChannelHandler... handlers);

    //指定channelHandler的executor,由指定的executor执行channelHandler中的回调方法
    ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);

     //为channelHandler指定名称
    ChannelPipeline addLast(String name, ChannelHandler handler);

    //为channelHandler指定executor和name
    ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
}
public class DefaultChannelPipeline implements ChannelPipeline {

    @Override
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        ObjectUtil.checkNotNull(handlers, "handlers");

        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }

    @Override
    public final ChannelPipeline addLast(String name, ChannelHandler handler) {
        return addLast(null, name, handler);
    }
}

最终 addLast 的这些重载方法都会调用到 DefaultChannelPipeline#addLast(EventExecutorGroup, String, ChannelHandler) 这个方法从而完成 ChannelHandler 的添加。

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            //检查同一个channelHandler实例是否允许被重复添加
            checkMultiplicity(handler);

            //创建channelHandlerContext包裹channelHandler并封装执行传播事件相关的上下文信息
            newCtx = newContext(group, filterName(name, handler), handler);

            //将channelHandelrContext插入到pipeline中的末尾处。双向链表操作
            //此时channelHandler的状态还是ADD_PENDING,只有当channelHandler的handlerAdded方法被回调后,状态才会为ADD_COMPLETE
            addLast0(newCtx);

            //如果当前channel还没有向reactor注册,则将handlerAdded方法的回调添加进pipeline的任务队列中
            if (!registered) {
                //这里主要是用来处理ChannelInitializer的情况
                //设置channelHandler的状态为ADD_PENDING 即等待添加,当状态变为ADD_COMPLETE时 channelHandler中的handlerAdded会被回调
                newCtx.setAddPending();
                //向pipeline中添加PendingHandlerAddedTask任务,在任务中回调handlerAdded
                //当channel注册到reactor后,pipeline中的pendingHandlerCallbackHead任务链表会被挨个执行
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            //如果当前channel已经向reactor注册成功,那么就直接回调channelHandler中的handlerAddded方法
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                //这里需要确保channelHandler中handlerAdded方法的回调是在channel指定的executor中
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        //回调channelHandler中的handlerAddded方法
        callHandlerAdded0(newCtx);
        return this;
    }

这个方法的逻辑还是比较复杂的,涉及到很多细节,为了清晰地为大家讲述,笔者这里还是采用总分总的结构,先描述该方法的总体逻辑,然后在针对核心细节要点展开细节分析。

因为向 pipeline 中添加 channelHandler 的操作可能会在多个线程中进行,所以为了确保添加操作的线程安全性,这里采用一个 synchronized 语句块将整个添加逻辑包裹起来。

  1. 通过 checkMultiplicity 检查被添加的 ChannelHandler 是否是共享的(标注 @Sharable 注解),如果不是共享的那么则不会允许该 ChannelHandler 的同一实例被添加进多个 pipeline 中。如果是共享的,则允许该 ChannelHandler 的同一个实例被多次添加进多个 pipeline 中。
    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            //只有标注@Sharable注解的channelHandler,才被允许同一个实例被添加进多个pipeline中
            //注意:标注@Sharable之后,一个channelHandler的实例可以被添加到多个channel对应的pipeline中
            //可能被多线程执行,需要确保线程安全
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                        " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }

这里大家需要注意的是,如果一个 ChannelHandler 被标注了 @Sharable 注解,这就意味着它的一个实例可以被多次添加进多个 pipeline 中(每个 channel 对应一个 pipeline 实例),而这多个不同的 pipeline 可能会被不同的 reactor 线程执行,所以在使用共享 ChannelHandler 的时候需要确保其线程安全性。

比如下面的实例代码:

@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
            .............需要确保线程安全.......
}
  final EchoServerHandler serverHandler = new EchoServerHandler();

  ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
               ..................
            .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast(serverHandler);
                 }
             });

EchoServerHandler 为我们自定义的 ChannelHandler ,它被 @Sharable 注解标注,全局只有一个实例,被添加进多个 Channel 的 pipeline 中。从而会被多个 reactor 线程执行到。

共享channelHandler.png
  1. 为 ChannelHandler 创建其 ChannelHandlerContext ,用于封装 ChannelHandler 的名称,状态信息,执行上下文信息,以及用于感知 ChannelHandler 在 pipeline 中的位置信息。newContext 方法涉及的细节较多,后面我们单独介绍。

  2. 通过 addLast0 将新创建出来的 ChannelHandlerContext 插入到 pipeline 中末尾处。方法的逻辑很简单其实就是一个普通的双向链表插入操作。

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

但是这里大家需要注意的点是:虽然此时 ChannelHandlerContext 被物理的插入到了 pipeline 中,但是此时 channelHandler 的状态依然为 INIT 状态,从逻辑上来说并未算是真正的插入到 pipeline 中,需要等到 ChannelHandler 的 handlerAdded 方法被回调时,状态才变为 ADD_COMPLETE ,而只有 ADD_COMPLETE 状态的 ChannelHandler 才能响应 pipeline 中传播的事件。

channelhandelr的状态.png

在上篇文章《一文搞懂Netty发送数据全流程》中的《3.1.5 触发nextChannelHandler的write方法回调》小节中我们也提过,在每次 write 事件或者 flush 事件传播的时候,都需要通过 invokeHandler 方法来判断 channelHandler 的状态是否为 ADD_COMPLETE ,否则当前 channelHandler 则不能响应正在 pipeline 中传播的事件。必须要等到对应的 handlerAdded 方法被回调才可以,因为 handlerAdded 方法中可能包含一些 ChannelHandler 初始化的重要逻辑。

    private boolean invokeHandler() {
        // 这里是一个优化点,netty 用一个局部变量保存 handlerState
        // 目的是减少 volatile 变量 handlerState 的读取次数
        int handlerState = this.handlerState;
        return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
    }

    void invokeWrite(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
        } else {
            // 当前channelHandler虽然添加到pipeline中,但是并没有调用handlerAdded
            // 所以不能调用当前channelHandler中的回调方法,只能继续向前传递write事件
            write(msg, promise);
        }
    }

    private void invokeFlush() {
        if (invokeHandler()) {
            invokeFlush0();
        } else {
            //如果该ChannelHandler虽然加入到pipeline中但handlerAdded方法并未被回调,则继续向前传递flush事件
            flush();
        }
    }

事实上不仅仅是 write 事件和 flush 事件在传播的时候需要判断 ChannelHandler 的状态,所有的 inbound 类事件和 outbound 类事件在传播的时候都需要通过 invokeHandler 方法来判断当前 ChannelHandler 的状态是否为 ADD_COMPLETE ,需要确保在 ChannelHandler 响应事件之前,它的 handlerAdded 方法被回调。

  1. 如果向 pipeline 中添加 ChannelHandler 的时候, channel 还没来得及注册到 reactor中,那么需要将当前 ChannelHandler 的状态先设置为 ADD_PENDING ,并将回调该 ChannelHandler 的 handlerAdded 方法封装成 PendingHandlerAddedTask 任务添加进 pipeline 中的任务列表中,等到 channel 向 reactor 注册之后,reactor 线程会挨个执行 pipeline 中任务列表中的任务。

这段逻辑主要用来处理 ChannelInitializer 的添加场景,因为目前只有 ChannelInitializer 这个特殊的 channelHandler 会在 channel 没有注册之前被添加进 pipeline 中

         if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
         }

向 pipeline 的任务列表 pendingHandlerCallbackHead 中添加 PendingHandlerAddedTask 任务:

public class DefaultChannelPipeline implements ChannelPipeline {

    // pipeline中的任务列表
    private PendingHandlerCallback pendingHandlerCallbackHead;

    // 向任务列表尾部添加PendingHandlerAddedTask
    private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
        assert !registered;

        PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
        PendingHandlerCallback pending = pendingHandlerCallbackHead;
        if (pending == null) {
            pendingHandlerCallbackHead = task;
        } else {
            // Find the tail of the linked-list.
            while (pending.next != null) {
                pending = pending.next;
            }
            pending.next = task;
        }
    }
}

PendingHandlerAddedTask 任务负责回调 ChannelHandler 中的 handlerAdded 方法。

private final class PendingHandlerAddedTask extends PendingHandlerCallback {
        ...............

        @Override
        public void run() {
            callHandlerAdded0(ctx);
        }

       ...............
}

    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
       try {
            ctx.callHandlerAdded();
        } catch (Throwable t) {
           ...............
        }
    }
pipeline任务列表.png
  1. 除了 ChannelInitializer 这个特殊的 ChannelHandler 的添加是在 channel 向 reactor 注册之前外,剩下的这些用户自定义的 ChannelHandler 的添加,均是在 channel 向 reactor 注册之后被添加进 pipeline 的。这种场景下的处理就会变得比较简单,在 ChannelHandler 被插入到 pipeline 中之后,就会立即回调该 ChannelHandler 的 handlerAdded 方法。但是需要确保 handlerAdded 方法的回调在 channel 指定的 executor 中进行。
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }  
      
            callHandlerAdded0(newCtx);

如果当前执行线程并不是 ChannelHandler 指定的 executor ( !executor.inEventLoop() ),那么就需要确保 handlerAdded 方法的回调在 channel 指定的 executor 中进行。

    private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {
        newCtx.setAddPending();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                callHandlerAdded0(newCtx);
            }
        });
    }

这里需要注意的是需要在回调 handlerAdded 方法之前将 ChannelHandler 的状态提前设置为 ADD_COMPLETE 。 因为用户可能在 ChannelHandler 中的 handerAdded 回调中触发一些事件,而如果此时 ChannelHandler 的状态不是 ADD_COMPLETE 的话,就会停止对事件的响应,从而错过事件的处理。

这种属于一种用户极端的使用情况。

    final void callHandlerAdded() throws Exception {
        if (setAddComplete()) {
            handler().handlerAdded(this);
        }
    }

5. ChanneHandlerContext 的创建

在介绍完 ChannelHandler 向 pipeline 添加的整个逻辑过程后,本小节我们来看下如何为 ChannelHandler 创建对应的 ChannelHandlerContext ,以及 ChannelHandlerContext 中具体包含了哪些上下文信息。

public class DefaultChannelPipeline implements ChannelPipeline {
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
             
            ................

            //创建channelHandlerContext包裹channelHandler并封装执行传播相关的上下文信息
            newCtx = newContext(group, filterName(name, handler), handler);

             ................
        }

    }

    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }

}

在创建 ChannelHandlerContext 之前,需要做两个重要的前置操作:

5.1 filterName

    private String filterName(String name, ChannelHandler handler) {
        if (name == null) {
            // 如果没有指定name,则会为handler默认生成一个name,该方法可确保默认生成的name在pipeline中不会重复
            return generateName(handler);
        }

        // 如果指定了name,需要确保name在pipeline中是唯一的
        checkDuplicateName(name);
        return name;
    }

如果用户再向 pipeline 添加 ChannelHandler 的时候,为其指定了具体的名称,那么这里需要确保用户指定的名称在 pipeline 中是唯一的。

    private void checkDuplicateName(String name) {
        if (context0(name) != null) {
            throw new IllegalArgumentException("Duplicate handler name: " + name);
        }
    }

    /**
     * 通过指定名称在pipeline中查找对应的channelHandler 没有返回null
     * */
    private AbstractChannelHandlerContext context0(String name) {
        AbstractChannelHandlerContext context = head.next;
        while (context != tail) {
            if (context.name().equals(name)) {
                return context;
            }
            context = context.next;
        }
        return null;
    }

如果用户没有为 ChannelHandler 指定名称,那么就需要为 ChannelHandler 在 pipeline 中默认生成一个唯一的名称。

    // pipeline中channelHandler对应的name缓存
    private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
            new FastThreadLocal<Map<Class<?>, String>>() {
        @Override
        protected Map<Class<?>, String> initialValue() {
            return new WeakHashMap<Class<?>, String>();
        }
    };

    private String generateName(ChannelHandler handler) {
        // 获取pipeline中channelHandler对应的name缓存
        Map<Class<?>, String> cache = nameCaches.get();
        Class<?> handlerType = handler.getClass();
        String name = cache.get(handlerType);
        if (name == null) {
            // 当前handler还没对应的name缓存,则默认生成:simpleClassName + #0
            name = generateName0(handlerType);
            cache.put(handlerType, name);
        }

        if (context0(name) != null) {
            // 不断重试名称后缀#n + 1 直到没有重复
            String baseName = name.substring(0, name.length() - 1); 
            for (int i = 1;; i ++) {
                String newName = baseName + i;
                if (context0(newName) == null) {
                    name = newName;
                    break;
                }
            }
        }
        return name;
    }

    private static String generateName0(Class<?> handlerType) {
        return StringUtil.simpleClassName(handlerType) + "#0";
    }

pipeline 中使用了一个 FastThreadLocal 类型的 nameCaches 来缓存各种类型 ChannelHandler 的基础名称。后面会根据这个基础名称不断的重试生成一个没有冲突的正式名称。缓存 nameCaches 中的 key 表示特定的 ChannelHandler 类型,value 表示该特定类型的 ChannelHandler 的基础名称 simpleClassName + #0

自动为 ChannelHandler 生成默认名称的逻辑是:

虽然用户不大可能将同一类型的 channelHandler 重复添加到 pipeline 中,但是 netty 为了防止这种反复添加同一类型 ChannelHandler 的行为导致的名称冲突,从而利用 nameCaches 来缓存同一类型 ChannelHandler 的基础名称 simpleClassName + #0,然后通过不断的重试递增名称后缀,来生成一个在pipeline中唯一的名称。

5.2 childExecutor

通过前边的介绍我们了解到,当我们向 pipeline 添加 ChannelHandler 的时候,netty 允许我们为 ChannelHandler 指定特定的 executor 去执行 ChannelHandler 中的各种事件回调方法。

通常我们会为 ChannelHandler 指定一个EventExecutorGroup,在创建ChannelHandlerContext 的时候,会通过 childExecutor 方法从 EventExecutorGroup 中选取一个 EventExecutor 来与该 ChannelHandler 绑定。

EventExecutorGroup 是 netty 自定义的一个线程池模型,其中包含多个 EventExecutor ,而 EventExecutor 在 netty 中是一个线程的执行模型。相关的具体实现和用法笔者已经在《Reactor在Netty中的实现(创建篇)》一文中给出了详尽的介绍,忘记的同学可以在回顾下。

在介绍 executor 的绑定逻辑之前,这里笔者需要先为大家介绍一个相关的重要参数:SINGLE_EVENTEXECUTOR_PER_GROUP ,默认为 true 。

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
  .........
.childOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP,true)

我们知道在 netty 中,每一个 channel 都会对应一个独立的 pipeline ,如果我们开启了 SINGLE_EVENTEXECUTOR_PER_GROUP 参数,表示在一个 channel 对应的 pipeline 中,如果我们为多个 ChannelHandler 指定了同一个 EventExecutorGroup ,那么这多个 channelHandler 只能绑定到 EventExecutorGroup 中的同一个 EventExecutor 上。

什么意思呢??比如我们有下面一段初始化pipeline的代码:

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                 ........................
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline pipeline = ch.pipeline();
                     pipeline.addLast(eventExecutorGroup,channelHandler1)
                     pipeline.addLast(eventExecutorGroup,channelHandler2)
                     pipeline.addLast(eventExecutorGroup,channelHandler3)
                 }
             });

eventExecutorGroup 中包含 EventExecutor1,EventExecutor2 , EventExecutor3 三个执行线程。

假设此时第一个连接进来,在创建 channel1 后初始化 pipeline1 的时候,如果在开启 SINGLE_EVENTEXECUTOR_PER_GROUP 参数的情况下,那么在 channel1 对应的 pipeline1 中 channelHandler1,channelHandler2 , channelHandler3 绑定的 EventExecutor 均为 EventExecutorGroup 中的 EventExecutor1 。

第二个连接 channel2 对应的 pipeline2 中 channelHandler1 , channelHandler2 ,channelHandler3 绑定的 EventExecutor 均为 EventExecutorGroup 中的 EventExecutor2 。

第三个连接 channel3 对应的 pipeline3 中 channelHandler1 , channelHandler2 ,channelHandler3 绑定的 EventExecutor 均为 EventExecutorGroup 中的 EventExecutor3 。

以此类推........

SINGLE_EVENTEXECUTOR_PER_GROUP绑定.png

如果在关闭 SINGLE_EVENTEXECUTOR_PER_GROUP 参数的情况下,
channel1 对应的 pipeline1 中 channelHandler1 会绑定到 EventExecutorGroup 中的 EventExecutor1 ,channelHandler2 会绑定到 EventExecutor2 ,channelHandler3 会绑定到 EventExecutor3 。

同理其他 channel 对应的 pipeline 中的 channelHandler 绑定逻辑同 channel1 。它们均会绑定到 EventExecutorGroup 中的不同 EventExecutor 中。

SINGLE_EVENTEXECUTOR_PER_GROUP关闭.png

当我们了解了 SINGLE_EVENTEXECUTOR_PER_GROUP 参数的作用之后,再来看下面这段绑定逻辑就很容易理解了。

     // 在每个pipeline中都会保存EventExecutorGroup中绑定的线程
    private Map<EventExecutorGroup, EventExecutor> childExecutors;

    private EventExecutor childExecutor(EventExecutorGroup group) {
        if (group == null) {
            return null;
        }

        Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
        if (pinEventExecutor != null && !pinEventExecutor) {
            //如果没有开启SINGLE_EVENTEXECUTOR_PER_GROUP,则按顺序从指定的EventExecutorGroup中为channelHandler分配EventExecutor
            return group.next();
        }

        //获取pipeline绑定到EventExecutorGroup的线程(在一个pipeline中会为每个指定的EventExecutorGroup绑定一个固定的线程)
        Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
        if (childExecutors == null) {
            childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
        }

        //获取该pipeline绑定在指定EventExecutorGroup中的线程
        EventExecutor childExecutor = childExecutors.get(group);
        if (childExecutor == null) {
            childExecutor = group.next();
            childExecutors.put(group, childExecutor);
        }
        return childExecutor;
    }

如果我们并未特殊指定 ChannelHandler 的 executor ,那么默认会是对应 channel 绑定的 reactor 线程负责执行该 ChannelHandler 。

如果我们未开启 SINGLE_EVENTEXECUTOR_PER_GROUP ,netty 就会从我们指定的 EventExecutorGroup 中按照 round-robin 的方式为 ChannelHandler 绑定其中一个 eventExecutor 。

SINGLE_EVENTEXECUTOR_PER_GROUP关闭.png

如果我们开启了 SINGLE_EVENTEXECUTOR_PER_GROUP相同的 EventExecutorGroup 在同一个 pipeline 实例中的绑定关系是固定的。在 pipeline 中如果多个 channelHandler 指定了同一个 EventExecutorGroup ,那么这些 channelHandler 的 executor 均会绑定到一个固定的 eventExecutor 上。

SINGLE_EVENTEXECUTOR_PER_GROUP绑定.png

这种固定的绑定关系缓存于每个 pipeline 中的 Map<EventExecutorGroup, EventExecutor> childExecutors 字段中,key 是用户为 channelHandler 指定的 EventExecutorGroup ,value 为该 EventExecutorGroup 在 pipeline 实例中的绑定 eventExecutor 。

接下来就是从 childExecutors 中获取指定 EventExecutorGroup 在该 pipeline 实例中的绑定 eventExecutor,如果绑定关系还未建立,则通过 round-robin 的方式从 EventExecutorGroup 中选取一个 eventExecutor 进行绑定,并在 childExecutor 中缓存绑定关系。

如果绑定关系已经建立,则直接为 ChannelHandler 指定绑定好的 eventExecutor。

5.3 ChanneHandlerContext

在介绍完创建 ChannelHandlerContext 的两个前置操作后,我们回头来看下 ChannelHandlerContext 中包含了哪些具体的上下文信息。

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    // ChannelHandlerContext包裹的channelHandler
    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, handler.getClass());
        //包裹的channelHandler
        this.handler = handler;
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }
}
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    //对应channelHandler的名称
    private final String name;

    //ChannelHandlerContext中持有pipeline的引用
    private final DefaultChannelPipeline pipeline;

    // channelHandler对应的executor 默认为reactor
    final EventExecutor executor;

    //channelHandlerContext中保存channelHandler的执行条件掩码(是什么类型的ChannelHandler,对什么事件感兴趣)
    private final int executionMask;

    //false表示 当channelHandler的状态为ADD_PENDING的时候,也可以响应pipeline中的事件
    //true表示只有在channelHandler的状态为ADD_COMPLETE的时候才能响应pipeline中的事件
    private final boolean ordered;

    //channelHandelr的状态,初始化为INIT
    private volatile int handlerState = INIT;

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
                                  String name, Class<? extends ChannelHandler> handlerClass) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        //channelHandlerContext中保存channelHandler的执行条件掩码(是什么类型的ChannelHandler,对什么事件感兴趣)
        this.executionMask = mask(handlerClass);
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

}
ChannelHandlerContext.png

这里笔者重点介绍 orderd 属性和 executionMask 属性,其他的属性大家很容易理解。

  ordered = executor == null || executor instanceof OrderedEventExecutor;

当我们不指定 channelHandler 的 executor 时或者指定的 executor 类型为 OrderedEventExecutor 时,ordered = true。

那么这个 ordered 属性对于 ChannelHandler 响应 pipeline 中的事件有什么影响呢?

我们之前介绍过在 ChannelHandler 响应 pipeline 中的事件之前都会调用 invokeHandler() 方法来判断是否回调 ChannelHandler 的事件回调方法还是跳过。

   private boolean invokeHandler() {
        int handlerState = this.handlerState;
        return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
    }

另一个重要的属性 executionMask 保存的是当前 ChannelHandler 的一些执行条件信息掩码,比如:

    private static final FastThreadLocal<Map<Class<? extends ChannelHandler>, Integer>> MASKS =
            new FastThreadLocal<Map<Class<? extends ChannelHandler>, Integer>>() {
                @Override
                protected Map<Class<? extends ChannelHandler>, Integer> initialValue() {
                    return new WeakHashMap<Class<? extends ChannelHandler>, Integer>(32);
                }
            };

    static int mask(Class<? extends ChannelHandler> clazz) {
        // 因为每建立一个channel就会初始化一个pipeline,这里需要将ChannelHandler对应的mask缓存
        Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
        Integer mask = cache.get(clazz);
        if (mask == null) {
            // 计算ChannelHandler对应的mask(什么类型的ChannelHandler,对什么事件感兴趣)
            mask = mask0(clazz);
            cache.put(clazz, mask);
        }
        return mask;
    }

这里需要一个 FastThreadLocal 类型的 MASKS 字段来缓存 ChannelHandler 对应的执行掩码。因为 ChannelHandler 类一旦被定义出来它的执行掩码就固定了,而 netty 需要接收大量的连接,创建大量的 channel ,并为这些 channel 初始化对应的 pipeline ,需要频繁的记录 channelHandler 的执行掩码到 context 类中,所以这里需要将掩码缓存起来。

    private static int mask0(Class<? extends ChannelHandler> handlerType) {
        int mask = MASK_EXCEPTION_CAUGHT;
        try {
            if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
                //如果该ChannelHandler是Inbound类型的,则先将inbound事件全部设置进掩码中
                mask |= MASK_ALL_INBOUND;

                //最后在对不感兴趣的事件一一排除(handler中的事件回调方法如果标注了@Skip注解,则认为handler对该事件不感兴趣)
                if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_REGISTERED;
                }
                if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_UNREGISTERED;
                }
                if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_ACTIVE;
                }
                if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_INACTIVE;
                }
                if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
                    mask &= ~MASK_CHANNEL_READ;
                }
                if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_READ_COMPLETE;
                }
                if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
                }
                if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
                    mask &= ~MASK_USER_EVENT_TRIGGERED;
                }
            }

            if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
                //如果handler为Outbound类型的,则先将全部outbound事件设置进掩码中
                mask |= MASK_ALL_OUTBOUND;

                //最后对handler不感兴趣的事件从掩码中一一排除
                if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
                        SocketAddress.class, ChannelPromise.class)) {
                    mask &= ~MASK_BIND;
                }
                if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
                        SocketAddress.class, ChannelPromise.class)) {
                    mask &= ~MASK_CONNECT;
                }
                if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
                    mask &= ~MASK_DISCONNECT;
                }
                if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
                    mask &= ~MASK_CLOSE;
                }
                if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
                    mask &= ~MASK_DEREGISTER;
                }
                if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
                    mask &= ~MASK_READ;
                }
                if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
                        Object.class, ChannelPromise.class)) {
                    mask &= ~MASK_WRITE;
                }
                if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
                    mask &= ~MASK_FLUSH;
                }
            }

            if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
                mask &= ~MASK_EXCEPTION_CAUGHT;
            }
        } catch (Exception e) {
            // Should never reach here.
            PlatformDependent.throwException(e);
        }

        //计算出的掩码需要缓存,因为每次向pipeline中添加该类型的handler的时候都需要获取掩码(创建一个channel 就需要为其初始化pipeline)
        return mask;
    }

计算 ChannelHandler 的执行掩码 mask0 方法虽然比较长,但是逻辑却十分简单。在本文的第三小节《3. pipeline中的事件分类》中,笔者为大家详细介绍了各种事件类型的掩码表示,这里我来看下如何利用这些基本事件掩码来计算出 ChannelHandler 的执行掩码的。

如果 ChannelHandler 是 ChannelInboundHandler 类型的,那么首先会将所有 Inbound 事件掩码设置进执行掩码 mask 中。

最后挨个遍历所有 Inbound 事件,从掩码集合 mask 中排除该 ChannelHandler 不感兴趣的事件。这样一轮下来,就得到了 ChannelHandler 的执行掩码。

从这个过程中我们可以看到,ChannelHandler 的执行掩码包含的是该 ChannelHandler 感兴趣的事件掩码集合。当事件在 pipeline 中传播的时候,在 ChannelHandlerContext 中可以利用这个执行掩码来判断,当前 ChannelHandler 是否符合响应该事件的资格。

同理我们也可以计算出 ChannelOutboundHandler 类型的 ChannelHandler 对应的执行掩码。

那么 netty 框架是如何判断出我们自定义的 ChannelHandler 对哪些事件感兴趣,对哪些事件不感兴趣的呢?

这里我们以 ChannelInboundHandler 类型举例说明,在本文第三小节中,笔者对所有 Inbound 类型的事件作了一个全面的介绍,但是在实际开发中,我们可能并不需要监听所有的 Inbound 事件,可能只是需要监听其中的一到两个事件。

对于我们不感兴趣的事件,我们只需要在其对应的回调方法上标注 @Skip 注解即可,netty 就会认为该 ChannelHandler 对标注 @Skip 注解的事件不感兴趣,当不感兴趣的事件在 pipeline 传播的时候,该 ChannelHandler 就不需要执行响应。

    private static boolean isSkippable(
            final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {
        return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
            @Override
            public Boolean run() throws Exception {
                Method m;
                try {
                    // 首先查看类中是否覆盖实现了对应的事件回调方法
                    m = handlerType.getMethod(methodName, paramTypes);
                } catch (NoSuchMethodException e) {
                    if (logger.isDebugEnabled()) {
                        logger.debug(
                            "Class {} missing method {}, assume we can not skip execution", handlerType, methodName, e);
                    }
                    return false;
                }
                return m != null && m.isAnnotationPresent(Skip.class);
            }
        });
    }

那我们在编写自定义 ChannelHandler 的时候是不是要在 ChannelInboundHandler 或者 ChannelOutboundHandler 接口提供的所有事件回调方法上,对我们不感兴趣的事件繁琐地一一标注 @Skip 注解呢?

其实是不需要的,netty 为我们提供了 ChannelInboundHandlerAdapter 类和 ChannelOutboundHandlerAdapter 类,netty 事先已经在这些 Adapter 类中的事件回调方法上全部标注了 @Skip 注解,我们在自定义实现 ChannelHandler 的时候只需要继承这些 Adapter 类并覆盖我们感兴趣的事件回调方法即可。

public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {

    @Skip
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();
    }

    @Skip
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();
    }

    @Skip
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }

    @Skip
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    @Skip
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    @Skip
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
    }

    @Skip
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

    @Skip
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }

    @Skip
    @Override
    @SuppressWarnings("deprecation")
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

6. 从 pipeline 删除 channelHandler

从上个小节的内容中我们可以看到向 pipeline 中添加 ChannelHandler 的逻辑还是比较复杂的,涉及到的细节比较多。

那么在了解了向 pipeline 中添加 ChannelHandler 的过程之后,从 pipeline 中删除 ChannelHandler 的逻辑就变得很好理解了。

public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {

    //从pipeline中删除指定的channelHandler
    ChannelPipeline remove(ChannelHandler handler);
    //从pipeline中删除指定名称的channelHandler
    ChannelHandler remove(String name);
    //从pipeline中删除特定类型的channelHandler
    <T extends ChannelHandler> T remove(Class<T> handlerType);
}

netty 提供了以上三种方式从 pipeline 中删除指定 ChannelHandler ,下面我们以第一种方式为例来介绍 ChannelHandler 的删除过程。

public class DefaultChannelPipeline implements ChannelPipeline {

    @Override
    public final ChannelPipeline remove(ChannelHandler handler) {
        remove(getContextOrDie(handler));
        return this;
    }

}

6.1 getContextOrDie

首先需要通过 getContextOrDie 方法在 pipeline 中查找到指定的 ChannelHandler 对应的 ChannelHandelrContext 。以便确认要删除的 ChannelHandler 确实是存在于 pipeline 中。

context 方法是通过遍历 pipeline 中的双向链表来查找要删除的 ChannelHandlerContext 。

    private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
        AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
        if (ctx == null) {
            throw new NoSuchElementException(handler.getClass().getName());
        } else {
            return ctx;
        }
    }

    @Override
    public final ChannelHandlerContext context(ChannelHandler handler) {
        ObjectUtil.checkNotNull(handler, "handler");
        // 获取 pipeline 双向链表结构的头结点
        AbstractChannelHandlerContext ctx = head.next;
        for (;;) {

            if (ctx == null) {
                return null;
            }

            if (ctx.handler() == handler) {
                return ctx;
            }

            ctx = ctx.next;
        }
    }

6.2 remove

remove 方法的整体代码结构和 addLast0 方法的代码结构一样,整体逻辑也是先从 pipeline 中的双向链表结构中将指定的 ChanneHandlerContext 删除,然后在处理被删除的 ChannelHandler 中 handlerRemoved 方法的回调。

    private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
        assert ctx != head && ctx != tail;

        synchronized (this) {
            //从pipeline的双向列表中删除指定channelHandler对应的context
            atomicRemoveFromHandlerList(ctx);

            if (!registered) {
                //如果此时channel还未向reactor注册,则通过向pipeline中添加PendingHandlerRemovedTask任务
                //在注册之后回调channelHandelr中的handlerRemoved方法
                callHandlerCallbackLater(ctx, false);
                return ctx;
            }

            //channelHandelr从pipeline中删除后,需要回调其handlerRemoved方法
            //需要确保handlerRemoved方法在channelHandelr指定的executor中进行
            EventExecutor executor = ctx.executor();
            if (!executor.inEventLoop()) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerRemoved0(ctx);
                    }
                });
                return ctx;
            }
        }
        callHandlerRemoved0(ctx);
        return ctx;
    }
  1. 从 pipeline 中删除指定 ChannelHandler 对应的 ChannelHandlerContext 。逻辑比较简单,就是普通双向链表的删除操作。
    private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
        AbstractChannelHandlerContext prev = ctx.prev;
        AbstractChannelHandlerContext next = ctx.next;
        prev.next = next;
        next.prev = prev;
    }
  1. 如果此时 channel 并未向对应的 reactor 进行注册,则需要向 pipeline 的任务列表中添加 PendingHandlerRemovedTask 任务,再该任务中会执行 ChannelHandler 的 handlerRemoved 回调,当 channel 向 reactor 注册成功后,reactor 会执行 pipeline 中任务列表中的任务,从而回调被删除 ChannelHandler 的 handlerRemoved 方法。
pipeline任务.png
    private final class PendingHandlerRemovedTask extends PendingHandlerCallback {

        PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        public void run() {
            callHandlerRemoved0(ctx);
        }
    }

在执行 ChannelHandler 中 handlerRemoved 回调的时候,需要对 ChannelHandler 的状态进行判断:只有当 handlerState 为 ADD_COMPLETE 的时候才能回调 handlerRemoved 方法。

这里表达的语义是只有当 ChannelHanler 的 handlerAdded 方法被回调之后,那么在 ChannelHanler 被从 pipeline 中删除的时候它的 handlerRemoved 方法才可以被回调。

在 ChannelHandler 的 handlerRemove 方法被回调之后,将 ChannelHandler 的状态设置为 REMOVE_COMPLETE 。

    private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {

        try {
            // 在这里回调 handlerRemoved 方法
            ctx.callHandlerRemoved();
        } catch (Throwable t) {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
        }
    }

    final void callHandlerRemoved() throws Exception {
        try {
            if (handlerState == ADD_COMPLETE) {
                handler().handlerRemoved(this);
            }
        } finally {
            // Mark the handler as removed in any case.
            setRemoved();
        }
    }

   final void setRemoved() {
        handlerState = REMOVE_COMPLETE;
    }
  1. 如果 channel 已经在 reactor 中注册成功,那么当 channelHandler 从 pipeline 中删除之后,需要立即回调其 handlerRemoved 方法。但是需要确保 handlerRemoved 方法在 channelHandler 指定的 executor 中进行。

7. pipeline 的初始化

其实关于 pipeline 初始化的相关内容我们在《详细图解 Netty Reactor 启动全流程》中已经简要介绍了 NioServerSocketChannel 中的 pipeline 的初始化时机以及过程。

《Netty 如何高效接收网络连接》中笔者也简要介绍了 NioSocketChannel 中 pipeline 的初始化时机以及过程。

本小节笔者将结合这两种类型的 Channel 来完整全面的介绍 pipeline 的整个初始化过程。

7.1 NioServerSocketChannel 中 pipeline 的初始化

从前边提到的这两篇文章以及本文前边的相关内容我们知道,Netty 提供了一个特殊的 ChannelInboundHandler 叫做 ChannelInitializer ,用户可以利用这个特殊的 ChannelHandler 对 Channel 中的 pipeline 进行自定义的初始化逻辑。

如果用户只希望在 pipeline 中添加一个固定的 ChannelHandler 可以通过如下代码直接添加。

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)//配置主从Reactor
                  ...........
             .handler(new LoggingHandler(LogLevel.INFO))

如果希望添加多个 ChannelHandler ,则可以通过 ChannelInitializer 来自定义添加逻辑。

由于使用 ChannelInitializer 初始化 NioServerSocketChannel 中 pipeline 的逻辑会稍微复杂一点,下面我们均以这个复杂的案例来讲述 pipeline 的初始化过程。

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)//配置主从Reactor
                  ...........
               .handler(new ChannelInitializer<NioServerSocketChannel>() {
                 @Override
                 protected void initChannel(NioServerSocketChannel ch) throws Exception {
                              ....自定义pipeline初始化逻辑....
                               ChannelPipeline p = ch.pipeline();
                               p.addLast(channelHandler1);
                               p.addLast(channelHandler2);
                               p.addLast(channelHandler3);
                                    ........
                 }
             })

以上这些由用户自定义的用于初始化 pipeline 的 ChannelInitializer ,被保存至 ServerBootstrap 启动类中的 handler 字段中。用于后续的初始化调用

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable
   private volatile ChannelHandler handler;
}

在服务端启动的时候,会伴随着 NioServeSocketChannel 的创建以及初始化,在初始化 NioServerSokcetChannel 的时候会将一个新的 ChannelInitializer 添加进 pipeline 中,在新的 ChannelInitializer 中才会将用户自定义的 ChannelInitializer 添加进 pipeline 中,随后才执行初始化过程。

Netty 这里之所以引入一个新的 ChannelInitializer 来初始化 NioServerSocketChannel 中的 pipeline 的原因是需要兼容前边介绍的这两种初始化 pipeline 的方式。

忘记 netty 启动过程的同学可以在回看下笔者的《详细图解 Netty Reactor 启动全流程》这篇文章。

   @Override
    void init(Channel channel) {
       .........

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                //ServerBootstrap中用户指定的channelHandler
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                .........
            }
        });
   }

注意此时 NioServerSocketChannel 并未开始向 Main Reactor 注册,根据本文第四小节《4. 向 pipeline 添加 channelHandler 》中的介绍,此时向 pipeline 中添加这个新的 ChannelInitializer 之后,netty 会向 pipeline 的任务列表中添加 PendingHandlerAddedTask 。当 NioServerSocketChannel 向 Main Reactor 注册成功之后,紧接着 Main Reactor 线程会调用这个 PendingHandlerAddedTask ,在任务中会执行这个新的 ChannelInitializer 的 handlerAdded 回调。在这个回调方法中会执行上边 initChannel 方法里的代码。

server channel pipeline 注册前结构.png

当 NioServerSocketChannel 在向 Main Reactor 注册成功之后,就挨个执行 pipeline 中的任务列表中的任务。

       private void register0(ChannelPromise promise) {
                     .........
                boolean firstRegistration = neverRegistered;
                //执行真正的注册操作
                doRegister();
                //修改注册状态
                neverRegistered = false;
                registered = true;
                //调用pipeline中的任务链表,执行PendingHandlerAddedTask
                pipeline.invokeHandlerAddedIfNeeded();
                .........
    final void invokeHandlerAddedIfNeeded() {
        assert channel.eventLoop().inEventLoop();
        if (firstRegistration) {
            firstRegistration = false;
            // 执行 pipeline 任务列表中的 PendingHandlerAddedTask 任务。
            callHandlerAddedForAllHandlers();
        }
    }

执行 pipeline 任务列表中的 PendingHandlerAddedTask 任务:

    private void callHandlerAddedForAllHandlers() {
        // pipeline 任务列表中的头结点
        final PendingHandlerCallback pendingHandlerCallbackHead;
        synchronized (this) {
            assert !registered;
            // This Channel itself was registered.
            registered = true;
            pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
            // Null out so it can be GC'ed.
            this.pendingHandlerCallbackHead = null;
        }

        PendingHandlerCallback task = pendingHandlerCallbackHead;
        // 挨个执行任务列表中的任务
        while (task != null) {
            //触发 ChannelInitializer 的 handlerAdded 回调
            task.execute();
            task = task.next;
        }
    }

最终在 PendingHandlerAddedTask 中执行 pipeline 中 ChannelInitializer 的 handlerAdded 回调。

这个 ChannelInitializer 就是在初始化 NioServerSocketChannel 的 init 方法中向 pipeline 添加的 ChannelInitializer。

@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {

            if (initChannel(ctx)) {
                //初始化工作完成后,需要将自身从pipeline中移除
                removeState(ctx);
            }
        }
    }

}

在 handelrAdded 回调中执行 ChannelInitializer 匿名类中 initChannel 方法,注意此时执行的 ChannelInitializer 类为在本小节开头 init 方法中由 Netty 框架添加的 ChannelInitializer ,并不是用户自定义的 ChannelInitializer 。

    @Override
    void init(Channel channel) {
       .........

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                //ServerBootstrap中用户指定的ChannelInitializer
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                .........
            }
        });
   }

执行完 ChannelInitializer 匿名类中 initChannel 方法后,需将 ChannelInitializer 从 pipeline 中删除。并回调 ChannelInitializer 的 handlerRemoved 方法。删除过程笔者已经在第六小节《6. 从 pipeline 删除 channelHandler》详细介绍过了。

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.add(ctx)) { // Guard against re-entrance.
            try {
                //执行ChannelInitializer匿名类中的initChannel方法
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                exceptionCaught(ctx, cause);
            } finally {
                ChannelPipeline pipeline = ctx.pipeline();
                if (pipeline.context(this) != null) {
                    //初始化完毕后,从pipeline中移除自身
                    pipeline.remove(this);
                }
            }
            return true;
        }
        return false;
    }

当执行完 initChannel 方法后此时 pipeline 的结构如下图所示:

serverSocketChannel注册之后pipeline结构.png

当用户的自定义 ChannelInitializer 被添加进 pipeline 之后,根据第四小节所讲的添加逻辑,此时 NioServerSocketChannel 已经向 main reactor 成功注册完毕,不再需要向 pipeine 的任务列表中添加 PendingHandlerAddedTask 任务,而是直接调用自定义 ChannelInitializer 中的 handlerAdded 回调,和上面的逻辑一样。不同的是这里最终回调至用户自定义的初始化逻辑实现 initChannel 方法中。执行完用户自定义的初始化逻辑之后,从 pipeline 删除用户自定义的 ChannelInitializer 。

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)//配置主从Reactor
                  ...........
               .handler(new ChannelInitializer<NioServerSocketChannel>() {
                 @Override
                 protected void initChannel(NioServerSocketChannel ch) throws Exception {
                              ....自定义pipeline初始化逻辑....
                               ChannelPipeline p = ch.pipeline();
                               p.addLast(channelHandler1);
                               p.addLast(channelHandler2);
                               p.addLast(channelHandler3);
                                    ........
                 }
             })

随后 netty 会以异步任务的形式向 pipeline 的末尾添加 ServerBootstrapAcceptor ,至此 NioServerSocketChannel 中 pipeline 的初始化工作就全部完成了。

7.2 NioSocketChannel 中 pipeline 的初始化

在 7.1 小节中笔者举的这个 pipeline 初始化的例子相对来说比较复杂,当我们把这个复杂例子的初始化逻辑搞清楚之后,NioSocketChannel 中 pipeline 的初始化过程就变的很简单了。

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)//配置主从Reactor
                  ...........
               .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) throws Exception {
                              ....自定义pipeline初始化逻辑....
                               ChannelPipeline p = ch.pipeline();
                               p.addLast(channelHandler1);
                               p.addLast(channelHandler2);
                               p.addLast(channelHandler3);
                                    ........
                 }
             })
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    //保存用户自定义ChannelInitializer
    private volatile ChannelHandler childHandler;
}

《Netty 如何高效接收网络连接》一文中我们介绍过,当客户端发起连接,完成三次握手之后,NioServerSocketChannel 上的 OP_ACCEPT 事件活跃,随后会在 NioServerSocketChannel 的 pipeline 中触发 channelRead 事件。并最终在 ServerBootstrapAcceptor 中初始化客户端 NioSocketChannel 。

主从Reactor组完整结构.png
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

       @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;
            child.pipeline().addLast(childHandler);
                    ...........
       }
}

在这里会将用户自定义的 ChannelInitializer 添加进 NioSocketChannel 中的 pipeline 中,由于此时 NioSocketChannel 还没有向 sub reactor 开始注册。所以在向 pipeline 中添加 ChannelInitializer 的同时会伴随着 PendingHandlerAddedTask 被添加进 pipeline 的任务列表中。

niosocketchannel中pipeline的初始化.png

后面的流程大家应该很熟悉了,和我们在7.1小节中介绍的一模一样,当 NioSocketChannel 再向 sub reactor 注册成功之后,会执行 pipeline 中的任务列表中的 PendingHandlerAddedTask 任务,在 PendingHandlerAddedTask 任务中会回调用户自定义 ChannelInitializer 的 handelrAdded 方法,在该方法中执行 initChannel 方法,用户自定义的初始化逻辑就封装在这里面。在初始化完 pipeline 后,将 ChannelInitializer 从 pipeline 中删除,并回调其 handlerRemoved 方法。

至此客户端 NioSocketChannel 中 pipeline 初始化工作就全部完成了。

8. 事件传播

在本文第三小节《3. pipeline中的事件分类》中我们介绍了 Netty 事件类型共分为三大类,分别是 Inbound类事件,Outbound类事件,ExceptionCaught事件。并详细介绍了这三类事件的掩码表示,和触发时机,以及事件传播的方向。

本小节我们就来按照 Netty 中异步事件的分类从源码角度分析下事件是如何在 pipeline 中进行传播的。

8.1 Inbound事件的传播

在第三小节中我们介绍了所有的 Inbound 类事件,这些事件在 pipeline 中的传播逻辑和传播方向都是一样的,唯一的区别就是执行的回调方法不同。

本小节我们就以 ChannelRead 事件的传播为例,来说明 Inbound 类事件是如何在 pipeline 中进行传播的。

第三小节中我们提到过,在 NioSocketChannel 中,ChannelRead 事件的触发时机是在每一次 read loop 读取数据之后在 pipeline 中触发的。

               do {
                          ............               
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));

                          ............
       
                    // 在客户端NioSocketChannel的pipeline中触发ChannelRead事件
                    pipeline.fireChannelRead(byteBuf);
  
                } while (allocHandle.continueReading());

从这里可以看到,任何 Inbound 类事件在 pipeline 中的传播起点都是从 HeadContext 头结点开始的。

public class DefaultChannelPipeline implements ChannelPipeline {

    @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }
    
                    .........
}

ChannelRead 事件从 HeadContext 开始在 pipeline 中传播,首先就会回调 HeadContext 中的 channelRead 方法。

在执行 ChannelHandler 中的相应事件回调方法时,需要确保回调方法的执行在指定的 executor 中进行。

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        //需要保证channelRead事件回调在channelHandler指定的executor中进行
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                invokeExceptionCaught(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }

在执行 HeadContext 的 channelRead 方法发生异常时,就会回调 HeadContext 的 exceptionCaught 方法。并在相应的事件回调方法中决定是否将事件继续在 pipeline 中传播。

    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ctx.fireChannelRead(msg);
        }

       @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            ctx.fireExceptionCaught(cause);
        }
    }

在 HeadContext 中通过 ctx.fireChannelRead(msg) 继续将 ChannelRead 事件在 pipeline 中向后传播。

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {

    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
        return this;
    }

}

这里的 findContextInbound 方法是整个 inbound 类事件在 pipeline 中传播的核心所在。

因为我们现在需要继续将 ChannelRead 事件在 pipeline 中传播,所以我们目前的核心问题就是通过 findContextInbound 方法在 pipeline 中找到下一个对 ChannelRead 事件感兴趣的 ChannelInboundHandler 。然后执行该 ChannelInboundHandler 的 ChannelRead 事件回调。

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        //需要保证channelRead事件回调在channelHandler指定的executor中进行
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

ChannelRead 事件就这样循环往复的一直在 pipeline 中传播,在传播的过程中只有对 ChannelRead 事件感兴趣的 ChannelInboundHandler 才可以响应。其他类型的 ChannelHandler 则直接跳过。

如果 ChannelRead 事件在 pipeline 中传播的过程中,没有得到其他 ChannelInboundHandler 的有效处理,最终会被传播到 pipeline 的末尾 TailContext 中。而在本文第二小节中,我们也提到过 TailContext 对于 inbound 事件存在的意义就是做一个兜底的处理。比如:打印日志,释放 bytebuffer 。

 final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
            onUnhandledInboundMessage(ctx, msg);
    }

    protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
        onUnhandledInboundMessage(msg);
        if (logger.isDebugEnabled()) {
            logger.debug("Discarded message pipeline : {}. Channel : {}.",
                         ctx.pipeline().names(), ctx.channel());
        }
    }

    protected void onUnhandledInboundMessage(Object msg) {
        try {
            logger.debug(
                    "Discarded inbound message {} that reached at the tail of the pipeline. " +
                            "Please check your pipeline configuration.", msg);
        } finally {
            // 释放DirectByteBuffer
            ReferenceCountUtil.release(msg);
        }
    }

}

8.2 findContextInbound

本小节要介绍的 findContextInbound 方法和我们在上篇文章《一文聊透 Netty 发送数据全流程》中介绍的 findContextOutbound 方法均是 netty 异步事件在 pipeline 中传播的核心所在。

事件传播的核心问题就是需要高效的在 pipeline 中按照事件的传播方向,找到下一个具有响应事件资格的 ChannelHandler 。

比如:这里我们在 pipeline 中传播的 ChannelRead 事件,我们就需要在 pipeline 中找到下一个对 ChannelRead 事件感兴趣的 ChannelInboundHandler ,并执行该 ChannelInboudnHandler 的 ChannelRead 事件回调,在 ChannelRead 事件回调中对事件进行业务处理,并决定是否通过 ctx.fireChannelRead(msg) 将 ChannelRead 事件继续向后传播。

    private AbstractChannelHandlerContext findContextInbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        EventExecutor currentExecutor = executor();
        do {
            ctx = ctx.next;
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));

        return ctx;
    }

参数 mask 表示我们正在传播的 ChannelRead 事件掩码 MASK_CHANNEL_READ 。

    static final int MASK_EXCEPTION_CAUGHT = 1;
    static final int MASK_CHANNEL_REGISTERED = 1 << 1;
    static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
    static final int MASK_CHANNEL_ACTIVE = 1 << 3;
    static final int MASK_CHANNEL_INACTIVE = 1 << 4;
    static final int MASK_CHANNEL_READ = 1 << 5;
    static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
    static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
    static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;

通过 ctx = ctx.next 在 pipeline 中找到下一个 ChannelHandler ,并通过 skipContext 方法判断下一个 ChannelHandler 是否具有响应事件的资格。如果没有则跳过继续向后查找。

比如:下一个 ChannelHandler 如果是一个 ChannelOutboundHandler,或者下一个 ChannelInboundHandler 对 ChannelRead 事件不感兴趣,那么就直接跳过。

8.3 skipContext

该方法主要用来判断下一个 ChannelHandler 是否具有 mask 代表的事件的响应资格。

    private static boolean skipContext(
            AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {

        return (ctx.executionMask & (onlyMask | mask)) == 0 ||
                (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
    }

以上就是 skipContext 方法的核心逻辑,这里表达的核心语义是:

这里大部分同学可能会对 ctx.executor() == currentExecutor 这个条件感到很疑惑。加上这个条件,其实对我们这里的核心语义并没有多大影响。

public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {

    @Skip
    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        ctx.bind(localAddress, promise);
    }

    @Skip
    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.connect(remoteAddress, localAddress, promise);
    }

    @Skip
    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
        ctx.disconnect(promise);
    }

    @Skip
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
        ctx.close(promise);
    }

    @Skip
    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.deregister(promise);
    }

    @Skip
    @Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        ctx.read();
    }

    @Skip
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
    }

    @Skip
    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

而这里之所以需要加入 ctx.executor() == currentExecutor 条件的判断,是为了防止 HttpContentCompressor 在被指定不同的 executor 情况下无法正确的创建压缩内容,导致的一些异常。但这个不是本文的重点,大家只需要理解这里的核心语义就好,这种特殊情况的特殊处理了解一下就好。

8.4 Outbound事件的传播

关于 Outbound 类事件的传播,笔者在上篇文章《一文搞懂 Netty 发送数据全流程》中已经进行了详细的介绍,本小节就不在赘述。

8.5 ExceptionCaught事件的传播

在最后我们来介绍下异常事件在 pipeline 中的传播,ExceptionCaught 事件和 Inbound 类事件一样都是在 pipeline 中从前往后开始传播。

ExceptionCaught 事件的触发有两种情况:一种是 netty 框架内部产生的异常,这时 netty 会直接在 pipeline 中触发 ExceptionCaught 事件的传播。异常事件会在 pipeline 中从 HeadContext 开始一直向后传播直到 TailContext。

比如 netty 在 read loop 中读取数据时发生异常:

     try {
               ...........

               do {
                          ............               
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));

                          ............
       
                    //客户端NioSocketChannel的pipeline中触发ChannelRead事件
                    pipeline.fireChannelRead(byteBuf);
  
                } while (allocHandle.continueReading());

                         ...........
        }  catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
       } 

这时会 netty 会直接从 pipeline 中触发 ExceptionCaught 事件的传播。

       private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
                RecvByteBufAllocator.Handle allocHandle) {
             
                    .............

            pipeline.fireExceptionCaught(cause);

                    .............

        }

和 Inbound 类事件一样,ExceptionCaught 事件会在 pipeline 中从 HeadContext 开始一直向后传播。

    @Override
    public final ChannelPipeline fireExceptionCaught(Throwable cause) {
        AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
        return this;
    }

第二种触发 ExceptionCaught 事件的情况是,当 Inbound 类事件或者 flush 事件在 pipeline 中传播的过程中,在某个 ChannelHandler 中的事件回调方法处理中发生异常,这时该 ChannelHandler 的 exceptionCaught 方法会被回调。用户可以在这里处理异常事件,并决定是否通过 ctx.fireExceptionCaught(cause) 继续向后传播异常事件。

比如我们在 ChannelInboundHandler 中的 ChannelRead 回调中处理业务请求时发生异常,就会触发该 ChannelInboundHandler 的 exceptionCaught 方法。

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                invokeExceptionCaught(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }
    private void invokeExceptionCaught(final Throwable cause) {
        if (invokeHandler()) {
            try {
                //触发channelHandler的exceptionCaught回调
                handler().exceptionCaught(this, cause);
            } catch (Throwable error) {
                  ........
        } else {
                  ........
        }
    }

再比如:当我们在 ChannelOutboundHandler 中的 flush 回调中处理业务结果发送的时候发生异常,也会触发该 ChannelOutboundHandler 的 exceptionCaught 方法。

   private void invokeFlush0() {
        try {
            ((ChannelOutboundHandler) handler()).flush(this);
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    }

我们可以在 ChannelHandler 的 exceptionCaught 回调中进行异常处理,并决定是否通过 ctx.fireExceptionCaught(cause) 继续向后传播异常事件。

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {

        .........异常处理.......

        ctx.fireExceptionCaught(cause);
    }
    @Override
    public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
        invokeExceptionCaught(findContextInbound(MASK_EXCEPTION_CAUGHT), cause);
        return this;
    }

   static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
        ObjectUtil.checkNotNull(cause, "cause");
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeExceptionCaught(cause);
        } else {
            try {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeExceptionCaught(cause);
                    }
                });
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to submit an exceptionCaught() event.", t);
                    logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
                }
            }
        }
    }

8.6 ExceptionCaught 事件和 Inbound 类事件的区别

虽然 ExceptionCaught 事件和 Inbound 类事件在传播方向都是在 pipeline 中从前向后传播。但是大家这里注意区分这两个事件的区别。

在 Inbound 类事件传播过程中是会查找下一个具有事件响应资格的 ChannelInboundHandler 。遇到 ChannelOutboundHandler 会直接跳过。

而 ExceptionCaught 事件无论是在哪种类型的 channelHandler 中触发的,都会从当前异常 ChannelHandler 开始一直向后传播,ChannelInboundHandler 可以响应该异常事件,ChannelOutboundHandler 也可以响应该异常事件。

由于无论异常是在 ChannelInboundHandler 中产生的还是在 ChannelOutboundHandler 中产生的, exceptionCaught 事件都会在 pipeline 中是从前向后传播,并且不关心 ChannelHandler 的类型。所以我们一般将负责统一异常处理的 ChannelHandler 放在 pipeline 的最后,这样它对于 inbound 类异常和 outbound 类异常均可以捕获得到。

异常事件的传播.png

总结

本文涉及到的内容比较多,通过 netty 异步事件在 pipeline 中的编排和传播这条主线,我们相当于将之前的文章内容重新又回顾总结了一遍。

本文中我们详细介绍了 pipeline 的组成结构,它主要是由 ChannelHandlerContext 类型节点组成的双向链表。ChannelHandlerContext 包含了 ChannelHandler 执行上下文的信息,从而可以使 ChannelHandler 只关注于 IO 事件的处理,遵循了单一原则和开闭原则。

此外 pipeline 结构中还包含了一个任务链表,用来存放执行 ChannelHandler 中的 handlerAdded 回调和 handlerRemoved 回调。pipeline 还持有了所属 channel 的引用。

我们还详细介绍了 Netty 中异步事件的分类:Inbound 类事件,Outbound 类事件,ExceptionCaught 事件。并详细介绍了每种分类下的所有事件的触发时机和在 pipeline 中的传播路径。

最后介绍了 pipeline 的结构以及创建和初始化过程,以及对 pipeline 相关操作的源码实现。

中间我们又穿插介绍了 ChannelHanderContext 的结构,介绍了 ChannelHandlerContext 具体封装了哪些关于 ChannelHandler 执行的上下文信息。

本文的内容到这里就结束了,感谢大家的观看,我们下篇文章见~~~

上一篇下一篇

猜你喜欢

热点阅读