Netty技术程序员深入浅出Netty源码剖析

Netty ChannelHandler与ChannelPipe

2018-07-24  本文已影响8人  良辰美景TT

ChannelHandler

  ChannelHandler基本上是我们第一次接触Netty就会碰到的对象,我们自定义的各种ChannelHandler主要用于处理我们系统的各种业务逻辑,比如发生了active事件后的处理逻辑,发生了读事件的处理逻辑,下面先来看一下ChannelHandler的类继承图:


image.png

  ChannelHandler被分为两部分,分别为ChannelOutboundHandler与ChannelInboundHandler。其中ChannelInboundHandler提供了一些方法再接收数据或Channel状态改变时被调用,ChannelOutboundHandler则提供了与网络I/O相关的方法。
同时Netty也提供了相应的Adapter,主要是为了我们编码的方便,我们可以通过继承Adapter,这样ChannelHandler里便只需要关注需要重写的方法。而不是实现所有接口的方法。

StringDecoder源码

  我们来关注一下StringDecoder这个类,StringDecoder用于对读入的数据根据指定的字符编码进行转换。StringDecoder继承MessageToMessageDecoder,而MessageToMessageDecoder继承ChannelInboundHandlerAdapter。StringDecoder便是一个典型的ChannelInboundHandler啦,先来看看MessageToMessageDecoder里都有那些内容,源码如下:

public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter {

//matcher用于检验是否对msg进行Decoder
    private final TypeParameterMatcher matcher;

    /**
     * Create a new instance which will try to detect the types to match out of the type parameter of the class.
     */
    protected MessageToMessageDecoder() {
        matcher = TypeParameterMatcher.find(this, MessageToMessageDecoder.class, "I");
    }

    /**
     * Create a new instance
     *
     * @param inboundMessageType    The type of messages to match and so decode
     */
    protected MessageToMessageDecoder(Class<? extends I> inboundMessageType) {
        matcher = TypeParameterMatcher.get(inboundMessageType);
    }

    /**
     * Returns {@code true} if the given message should be handled. If {@code false} it will be passed to the next
     * {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     */
    public boolean acceptInboundMessage(Object msg) throws Exception {
        return matcher.match(msg);
    }

//可以看出MessageToMessageDecoder只对 channelRead进行了重写,这就是Adapter提供的好处
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//这里的out是个list对象
        CodecOutputList out = CodecOutputList.newInstance();
        try {
//acceptInboundMessage判断是否对msg进行解析
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {
//这是个留给子类实现的方法啦, 也就是我们的StringDecoder里会实现的方法啦
                    decode(ctx, cast, out);
                } finally {
                    ReferenceCountUtil.release(cast);
                }
            } else {
                out.add(msg);
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            int size = out.size();
//对out里的对象触发fireChannelRead,让其它的channelhandler处理
            for (int i = 0; i < size; i ++) {
                ctx.fireChannelRead(out.getUnsafe(i));
            }
            out.recycle();
        }
    }


    protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
}

MessageToMessageDecoder方法只做了两件事:1:判断当前个对象是否需要调用decode方法,2:将decode结果的对象调用fireChannelRead方法交给其它的ChannelHandler处理。StringDecoder类里的方法就更简单了,源码如下:

@Sharable
public class StringDecoder extends MessageToMessageDecoder<ByteBuf> {

    // TODO Use CharsetDecoder instead.
//传入字节码
    private final Charset charset;

    /**
     * Creates a new instance with the current system character set.
     */
    public StringDecoder() {
        this(Charset.defaultCharset());
    }

    /**
     * Creates a new instance with the specified character set.
     */
    public StringDecoder(Charset charset) {
        if (charset == null) {
            throw new NullPointerException("charset");
        }
        this.charset = charset;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
//这里对msg进行处理
        out.add(msg.toString(charset));
    }
}

