(6) ChannelPipeline and ChannelHandler

2021-04-26  guessguess

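I originally wanted to look directly at how Channel events propagate, but before doing so it is worth sorting out the structure of ChannelPipeline and ChannelHandlerContext, which makes the later material easier to follow. An earlier post, on how Netty adds a custom ChannelHandler to the ChannelPipeline, only touched on this basic structure briefly.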

Basic concepts

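Every Channel has its own ChannelPipeline, which manages the ChannelHandlerContext objects.
The ChannelPipeline holds a doubly linked list of ChannelHandlerContext objects for this purpose.
Each ChannelHandlerContext stores its links to the neighbouring ChannelHandlerContext objects, together with its ChannelHandler.

Put simply, a ChannelHandlerContext exists to hold a ChannelHandler.
A ChannelPipeline, in turn, controls the order in which the ChannelHandlers run (by controlling the order of the ChannelHandlerContext list).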
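To make the relationship concrete, here is a minimal sketch of a pipeline that owns a doubly linked list of contexts, each wrapping one handler. MiniHandler, MiniContext and MiniPipeline are hypothetical stand-ins written for this post, not Netty classes, and the real implementation does considerably more (naming checks, executors, lifecycle callbacks).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ChannelHandler; not a Netty type.
interface MiniHandler {
    String describe();
}

// Plays the role of ChannelHandlerContext: one handler plus links to its neighbours.
final class MiniContext {
    final String name;
    final MiniHandler handler;
    MiniContext prev;
    MiniContext next;

    MiniContext(String name, MiniHandler handler) {
        this.name = name;
        this.handler = handler;
    }
}

// Plays the role of ChannelPipeline: owns the doubly linked list of contexts.
final class MiniPipeline {
    // Sentinel nodes, so insertion never has to deal with an empty list.
    private final MiniContext head = new MiniContext("head", null);
    private final MiniContext tail = new MiniContext("tail", null);

    MiniPipeline() {
        head.next = tail;
        tail.prev = head;
    }

    // Insert a new context just before the tail sentinel.
    MiniPipeline addLast(String name, MiniHandler handler) {
        MiniContext ctx = new MiniContext(name, handler);
        MiniContext last = tail.prev;
        ctx.prev = last;
        ctx.next = tail;
        last.next = ctx;
        tail.prev = ctx;
        return this;
    }

    // Insert a new context just after the head sentinel.
    MiniPipeline addFirst(String name, MiniHandler handler) {
        MiniContext ctx = new MiniContext(name, handler);
        MiniContext first = head.next;
        ctx.prev = head;
        ctx.next = first;
        head.next = ctx;
        first.prev = ctx;
        return this;
    }

    // Walk the list from head to tail; the list order is the execution order.
    List<String> names() {
        List<String> names = new ArrayList<>();
        for (MiniContext ctx = head.next; ctx != tail; ctx = ctx.next) {
            names.add(ctx.name);
        }
        return names;
    }
}

final class MiniPipelineDemo {
    public static void main(String[] args) {
        MiniPipeline pipeline = new MiniPipeline()
                .addLast("decoder", () -> "decodes bytes")
                .addLast("business", () -> "business logic")
                .addFirst("logger", () -> "logs traffic");
        System.out.println(pipeline.names()); // [logger, decoder, business]
    }
}
```

The point of the sketch is only that reordering the contexts is what reorders the handlers; the handlers themselves know nothing about their position.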

ChannelInboundInvoker

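After reading the source for quite a while, I finally understood why every method here is named fire-something.
In a sense, inbound operations are all passive.
Take fireChannelRegistered: once the Channel has completed registration (registered with the selector and bound to its eventLoop), there is follow-up work to do, and firing the event is how that follow-up work gets triggered.
So these methods can all be understood as things done after some operation has completed (a bit like callbacks).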

public interface ChannelInboundInvoker {
    ChannelInboundInvoker fireChannelRegistered();
    ChannelInboundInvoker fireChannelUnregistered();
    ChannelInboundInvoker fireChannelActive();
    ChannelInboundInvoker fireChannelInactive();
    ChannelInboundInvoker fireExceptionCaught(Throwable cause);
    ChannelInboundInvoker fireUserEventTriggered(Object event);
    ChannelInboundInvoker fireChannelRead(Object msg);
    ChannelInboundInvoker fireChannelReadComplete();
    ChannelInboundInvoker fireChannelWritabilityChanged();
}

ChannelOutboundInvoker

Outbound operations are all actively initiated, for example connecting to a server.

public interface ChannelOutboundInvoker {
    ChannelFuture bind(SocketAddress localAddress);
    ChannelFuture connect(SocketAddress remoteAddress);
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
    ChannelFuture disconnect();
    ChannelFuture close();
    ChannelFuture deregister();
    ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
    ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
    ChannelFuture disconnect(ChannelPromise promise);
    ChannelFuture close(ChannelPromise promise);
    ChannelFuture deregister(ChannelPromise promise);
    ChannelOutboundInvoker read();
    ChannelFuture write(Object msg);
    ChannelFuture write(Object msg, ChannelPromise promise);
    ChannelOutboundInvoker flush();
    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
    ChannelFuture writeAndFlush(Object msg);
    ChannelPromise newPromise();
    ChannelProgressivePromise newProgressivePromise();
    ChannelFuture newSucceededFuture();
    ChannelFuture newFailedFuture(Throwable cause);
    ChannelPromise voidPromise();
}

ChannelPipeline

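Judging from this interface, ChannelPipeline provides the following:
1. Managing the chain of ChannelHandlerContext objects.
2. Inbound operations, which carry out the follow-up work for events that have occurred.
3. Outbound operations, which actively initiate something.
4. It overrides the inherited inbound methods (and flush()) so that they return ChannelPipeline (ChannelPipeline is a sub-interface of ChannelInboundInvoker).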

public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {
    ChannelPipeline addFirst(String name, ChannelHandler handler);
    ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
    ChannelPipeline addLast(String name, ChannelHandler handler);
    ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
    ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
    ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
    ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
    ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
    ChannelPipeline addFirst(ChannelHandler... handlers);
    ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);
    ChannelPipeline addLast(ChannelHandler... handlers);
    ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
    ChannelPipeline remove(ChannelHandler handler);
    ChannelHandler remove(String name);
    <T extends ChannelHandler> T remove(Class<T> handlerType);
    ChannelHandler removeFirst();
    ChannelHandler removeLast();
    ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
    ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);
    <T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName,
                                         ChannelHandler newHandler);
    ChannelHandler first();
    ChannelHandlerContext firstContext();
    ChannelHandler last();
    ChannelHandlerContext lastContext();
    ChannelHandler get(String name);
    <T extends ChannelHandler> T get(Class<T> handlerType);
    ChannelHandlerContext context(ChannelHandler handler);
    ChannelHandlerContext context(String name);
    ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType);
    Channel channel();
    List<String> names();
    Map<String, ChannelHandler> toMap();

    @Override
    ChannelPipeline fireChannelRegistered();

    @Override
    ChannelPipeline fireChannelUnregistered();

    @Override
    ChannelPipeline fireChannelActive();

    @Override
    ChannelPipeline fireChannelInactive();

    @Override
    ChannelPipeline fireExceptionCaught(Throwable cause);

    @Override
    ChannelPipeline fireUserEventTriggered(Object event);

    @Override
    ChannelPipeline fireChannelRead(Object msg);

    @Override
    ChannelPipeline fireChannelReadComplete();

    @Override
    ChannelPipeline fireChannelWritabilityChanged();

    @Override
    ChannelPipeline flush();
}
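The overrides at the end of the interface rely on Java's covariant return types. The sketch below shows why that matters for call chaining, using hypothetical interfaces (EventInvoker, ChainPipeline) invented for illustration rather than the Netty ones.

```java
// Hypothetical miniature of the ChannelInboundInvoker / ChannelPipeline relationship.
interface EventInvoker {
    EventInvoker fireEvent(String event);
}

interface ChainPipeline extends EventInvoker {
    ChainPipeline addLast(String handlerName);

    // Covariant return type: narrows EventInvoker to ChainPipeline.
    @Override
    ChainPipeline fireEvent(String event);
}

final class SimpleChainPipeline implements ChainPipeline {
    private final StringBuilder log = new StringBuilder();

    @Override
    public ChainPipeline addLast(String handlerName) {
        log.append("add:").append(handlerName).append(';');
        return this;
    }

    @Override
    public ChainPipeline fireEvent(String event) {
        log.append("fire:").append(event).append(';');
        return this;
    }

    String log() {
        return log.toString();
    }
}

final class CovariantReturnDemo {
    static String run() {
        SimpleChainPipeline pipeline = new SimpleChainPipeline();
        // Without the override, fireEvent would return EventInvoker and
        // the addLast call chained after it would not compile.
        pipeline.fireEvent("registered").addLast("decoder").fireEvent("active");
        return pipeline.log();
    }

    public static void main(String[] args) {
        System.out.println(run()); // fire:registered;add:decoder;fire:active;
    }
}
```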

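Basic structure of DefaultChannelPipeline

When a Channel is instantiated it uses DefaultChannelPipeline by default, so this is the only class we need to focus on.
Its structure is shown below. Before going into the concrete implementation of DefaultChannelPipeline, we first need to look at the basic structure of ChannelHandlerContext.

[Figure: class structure of DefaultChannelPipeline]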

ChannelHandlerContext

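ChannelHandlerContext is also a sub-interface of both ChannelInboundInvoker and ChannelOutboundInvoker.
Judging from the structure of the interface, it provides the following:
1. A ChannelHandlerContext is also bound to a Channel.
2. It has its own executor as well (the EventLoop thread dedicated to running it).
3. It supports both outbound and inbound operations, so it can react passively and also initiate things actively.
4. It can return the ChannelPipeline that it belongs to.
5. It overrides the inbound methods (plus read() and flush()) so that they return ChannelHandlerContext (ChannelHandlerContext is a sub-interface of ChannelInboundInvoker).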

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
    Channel channel();
    EventExecutor executor();
    String name();
    ChannelHandler handler();
    boolean isRemoved();

    @Override
    ChannelHandlerContext fireChannelRegistered();

    @Override
    ChannelHandlerContext fireChannelUnregistered();

    @Override
    ChannelHandlerContext fireChannelActive();
    @Override
    ChannelHandlerContext fireChannelInactive();
    @Override
    ChannelHandlerContext fireExceptionCaught(Throwable cause);
    @Override
    ChannelHandlerContext fireUserEventTriggered(Object evt);
    @Override
    ChannelHandlerContext fireChannelRead(Object msg);
    @Override
    ChannelHandlerContext fireChannelReadComplete();
    @Override
    ChannelHandlerContext fireChannelWritabilityChanged();
    @Override
    ChannelHandlerContext read();
    @Override
    ChannelHandlerContext flush();
    ChannelPipeline pipeline();
    ByteBufAllocator alloc();

    /**
     * @deprecated Use {@link Channel#attr(AttributeKey)}
     */
    @Deprecated
    @Override
    <T> Attribute<T> attr(AttributeKey<T> key);

    /**
     * @deprecated Use {@link Channel#hasAttr(AttributeKey)}
     */
    @Deprecated
    @Override
    <T> boolean hasAttr(AttributeKey<T> key);
}

AbstractChannelHandlerContext

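AbstractChannelHandlerContext is the abstract base implementation of ChannelHandlerContext, and most of the methods are actually implemented here, which makes this class important.
The main things this class does are:
1. As its member variables show, it maintains the links to the previous and next ctx.
2. It tracks its own state: add pending, add complete, remove complete, or init.
3. It sets its inbound and outbound flags according to the ChannelHandler that the ctx wraps.
4. It is bound to an executor (EventLoop).
5. It implements most of the operations. In the fireChannelRegistered example below, the call simply runs invokeChannelRegistered() on the executor bound to the ctx that is passed in: directly if the caller is already on that event loop, otherwise as a submitted task.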

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {
    // Links to the neighbouring contexts, since every ChannelHandlerContext sits in a linked list
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;
    // The states of a ctx: add pending, add complete, remove complete, and init
    private static final int ADD_PENDING = 1;
    private static final int ADD_COMPLETE = 2;
    private static final int REMOVE_COMPLETE = 3;
    private static final int INIT = 0;
    // Inbound and/or outbound, decided by which handler interfaces the ChannelHandler implements
    private final boolean inbound;
    private final boolean outbound;
    // The ChannelPipeline this ctx belongs to
    private final DefaultChannelPipeline pipeline;
    // The executor that runs this ctx's handler methods
    final EventExecutor executor;
    // The state starts out as INIT
    private volatile int handlerState = INIT;

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                                  boolean inbound, boolean outbound) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.inbound = inbound;
        this.outbound = outbound;
        // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

    @Override
    public ChannelHandlerContext fireChannelRegistered() {
        invokeChannelRegistered(findContextInbound());
        return this;
    }

    static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
    }
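The "run inline if already on the event loop, otherwise hand off" pattern in invokeChannelRegistered can be tried out without Netty. The following is a rough sketch using a plain single-threaded executor; TinyEventLoop and InvokeOnLoopDemo are hypothetical names, and the real EventExecutor is far more involved.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical single-threaded "event loop"; not a Netty class.
final class TinyEventLoop {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(task -> {
        Thread thread = new Thread(task, "tiny-event-loop");
        loopThread = thread; // remember the loop thread so inEventLoop() can compare against it
        return thread;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    void execute(Runnable task) {
        executor.execute(task);
    }

    void shutdown() {
        executor.shutdown();
    }
}

final class InvokeOnLoopDemo {
    // Same shape as invokeChannelRegistered: run inline on the loop thread, otherwise hand off.
    static void invoke(TinyEventLoop loop, Runnable callback) {
        if (loop.inEventLoop()) {
            callback.run();
        } else {
            loop.execute(callback);
        }
    }

    static List<String> trace() {
        TinyEventLoop loop = new TinyEventLoop();
        List<String> trace = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        // Called from the caller's thread, so the callback is handed off to the loop thread.
        invoke(loop, () -> {
            trace.add("outer on " + Thread.currentThread().getName());
            // Already on the loop thread, so this nested callback runs inline.
            invoke(loop, () -> trace.add("inner"));
            trace.add("outer end");
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
        return new ArrayList<>(trace);
    }

    public static void main(String[] args) {
        System.out.println(trace()); // [outer on tiny-event-loop, inner, outer end]
    }
}
```

Because "inner" lands between the two "outer" entries, the nested call clearly ran inline instead of being queued behind the current task.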

Basic implementation and structure of DefaultChannelHandlerContext

Judging from the code, it merely adds two methods that decide which values to pass for inbound and outbound.

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }

    private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }

    private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }
}
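To see what those two flags are good for, here is a small sketch of how an inbound event might skip contexts whose handler is outbound-only. The marker interfaces and the findNextInbound method are hypothetical; I am assuming that the findContextInbound() call seen earlier works along these lines, which the excerpt above does not show.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical marker interfaces standing in for ChannelInboundHandler / ChannelOutboundHandler.
interface SketchHandler { }
interface SketchInboundHandler extends SketchHandler { }
interface SketchOutboundHandler extends SketchHandler { }

final class SketchContext {
    final String name;
    final boolean inbound;
    final boolean outbound;
    SketchContext next;

    SketchContext(String name, SketchHandler handler) {
        this.name = name;
        // Same idea as isInbound / isOutbound: the flags come from the handler's type.
        this.inbound = handler instanceof SketchInboundHandler;
        this.outbound = handler instanceof SketchOutboundHandler;
    }

    // Assumed lookup: move forward until the next context flagged as inbound (or the end).
    SketchContext findNextInbound() {
        SketchContext ctx = this;
        do {
            ctx = ctx.next;
        } while (ctx != null && !ctx.inbound);
        return ctx;
    }
}

final class InboundSkipDemo {
    static List<String> inboundPath() {
        SketchContext decoder = new SketchContext("decoder", new SketchInboundHandler() { });
        SketchContext encoder = new SketchContext("encoder", new SketchOutboundHandler() { });
        SketchContext business = new SketchContext("business", new SketchInboundHandler() { });
        decoder.next = encoder;
        encoder.next = business;

        // Follow an inbound event starting at the decoder.
        List<String> visited = new ArrayList<>();
        for (SketchContext ctx = decoder; ctx != null; ctx = ctx.findNextInbound()) {
            visited.add(ctx.name);
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(inboundPath()); // [decoder, business]
    }
}
```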

The class structure is shown in the figure below.

[Figure: class structure of DefaultChannelHandlerContext]

DefaultChannelPipeline

With DefaultChannelHandlerContext covered, we can finally look at the basic structure of DefaultChannelPipeline.

public class DefaultChannelPipeline implements ChannelPipeline {
    // Head and tail of the ctx chain
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    // The channel this DefaultChannelPipeline belongs to
    private final Channel channel;
    // Constructor
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);
        tail = new TailContext(this);
        head = new HeadContext(this);
        head.next = tail;
        tail.prev = head;
    }

    // TailContext is a ChannelHandlerContext with inbound = true.
    // Most of its handler methods have empty bodies.
    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

        TailContext(DefaultChannelPipeline pipeline) {
            // tail is a ctx with inbound = true
            super(pipeline, null, TAIL_NAME, true, false);
            // After construction, also mark the state as add complete
            setAddComplete();
        }

        @Override
        public ChannelHandler handler() {
            // TailContext is itself a ChannelHandler
            return this;
        }

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception { }
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception { }
        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { }
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            ReferenceCountUtil.release(evt);
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            onUnhandledInboundException(cause);
        }
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            onUnhandledInboundMessage(msg);
        }
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
    }

    // HeadContext is a ChannelHandlerContext with outbound = true.
    // It is a little special, though: it implements both ChannelOutboundHandler and ChannelInboundHandler.
    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        private final Unsafe unsafe;

        HeadContext(DefaultChannelPipeline pipeline) {
            // Shows that HeadContext is a ctx with outbound = true
            super(pipeline, null, HEAD_NAME, false, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }

        @Override
        public ChannelHandler handler() {
            return this;
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        }
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        }
        @Override
        public void bind(
                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
                throws Exception {
            unsafe.bind(localAddress, promise);
        }
        @Override
        public void connect(
                ChannelHandlerContext ctx,
                SocketAddress remoteAddress, SocketAddress localAddress,
                ChannelPromise promise) throws Exception {
            unsafe.connect(remoteAddress, localAddress, promise);
        }
        @Override
        public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
            unsafe.disconnect(promise);
        }
        @Override
        public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
            unsafe.close(promise);
        }
        @Override
        public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
            unsafe.deregister(promise);
        }
        @Override
        public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
        }
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            unsafe.write(msg, promise);
        }
        @Override
        public void flush(ChannelHandlerContext ctx) throws Exception {
            unsafe.flush();
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.fireExceptionCaught(cause);
        }
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            invokeHandlerAddedIfNeeded();
            ctx.fireChannelRegistered();
        }
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelUnregistered();
            if (!channel.isOpen()) {
                destroy();
            }
        }
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelActive();
            readIfIsAutoRead();
        }
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelInactive();
        }
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ctx.fireChannelRead(msg);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelReadComplete();
            readIfIsAutoRead();
        }

        private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                channel.read();
            }
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            ctx.fireUserEventTriggered(evt);
        }

        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelWritabilityChanged();
        }
    }

}
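The roles of the two sentinels can be summarised in a toy model: inbound events travel from head towards tail, and whatever reaches tail unhandled is discarded there; outbound operations travel from tail towards head, and head hands them to the transport (the unsafe calls in the listing). EdgeNode and EdgePipeline are hypothetical names, and for brevity every node here takes part in both directions, which is a simplification of the real inbound/outbound split.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical node; passOn says whether it forwards an event to its neighbour.
final class EdgeNode {
    final String name;
    final boolean passOn;
    EdgeNode prev;
    EdgeNode next;

    EdgeNode(String name, boolean passOn) {
        this.name = name;
        this.passOn = passOn;
    }
}

final class EdgePipeline {
    final List<String> trace = new ArrayList<>();
    private final EdgeNode head = new EdgeNode("head", true);
    private final EdgeNode tail = new EdgeNode("tail", true);

    EdgePipeline() {
        head.next = tail;
        tail.prev = head;
    }

    EdgePipeline addLast(String name, boolean passOn) {
        EdgeNode node = new EdgeNode(name, passOn);
        node.prev = tail.prev;
        node.next = tail;
        tail.prev.next = node;
        tail.prev = node;
        return this;
    }

    // Inbound: start after head and move forward; tail discards what nobody consumed.
    void fireRead(String msg) {
        for (EdgeNode n = head.next; n != tail; n = n.next) {
            trace.add(n.name + " read " + msg);
            if (!n.passOn) {
                return;
            }
        }
        trace.add("tail discarded " + msg);
    }

    // Outbound: start before tail and move backward; head passes the message to the transport.
    void write(String msg) {
        for (EdgeNode n = tail.prev; n != head; n = n.prev) {
            trace.add(n.name + " write " + msg);
            if (!n.passOn) {
                return;
            }
        }
        trace.add("head -> transport " + msg);
    }
}

final class EdgePipelineDemo {
    static List<String> run() {
        EdgePipeline pipeline = new EdgePipeline().addLast("a", true).addLast("b", true);
        pipeline.fireRead("ping");
        pipeline.write("pong");
        return pipeline.trace;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```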

ChannelHandler

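Looking at the structure of the ChannelHandler interface:
a ChannelHandler receives its ctx through the handlerAdded and handlerRemoved callbacks, so it can keep a reference to the ctx for itself.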

public interface ChannelHandler {
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
    @Deprecated
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
    @Inherited
    @Documented
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Sharable {
        // no value
    }
}
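The nested Sharable annotation is a plain marker with runtime retention, so it can be detected by reflection. Below is a minimal sketch of such a check using a hypothetical copy of the annotation (SketchSharable) and made-up handler classes; how Netty itself consumes the marker is not shown in the excerpt above.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical copy of the marker annotation's shape; not Netty's own type.
@Inherited
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface SketchSharable { }

// A handler without per-connection state, marked as safe to share.
@SketchSharable
class StatelessLogger { }

// A handler that would keep per-connection state, so it is left unmarked.
class StatefulDecoder { }

// @Inherited means a subclass of an annotated class also counts as annotated.
class FancyLogger extends StatelessLogger { }

final class SharableCheckDemo {
    // RUNTIME retention is what makes this reflective lookup possible.
    static boolean isSharable(Object handler) {
        return handler.getClass().isAnnotationPresent(SketchSharable.class);
    }

    public static void main(String[] args) {
        System.out.println(isSharable(new StatelessLogger())); // true
        System.out.println(isSharable(new StatefulDecoder())); // false
        System.out.println(isSharable(new FancyLogger()));     // true
    }
}
```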