Netty ReplayingDecoder 源码分析与特性解读

2021-01-12  本文已影响0人  嘟嘟碰碰叮叮当当

转自:https://blog.csdn.net/wzq6578702/article/details/78826494

在介绍ReplayingDecoder之前 想看一下它的用法,构建一个服务端和客户端的模型:

服务端:

public class MyServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new MyServerInitializer());
            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        }finally{
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

服务端initializer:

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipline = ch.pipeline();
        pipline.addLast(new MyReplayingDecoder());//使用ReplayingDecoder
        pipline.addLast(new MyLongToByteEncoder());
        pipline.addLast(new MyServerHandler());
    }
}

ServerHandler:

public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress()+" --> "+msg);
        ctx.writeAndFlush(654321L);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

MyReplayingDecoder:

public class MyReplayingDecoder extends ReplayingDecoder<Void> {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MyReplayingDecoder decode invoked!");
        //注意没有判断字节数!!!!
        out.add(in.readLong());
    }
}

MyLongToByteEncoder

public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
        System.out.println("encode invoked");
        System.out.println(msg);
        out.writeLong(msg);
    }
}

客户端:

public class Myclient {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new MyClientIniatializer());
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
            channelFuture.channel().writeAndFlush("hello");
            channelFuture.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

客户端Iniatializer:

public class MyClientIniatializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipline = ch.pipeline();

        pipline.addLast(new MyReplayingDecoder());
        pipline.addLast(new MyLongToByteEncoder());
        pipline.addLast(new MyClientHandler());
    }
}

客户端Handler:

public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress());
        System.out.println("client output "+msg);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(123456L);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }
}

运行服务端,之后运行客户端:
服务端输出结果:

MyReplayingDecoder decode invoked!
/127.0.0.1:4448 --> 123456
encode invoked
654321

客户端输出结果:

encode invoked
123456
MyReplayingDecoder decode invoked!
localhost/127.0.0.1:8899
client output 654321

加了下,数据传输流程分析

/**
 * describe:  1. client channelActive 发送12345
 *            2. client 触发encode方法发送到server端
 *            3. server 触发decode方法
 *            4. server channelRead0
 *            5. server write flush 发送出去
 *            6. server 触发encode方法发送到客户端
 *            7. clinet 触发decode方法
 *            8. clinet 触发channelRead0
 */

ReplayingDecoder java doc

ByteToMessageDecoder一种特殊变体,它可以在阻塞I / O范例中实现非阻塞解码器。
最大的区别ReplayingDecoder和ByteToMessageDecoder是ReplayingDecoder可以
让你实现decode()和decodeLast()方法,就像已经获得所有所需的字节,而不是检查
所需的字节的可用性。

例如,以下ByteToMessageDecoder实现:
   public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {
  
      @Override
     protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf, List<Object> out) throws Exception {
  
       if (buf.readableBytes() < 4) {
          return;
       }
  
       buf.markReaderIndex();
       int length = buf.readInt();
  
       if (buf.readableBytes() < length) {
          buf.resetReaderIndex();
          return;
       }
  
       out.add(buf.readBytes(length));
     }
   }
使用ReplayingDecoder了如下简化:
   public class IntegerHeaderFrameDecoder
        extends ReplayingDecoder<Void> {
  
     protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf) throws Exception {
  
       out.add(buf.readBytes(buf.readInt()));
     }
   }
这是如何运作的?

ReplayingDecoder通过一个专门的ByteBuf实现其抛出一个Error的某些类型的时候有没有在缓冲区足够的数据。 在上面的IntegerHeaderFrameDecoder ,您仅假设调用buf.readInt()时缓冲区中将有4个或更多字节。 如果缓冲区中确实有4个字节,它将按预期返回整数标头。 否则,将引发Error并将控件返回给ReplayingDecoder 。 如果ReplayingDecoder捕获到Error ,则它将把缓冲区的readerIndex倒回到“初始”位置(即缓冲区的开头),并在缓冲区中收到更多数据时再次调用readerIndex decode(..)方法。
请注意, ReplayingDecoder始终会抛出相同的缓存Error实例,以避免创建新Error并为每次抛出填充其堆栈跟踪的开销。

局限性