StringEncoder源码

  我们再来关注一下StringEncoder的处理流程,StringEncoder用于对需要写的数据进行字符编码,StringEncoder继承自MessageToMessageEncoder,而MessageToMessageEncoder又继承ChannelOutboundHandlerAdapter。下面是MessageToMessageEncoder的源码:

public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter {

    private final TypeParameterMatcher matcher;

    /**
     * Create a new instance which will try to detect the types to match out of the type parameter of the class.
     */
    protected MessageToMessageEncoder() {
        matcher = TypeParameterMatcher.find(this, MessageToMessageEncoder.class, "I");
    }

    /**
     * Create a new instance
     *
     * @param outboundMessageType   The type of messages to match and so encode
     */
    protected MessageToMessageEncoder(Class<? extends I> outboundMessageType) {
        matcher = TypeParameterMatcher.get(outboundMessageType);
    }

    /**
     * Returns {@code true} if the given message should be handled. If {@code false} it will be passed to the next
     * {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
     */
    public boolean acceptOutboundMessage(Object msg) throws Exception {
        return matcher.match(msg);
    }

//只需要关注这个方法啦,这里会对面要写的数据进行encode
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        CodecOutputList out = null;
        try {
//跟上面decode一样,需要验证msg能不能处理
            if (acceptOutboundMessage(msg)) {
                out = CodecOutputList.newInstance();
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {
//具体的encode留给子类处理
                    encode(ctx, cast, out);
                } finally {
                    ReferenceCountUtil.release(cast);
                }

                if (out.isEmpty()) {
                    out.recycle();
                    out = null;
                    throw new EncoderException(
                            StringUtil.simpleClassName(this) + " must produce at least one message.");
                }
            } else {
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new EncoderException(t);
        } finally {
//out不为空的话,就会调用ctx的witer方法触发写数据的逻辑啦
            if (out != null) {
                final int sizeMinusOne = out.size() - 1;
                if (sizeMinusOne == 0) {
                    ctx.write(out.get(0), promise);
                } else if (sizeMinusOne > 0) {
                    // Check if we can use a voidPromise for our extra writes to reduce GC-Pressure
                    // See https://github.com/netty/netty/issues/2525
                    ChannelPromise voidPromise = ctx.voidPromise();
                    boolean isVoidPromise = promise == voidPromise;
                    for (int i = 0; i < sizeMinusOne; i ++) {
                        ChannelPromise p;
                        if (isVoidPromise) {
                            p = voidPromise;
                        } else {
                            p = ctx.newPromise();
                        }
                        ctx.write(out.getUnsafe(i), p);
                    }
                    ctx.write(out.getUnsafe(sizeMinusOne), promise);
                }
                out.recycle();
            }
        }
    }

   
    protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
}

  MessageToMessageEncoder类里也只做了三件事:1:判断当前的对象是否需要进行encoder。2:调用子类encoder方法对对象进行encoder。3:将encoder好了的对象调用发送逻辑。下面是StringEncoder源码:

public class StringEncoder extends MessageToMessageEncoder<CharSequence> {

    // TODO Use CharsetEncoder instead.
    private final Charset charset;

    /**
     * Creates a new instance with the current system character set.
     */
    public StringEncoder() {
        this(Charset.defaultCharset());
    }

    /**
     * Creates a new instance with the specified character set.
     */
    public StringEncoder(Charset charset) {
        if (charset == null) {
            throw new NullPointerException("charset");
        }
        this.charset = charset;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
        if (msg.length() == 0) {
            return;
        }
//根据 charset将String转成ByteBuf对象
        out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));
    }
}

ChannelPipeline

  ChannelPipeline用于组织ChannelHandlerContext(内部含有ChannelHandler),在Netty里采用的是双端链表的方式来管理ChannelHandlerContext。在ChannelPipeline里提供了各种对双端链表处理的方法,同时也提供了各种触发ChannelHandlerContext的方法,比如:fireChannelActive方法,下面是部分源码:

