Netty源码分析系列

Netty源码分析系列--13.ReplayingDecoder

2018-11-17  本文已影响16人  ted005

ReplayingDecoder的原理


public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder

继承基类ByteToMessageDecoder的方式

下面是一个用来解码带有长整型(Long)数据头head的解码器:

public class LongHeaderFrameDecoder extends ByteToMessageDecoder {
  
    @Override
    protected void decode(ChannelHandlerContext ctx,
                         ByteBuf buf, List<Object> out) throws Exception {

        //总字节数<8,不够Long的长度,返回
        if (buf.readableBytes() < 8) {
          return;
        }
        
        buf.markReaderIndex();
        //读取head的值,例如6,说明body的长度是6个字节
        int length = buf.readLong();
        
        //body的总字节数不够6,返回
        if (buf.readableBytes() < length) {
          buf.resetReaderIndex();
          return;
        }
        
        //读取6个长度的body
        out.add(buf.readBytes(length));
    }
}

从以上代码可以看出,在decode方法中需要对数据的长度做判断,依据ByteBufreaderIndex来获取真实数据,逻辑比较复杂。

继承基类ReplayingDecoder的方式

如果以上的例子选择继承ReplayingDecoder,那逻辑会非常简单。由于不存在状态管理,所以泛型使用Void

public class LongHeaderFrameDecoder extends ReplayingDecoder<Void> {
  
    @Override
    protected void decode(ChannelHandlerContext ctx,
                         ByteBuf buf, List<Object> out) throws Exception {
        // 读取head的值,例如6,说明body的长度是6个字节
        int length = buf.readLong();   
        // 读取6个长度的body
        out.add(buf.readBytes(length));
    }
}

状态管理和checkpoint方法

状态可以使用枚举Enum来表示,如:

public enum MyDecoderState {
     READ_HEAD,
     READ_BODY;
}

当调用checkpoint(MyDecoderState state)时,ReplayingDecoder会将当前readerIndex赋值给int类型的成员变量checkpoint,在后续数据读取过程中方便重置。

protected void checkpoint(S state) {
    checkpoint();
    state(state);
}

protected void checkpoint() {
    checkpoint = internalBuffer().readerIndex();
}

使用状态管理后的LongHeaderFrameDecoder:

public class LongHeaderFrameDecoder
        extends ReplayingDecoder<MyDecoderState> {
     // HEAD的长度
     private int length;
    
     public LongHeaderFrameDecoder() {
       // 初始状态是读取头部HEAD
       super(MyDecoderState.READ_LENGTH);
     }
    
      @Override
     protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf, List<Object> out) throws Exception {
       switch (state()) {
           case READ_HEAD:
             length = buf.readLong();
             checkpoint(MyDecoderState.READ_BODY);
           case READ_BODY:
             ByteBuf frame = buf.readBytes(length);
             checkpoint(MyDecoderState.READ_BODY);
             out.add(frame);
             break;
           default:
             throw new Error("Shouldn't reach here.");
           }
     }
}

上一篇下一篇

猜你喜欢

热点阅读