JavaNIO(七)ChannelPipeline和Channe

2019-07-28  本文已影响0人  清雨季

一 ChannelHandlerContext和ChannelHandler

在ChannelPipeline这一层,ChannelPipeline会把收到的事件交给ChannelHandler处理。

在外部,我们可以向ChannelPipeline中添加ChannelHandler处理器,但是添加到Pipeline中后,会被包装成一个ChannelHandlerContext对象:

    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
            newCtx = newContext(group, filterName(name, handler), handler);
            addLast0(newCtx);
            //...
    }

ChannelHandlerContext主要是做了下面两件事:

在ChannelPipeline中,把所有的ChannelHandlerContext放在一个链表中,本身记录了链表的头尾节点:

    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;

对于ChannelHandler中每一个事件相关的方法,ChannelHandlerContext中都有一个对应的fireXXX方法,这个方法用于向下一个ChannelHandler传递事件

二 inbound事件和outbound事件

Pipeline层把所有的事件分为两种:

 *                                                 I/O Request
 *                                            via {@link Channel} or
 *                                        {@link ChannelHandlerContext}
 *                                                      |
 *  +---------------------------------------------------+---------------+
 *  |                           ChannelPipeline         |               |
 *  |                                                  \|/              |
 *  |    +---------------------+            +-----------+----------+    |
 *  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  .               |
 *  |               .                                   .               |
 *  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
 *  |        [ method call]                       [method call]         |
 *  |               .                                   .               |
 *  |               .                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  +---------------+-----------------------------------+---------------+
 *                  |                                  \|/
 *  +---------------+-----------------------------------+---------------+
 *  |               |                                   |               |
 *  |       [ Socket.read() ]                    [ Socket.write() ]     |
 *  |                                                                   |
 *  |  Netty Internal I/O Threads (Transport Implementation)            |
 *  +-------------------------------------------------------------------+

二 ChannelPipeline的初始化过程

ChannelPipeline使用双向链表维护了ChannlHandler的信息,所以初始化中最关键的,是初始化了双向队列的头尾节点:

    protected DefaultChannelPipeline(Channel channel) {
        //....
        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

这里TailContext和HeadContext都是ChannelHandler的实现类
tail节点基本是空实现,或者仅仅打印一些日志:

    protected void onUnhandledInboundMessage(Object msg) {
        try {
            logger.debug(
                    "Discarded inbound message {} that reached at the tail of the pipeline. " +
                            "Please check your pipeline configuration.", msg);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

而这个head节点,也就是HeadContext对象,就做了很多事了。

例如,上一节说的,当向Channel中写数据时,数据会先被ChannelPipeline处理,然后再交给Unsafe,把数据写出去。 其实就是在HeadContext中,把数据交给Unsafe写出去的:

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            unsafe.write(msg, promise);
        }

由于写数据是outbound事件,所以会由tail节点开始处理,数据到达head节点时已经处理完了。

三 处理一个事件的流程

以channelRead事件为例:

在ChannelPipeline#fireChannelRead方法中,首先调用head节点的channelRead方法进行处理:

    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }

AbstractChannelHandlerContext.invokeChannelRead方法的第一个参数是ChannelHandler类型的,方法内部会执行这个ChannelHandler对象的channelRead方法:

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(() -> next.invokeChannelRead(m));
        }
    }

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }

这段代码的逻辑就是判断是否配置了专门的线程池,如果配置了,就用专门的线程池去调用。

上面说过,head节点其实就是一个HeadContext对象,因此具体执行到的就是HeadContext#channelRead方法:

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ctx.fireChannelRead(msg);
        }

方法内只是简单的调用了ctx.fireChannelRead(msg); 这行代码的作用就是向下一个ChannelHanderl传递事件。因此,如果在想在某个ChannelHandler中拦截事件,那么只需要不加上这一行代码即可。

然后这个ctx.fireChannelRead方法中,会找到下一个ChannelHandler:

    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
        return this;
    }

找到后又调用invokeChannelRead触发下一个ChannelHandler的channelRead方法。这就回到了最开始的地方。

一直这样循环处理,直到所有的ChannelHandler处理完毕。

上一篇下一篇

猜你喜欢

热点阅读