以简单为代价, ReplayingDecoder强制您执行一些限制:
禁止某些缓冲区操作。
如果网络速度慢且消息格式复杂,则性能可能会变差,这与上面的示例不同。 在这种情况下,您的解码器可能不得不一遍又一遍地解码消息的相同部分。
您必须记住,可以多次调用decode(..)方法来解码一条消息。 例如,以下代码将不起作用:

 public class MyDecoder extends ReplayingDecoder<Void> {
  
     private final Queue<Integer> values = new LinkedList<Integer>();
  
      @Override
     public void decode(.., ByteBuf buf, List<Object> out) throws Exception {
  
       // A message contains 2 integers.
       values.offer(buf.readInt());
       values.offer(buf.readInt());
  
       // This assertion will fail intermittently since values.offer()
       // can be called more than two times!
       assert values.size() == 2;
       out.add(values.poll() + values.poll());
     }
   }

正确的实现如下所示,您还可以利用“检查点”功能,下一部分将对此进行详细说明。

 public class MyDecoder extends ReplayingDecoder<Void> {
  
     private final Queue<Integer> values = new LinkedList<Integer>();
  
      @Override
     public void decode(.., ByteBuf buf, List<Object> out) throws Exception {
  
       // Revert the state of the variable that might have been changed
       // since the last partial decode.
       values.clear();
  
       // A message contains 2 integers.
       values.offer(buf.readInt());
       values.offer(buf.readInt());
  
       // Now we know this assertion will never fail.
       assert values.size() == 2;
       out.add(values.poll() + values.poll());
     }
   }
改善表现

幸运的是,使用checkpoint()方法可以显着提高复杂解码器实现的性能。
该checkpoint()这样方法更新缓冲区的“初始”位置ReplayingDecoder倒回readerIndex缓冲剂与在那里你叫的最后一个位置checkpoint()方法。

用Enum调用checkpoint(T)

尽管您可以只使用checkpoint()方法并自己管理解码器的状态,
但是管理解码器状态的最简单方法是
创建一个代表解码器当前状态的Enum类型并调用checkpoint(T)状态改变时的方法。
根据要解码的消息的复杂性,可以根据需要设置任意多个状态:

   public enum MyDecoderState {
     READ_LENGTH,
     READ_CONTENT;
   }
  
   public class IntegerHeaderFrameDecoder
        extends ReplayingDecoder<MyDecoderState> {
  
     private int length;
  
     public IntegerHeaderFrameDecoder() {
       // Set the initial state.
       super(MyDecoderState.READ_LENGTH);
     }
  
      @Override
     protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf, List<Object> out) throws Exception {
       switch (state()) {
       case READ_LENGTH:
         length = buf.readInt();
         checkpoint(MyDecoderState.READ_CONTENT);
       case READ_CONTENT:
         ByteBuf frame = buf.readBytes(length);
         checkpoint(MyDecoderState.READ_LENGTH);
         out.add(frame);
         break;
       default:
         throw new Error("Shouldn't reach here.");
       }
     }
   }
没有参数调用checkpoint()

管理解码器状态的另一种方法是自己管理。

   public class IntegerHeaderFrameDecoder
        extends ReplayingDecoder<Void> {
  
     private boolean readLength;
     private int length;
  
      @Override
     protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf, List<Object> out) throws Exception {
       if (!readLength) {
         length = buf.readInt();
         readLength = true;
         checkpoint();
       }
  
       if (readLength) {
         ByteBuf frame = buf.readBytes(length);
         readLength = false;
         checkpoint();
         out.add(frame);
       }
     }
   }
用流水线中的另一个解码器替换一个解码器