public class DefaultChannelPipeline implements ChannelPipeline {

//双端链表的head对象
    final AbstractChannelHandlerContext head;
//双端链表的tail对象
    final AbstractChannelHandlerContext tail;
//持用的channel对象
    private final Channel channel;

    private final ChannelFuture succeededFuture;
    private final VoidChannelPromise voidPromise;
    private final boolean touch = ResourceLeakDetector.isEnabled();

    private Map<EventExecutorGroup, EventExecutor> childExecutors;
    private volatile MessageSizeEstimator.Handle estimatorHandle;
    private boolean firstRegistration = true;

//的链表的未位增加一个ChannelHandler 
    public final ChannelPipeline addLast(ChannelHandler handler) {
        return addLast(null, handler);
    }

//的链表的未位增加一个ChannelHandler ,需要传入这个ChannelHandler的名称
    @Override
    public final ChannelPipeline addLast(String name, ChannelHandler handler) {
        return addLast(null, name, handler);
    }

//最终会调用到这个方法来对channelHandler处理
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
//这是一个同步方法,需要锁住这个pipeline对象
        synchronized (this) {
//参数合法性验证
            checkMultiplicity(handler);
//这里会将ChannelHandler 包装成ChannelHandlerContext对象,这也就是为什么双端链表里存的是ChannelHandlerContext啦其中filterName会对为null的name生成一个名称
            newCtx = newContext(group, filterName(name, handler), handler);
//这里才是具体处理链表的方法啦
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.

            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
//下面的方法是对链表进行操作的代码
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

//pipeline里提供了类似fireChannelActive方法,这些方法最络会调用到channelHandler对应的方法上
    @Override
    public final ChannelPipeline fireChannelActive() {
        AbstractChannelHandlerContext.invokeChannelActive(head);
        return this;
    }
}

ChannelHandlerContext

  ChannelHandlerContext对于连接ChannelHandler与ChannelPipeline。
ChannelHandlerContext内部持有ChannelHandler对象,同时又是ChannelPipeline链表里的节点,串起了ChannelPipeline的整个逻辑,下面来看看ChannelHandlerContext最重要的类AbstractChannelHandlerContext源码:

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {
//当前ChannelHandlerContext指向的下一个ChannelHandlerContext
    volatile AbstractChannelHandlerContext next;
//当前ChannelHandlerContext指向的前一个ChannelHandlerContext
    volatile AbstractChannelHandlerContext prev;
//用于标识channelHanlder是否为inbound
    private final boolean inbound;
//用于标识channelHanlder是否为outbound
    private final boolean outbound;
//同时也持胡pipeline对象
    private final DefaultChannelPipeline pipeline;
//channelHandler取的名称
    private final String name;
//是否需要排序
    private final boolean ordered;

//构造方法如下
    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                                  boolean inbound, boolean outbound) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.inbound = inbound;
        this.outbound = outbound;
        // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

//这个方法是个static方法,用于给pipeline对象调用,
    static void invokeChannelActive(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
//会触发ChannelHandlerContext的invokeChannelActive方法
            next.invokeChannelActive();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelActive();
                }
            });
        }
    }

//active的逻辑会调用到这个方法里
    private void invokeChannelActive() {
//确认当前channelhandler的状态 
        if (invokeHandler()) {
            try {
//最络会调用到channelhandler的channelActive方法,其中handler()方法是留给子类实现的可以看DefaultChannelHandlerContext源码部分
                ((ChannelInboundHandler) handler()).channelActive(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelActive();
        }
    }

}

DefaultChannelHandlerContext源码就很简单了,提供了一个handler方法用于得到当前的ChannelHandler和判断当前ChannelHandler的类型。代码如下:

package io.netty.channel;

import io.netty.util.concurrent.EventExecutor;

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }

    private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }

    private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }
}

ChannelHandlerContext里作为ChannelPipeline的链表节点,决定着事件是否进行向下流转,如果想让事件向下流转,只需要通过ChannelHandlerContext调用相应的fire方法就行了

上一篇下一篇

猜你喜欢

热点阅读