如果你打算写一个协议复用器,你可能会想更换ReplayingDecoder与另一个(协议检测) ReplayingDecoder , ByteToMessageDecoder或MessageToMessageDecoder (实际协议解码器)。 不能仅通过调用ChannelPipeline.replace(ChannelHandler, String, ChannelHandler)来实现此目的,但是需要一些其他步骤:


   public class FirstDecoder extends ReplayingDecoder<Void> {
  
        @Override
       protected void decode(ChannelHandlerContext ctx,
                               ByteBuf buf, List<Object> out) {
           ...
           // Decode the first message
           Object firstMessage = ...;
  
           // Add the second decoder
           ctx.pipeline().addLast("second", new SecondDecoder());
  
           if (buf.isReadable()) {
               // Hand off the remaining data to the second decoder
               out.add(firstMessage);
               out.add(buf.readBytes(super.actualReadableBytes()));
           } else {
               // Nothing to hand off
               out.add(firstMessage);
           }
           // Remove the first decoder (me)
           ctx.pipeline().remove(this);
       }

几种常见的编解码器:

LineBasedFrameDecoder

解码器,将接收到的ByteBuf在行尾拆分。
"\n"和"\r\n"都被处理。 有关基于分隔符的更通用解码器,请参见DelimiterBasedFrameDecoder 

FixedLengthFrameDecoder

解码器,将接收到的ByteBuf为固定的字节数。 例如,如果您收到以下四个分段的数据包:
   +---+----+------+----+
   | A | BC | DEFG | HI |
   +---+----+------+----+
   
FixedLengthFrameDecoder (3)会将它们解码为以下三个具有固定长度的数据包:
   +-----+-----+-----+
   | ABC | DEF | GHI |
   +-----+-----+-----+
   

DelimiterBasedFrameDecoder

一种解码器, ByteBuf通过一个或多个定界符对接收到的ByteBuf拆分。 这对于解码以分隔符(例如NUL或换行符)结尾的帧特别有用。
预定义的分隔符
为了方便起见, Delimiters定义了常用的定界符。
指定多个定界符
DelimiterBasedFrameDecoder允许您指定多个定界符。 如果在缓冲区中找到多个定界符,它将选择产生`最短帧`的定界符。 例如,如果缓冲区中包含以下数据:
   +--------------+
   | ABC\nDEF\r\n |
   +--------------+
   
DelimiterBasedFrameDecoder ( Delimiters.lineDelimiter() )将选择'\n'作为第一个定界符并产生两个帧:
   +-----+-----+
   | ABC | DEF |
   +-----+-----+
   
而不是错误地选择'\r\n'作为第一个定界符:
   +----------+
   | ABC\nDEF |
   +----------+
   

LengthFieldBasedFrameDecoder

解码器按消息中的length字段的值动态拆分接收到的ByteBuf 。 当您解码二进制消息时,此消息特别有用,该二进制消息具有代表消息正文或整个消息长度的整数头字段。
LengthFieldBasedFrameDecoder具有许多配置参数,因此它可以解码带有长度字段的任何消息,这在专有的客户端-服务器协议中经常出现。 以下是一些示例,可让您基本了解哪个选项可以执行什么操作。
2字节长度的字段,偏移量为0,不剥离标题
在此示例中,长度字段的值为12(0x0C) ,代表“ HELLO,WORLD”的长度。 默认情况下,解码器假定length字段表示在length字段之后的字节数。 因此,可以使用简单的参数组合对其进行解码。
   lengthFieldOffset   = 0
   lengthFieldLength   = 2
   lengthAdjustment    = 0
   initialBytesToStrip = 0 (= do not strip header)
  
   BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
   +--------+----------------+      +--------+----------------+
   | Length | Actual Content |----->| Length | Actual Content |
   | 0x000C | "HELLO, WORLD" |      | 0x000C | "HELLO, WORLD" |
   +--------+----------------+      +--------+----------------+
   
2个字节的长度字段,偏移量为0,带头
因为我们可以通过调用ByteBuf.readableBytes()获得内容的长度,所以您可能希望通过指定initialBytesToStrip来剥离长度字段。 在此示例中,我们指定2 ,它与length字段的长度相同,以剥离前两个字节。
   lengthFieldOffset   = 0
   lengthFieldLength   = 2
   lengthAdjustment    = 0
   initialBytesToStrip = 2 (= the length of the Length field)
  
   BEFORE DECODE (14 bytes)         AFTER DECODE (12 bytes)
   +--------+----------------+      +----------------+
   | Length | Actual Content |----->| Actual Content |
   | 0x000C | "HELLO, WORLD" |      | "HELLO, WORLD" |
   +--------+----------------+      +----------------+
   
2个字节的长度字段,偏移量为0,不剥离标题,该长度字段表示整个消息的长度
在大多数情况下,长度字段仅表示消息正文的长度,如前面的示例所示。 但是,在某些协议中,长度字段表示整个消息的长度,包括消息头。 在这种情况下,我们指定一个非零的lengthAdjustment 。 因为此示例消息中的length值总是比主体长度大2 ,所以我们将-2指定为lengthAdjustment以进行补偿。
   lengthFieldOffset   =  0
   lengthFieldLength   =  2
   lengthAdjustment    = -2 (= the length of the Length field)
   initialBytesToStrip =  0
  
   BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
   +--------+----------------+      +--------+----------------+
   | Length | Actual Content |----->| Length | Actual Content |
   | 0x000E | "HELLO, WORLD" |      | 0x000E | "HELLO, WORLD" |
   +--------+----------------+      +--------+----------------+
   
3个字节的长度字段位于5个字节的报头末尾,不剥离报头
以下消息是第一个示例的简单变体。 该消息之前会附加一个额外的标头值。 lengthAdjustment再次为零,因为解码器始终在帧长度计算过程中考虑前置数据的长度。
   lengthFieldOffset   = 2 (= the length of Header 1)
   lengthFieldLength   = 3
   lengthAdjustment    = 0
   initialBytesToStrip = 0
  
   BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
   +----------+----------+----------------+      +----------+----------+----------------+
   | Header 1 |  Length  | Actual Content |----->| Header 1 |  Length  | Actual Content |
   |  0xCAFE  | 0x00000C | "HELLO, WORLD" |      |  0xCAFE  | 0x00000C | "HELLO, WORLD" |
   +----------+----------+----------------+      +----------+----------+----------------+
   
5字节标题开头的3字节长度字段,不剥离标题
这是一个高级示例,显示了在length字段和消息正文之间存在一个额外的标头的情况。 您必须指定一个正的lengthAdjustment,以便解码器将额外的标头计入帧长度计算中。
   lengthFieldOffset   = 0
   lengthFieldLength   = 3
   lengthAdjustment    = 2 (= the length of Header 1)
   initialBytesToStrip = 0
  
   BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
   +----------+----------+----------------+      +----------+----------+----------------+
   |  Length  | Header 1 | Actual Content |----->|  Length  | Header 1 | Actual Content |
   | 0x00000C |  0xCAFE  | "HELLO, WORLD" |      | 0x00000C |  0xCAFE  | "HELLO, WORLD" |
   +----------+----------+----------------+      +----------+----------+----------------+
   
2个字节的长度字段在4字节标题的中间偏移量1处,除去第一个标题字段和长度字段
这是以上所有示例的组合。 在length字段之前有前置报头,在length字段之后有多余的报头。 前置标头会影响lengthFieldOffset ,额外标头会影响lengthAdjustment 。 我们还指定了一个非零的initialBytesToStrip来从帧中剥离长度字段和前置标头。 如果不想剥离前置标头,则可以将initialBytesToSkip指定为0 。
   lengthFieldOffset   = 1 (= the length of HDR1)
   lengthFieldLength   = 2
   lengthAdjustment    = 1 (= the length of HDR2)
   initialBytesToStrip = 3 (= the length of HDR1 + LEN)
  
   BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
   +------+--------+------+----------------+      +------+----------------+
   | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
   | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
   +------+--------+------+----------------+      +------+----------------+
   
2个字节的长度字段位于4字节标题的中间偏移量1处,去掉第一个标题字段和length字段,length字段代表整个消息的长度
让我们再对前面的示例进行一些修改。 与前一个示例的唯一区别是,长度字段表示整个消息的长度,而不是消息主体,就像第三个示例一样。 我们必须将HDR1的长度和Length计入lengthAdjustment中。 请注意,我们不需要考虑HDR2的长度,因为length字段已经包含了整个标头长度。
   lengthFieldOffset   =  1
   lengthFieldLength   =  2
   lengthAdjustment    = -3 (= the length of HDR1 + LEN, negative)
   initialBytesToStrip =  3
  
   BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
   +------+--------+------+----------------+      +------+----------------+
   | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
   | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
   +------+--------+------+----------------+      +------+----------------+
   
也可以看看:
LengthFieldPrepender
上一篇下一篇

猜你喜欢

热点阅读