Netty ReplayingDecoder 源码分析与特性解读
转自:https://blog.csdn.net/wzq6578702/article/details/78826494
在介绍ReplayingDecoder之前 想看一下它的用法,构建一个服务端和客户端的模型:
服务端:
public class MyServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new MyServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
}finally{
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服务端initializer:
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipline = ch.pipeline();
pipline.addLast(new MyReplayingDecoder());//使用ReplayingDecoder
pipline.addLast(new MyLongToByteEncoder());
pipline.addLast(new MyServerHandler());
}
}
ServerHandler:
public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println(ctx.channel().remoteAddress()+" --> "+msg);
ctx.writeAndFlush(654321L);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
MyReplayingDecoder:
public class MyReplayingDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyReplayingDecoder decode invoked!");
//注意没有判断字节数!!!!
out.add(in.readLong());
}
}
MyLongToByteEncoder
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
@Override
protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
System.out.println("encode invoked");
System.out.println(msg);
out.writeLong(msg);
}
}
客户端:
public class Myclient {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new MyClientIniatializer());
ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
channelFuture.channel().writeAndFlush("hello");
channelFuture.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
客户端Iniatializer:
public class MyClientIniatializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipline = ch.pipeline();
pipline.addLast(new MyReplayingDecoder());
pipline.addLast(new MyLongToByteEncoder());
pipline.addLast(new MyClientHandler());
}
}
客户端Handler:
public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println(ctx.channel().remoteAddress());
System.out.println("client output "+msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(123456L);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
ctx.close();
}
}
运行服务端,之后运行客户端:
服务端输出结果:
MyReplayingDecoder decode invoked!
/127.0.0.1:4448 --> 123456
encode invoked
654321
客户端输出结果:
encode invoked
123456
MyReplayingDecoder decode invoked!
localhost/127.0.0.1:8899
client output 654321
加了下,数据传输流程分析
/**
* describe: 1. client channelActive 发送12345
* 2. client 触发encode方法发送到server端
* 3. server 触发decode方法
* 4. server channelRead0
* 5. server write flush 发送出去
* 6. server 触发encode方法发送到客户端
* 7. clinet 触发decode方法
* 8. clinet 触发channelRead0
*/
ReplayingDecoder java doc
ByteToMessageDecoder一种特殊变体,它可以在阻塞I / O范例中实现非阻塞解码器。
最大的区别ReplayingDecoder和ByteToMessageDecoder是ReplayingDecoder可以
让你实现decode()和decodeLast()方法,就像已经获得所有所需的字节,而不是检查
所需的字节的可用性。
例如,以下ByteToMessageDecoder实现:
public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf, List<Object> out) throws Exception {
if (buf.readableBytes() < 4) {
return;
}
buf.markReaderIndex();
int length = buf.readInt();
if (buf.readableBytes() < length) {
buf.resetReaderIndex();
return;
}
out.add(buf.readBytes(length));
}
}
使用ReplayingDecoder了如下简化:
public class IntegerHeaderFrameDecoder
extends ReplayingDecoder<Void> {
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf) throws Exception {
out.add(buf.readBytes(buf.readInt()));
}
}
这是如何运作的?
ReplayingDecoder通过一个专门的ByteBuf实现其抛出一个Error的某些类型的时候有没有在缓冲区足够的数据。 在上面的IntegerHeaderFrameDecoder ,您仅假设调用buf.readInt()时缓冲区中将有4个或更多字节。 如果缓冲区中确实有4个字节,它将按预期返回整数标头。 否则,将引发Error并将控件返回给ReplayingDecoder 。 如果ReplayingDecoder捕获到Error ,则它将把缓冲区的readerIndex倒回到“初始”位置(即缓冲区的开头),并在缓冲区中收到更多数据时再次调用readerIndex decode(..)方法。
请注意, ReplayingDecoder始终会抛出相同的缓存Error实例,以避免创建新Error并为每次抛出填充其堆栈跟踪的开销。
局限性
以简单为代价, ReplayingDecoder强制您执行一些限制:
禁止某些缓冲区操作。
如果网络速度慢且消息格式复杂,则性能可能会变差,这与上面的示例不同。 在这种情况下,您的解码器可能不得不一遍又一遍地解码消息的相同部分。
您必须记住,可以多次调用decode(..)方法来解码一条消息。 例如,以下代码将不起作用:
public class MyDecoder extends ReplayingDecoder<Void> {
private final Queue<Integer> values = new LinkedList<Integer>();
@Override
public void decode(.., ByteBuf buf, List<Object> out) throws Exception {
// A message contains 2 integers.
values.offer(buf.readInt());
values.offer(buf.readInt());
// This assertion will fail intermittently since values.offer()
// can be called more than two times!
assert values.size() == 2;
out.add(values.poll() + values.poll());
}
}
正确的实现如下所示,您还可以利用“检查点”功能,下一部分将对此进行详细说明。
public class MyDecoder extends ReplayingDecoder<Void> {
private final Queue<Integer> values = new LinkedList<Integer>();
@Override
public void decode(.., ByteBuf buf, List<Object> out) throws Exception {
// Revert the state of the variable that might have been changed
// since the last partial decode.
values.clear();
// A message contains 2 integers.
values.offer(buf.readInt());
values.offer(buf.readInt());
// Now we know this assertion will never fail.
assert values.size() == 2;
out.add(values.poll() + values.poll());
}
}
改善表现
幸运的是,使用checkpoint()方法可以显着提高复杂解码器实现的性能。
该checkpoint()这样方法更新缓冲区的“初始”位置ReplayingDecoder倒回readerIndex缓冲剂与在那里你叫的最后一个位置checkpoint()方法。
用Enum调用checkpoint(T)
尽管您可以只使用checkpoint()方法并自己管理解码器的状态,
但是管理解码器状态的最简单方法是
创建一个代表解码器当前状态的Enum类型并调用checkpoint(T)状态改变时的方法。
根据要解码的消息的复杂性,可以根据需要设置任意多个状态:
public enum MyDecoderState {
READ_LENGTH,
READ_CONTENT;
}
public class IntegerHeaderFrameDecoder
extends ReplayingDecoder<MyDecoderState> {
private int length;
public IntegerHeaderFrameDecoder() {
// Set the initial state.
super(MyDecoderState.READ_LENGTH);
}
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf, List<Object> out) throws Exception {
switch (state()) {
case READ_LENGTH:
length = buf.readInt();
checkpoint(MyDecoderState.READ_CONTENT);
case READ_CONTENT:
ByteBuf frame = buf.readBytes(length);
checkpoint(MyDecoderState.READ_LENGTH);
out.add(frame);
break;
default:
throw new Error("Shouldn't reach here.");
}
}
}
没有参数调用checkpoint()
管理解码器状态的另一种方法是自己管理。
public class IntegerHeaderFrameDecoder
extends ReplayingDecoder<Void> {
private boolean readLength;
private int length;
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf, List<Object> out) throws Exception {
if (!readLength) {
length = buf.readInt();
readLength = true;
checkpoint();
}
if (readLength) {
ByteBuf frame = buf.readBytes(length);
readLength = false;
checkpoint();
out.add(frame);
}
}
}
用流水线中的另一个解码器替换一个解码器
如果你打算写一个协议复用器,你可能会想更换ReplayingDecoder与另一个(协议检测) ReplayingDecoder , ByteToMessageDecoder或MessageToMessageDecoder (实际协议解码器)。 不能仅通过调用ChannelPipeline.replace(ChannelHandler, String, ChannelHandler)来实现此目的,但是需要一些其他步骤:
public class FirstDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf, List<Object> out) {
...
// Decode the first message
Object firstMessage = ...;
// Add the second decoder
ctx.pipeline().addLast("second", new SecondDecoder());
if (buf.isReadable()) {
// Hand off the remaining data to the second decoder
out.add(firstMessage);
out.add(buf.readBytes(super.actualReadableBytes()));
} else {
// Nothing to hand off
out.add(firstMessage);
}
// Remove the first decoder (me)
ctx.pipeline().remove(this);
}
几种常见的编解码器:
LineBasedFrameDecoder
解码器,将接收到的ByteBuf在行尾拆分。
"\n"和"\r\n"都被处理。 有关基于分隔符的更通用解码器,请参见DelimiterBasedFrameDecoder
FixedLengthFrameDecoder
解码器,将接收到的ByteBuf为固定的字节数。 例如,如果您收到以下四个分段的数据包:
+---+----+------+----+
| A | BC | DEFG | HI |
+---+----+------+----+
FixedLengthFrameDecoder (3)会将它们解码为以下三个具有固定长度的数据包:
+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+
DelimiterBasedFrameDecoder
一种解码器, ByteBuf通过一个或多个定界符对接收到的ByteBuf拆分。 这对于解码以分隔符(例如NUL或换行符)结尾的帧特别有用。
预定义的分隔符
为了方便起见, Delimiters定义了常用的定界符。
指定多个定界符
DelimiterBasedFrameDecoder允许您指定多个定界符。 如果在缓冲区中找到多个定界符,它将选择产生`最短帧`的定界符。 例如,如果缓冲区中包含以下数据:
+--------------+
| ABC\nDEF\r\n |
+--------------+
DelimiterBasedFrameDecoder ( Delimiters.lineDelimiter() )将选择'\n'作为第一个定界符并产生两个帧:
+-----+-----+
| ABC | DEF |
+-----+-----+
而不是错误地选择'\r\n'作为第一个定界符:
+----------+
| ABC\nDEF |
+----------+
LengthFieldBasedFrameDecoder
解码器按消息中的length字段的值动态拆分接收到的ByteBuf 。 当您解码二进制消息时,此消息特别有用,该二进制消息具有代表消息正文或整个消息长度的整数头字段。
LengthFieldBasedFrameDecoder具有许多配置参数,因此它可以解码带有长度字段的任何消息,这在专有的客户端-服务器协议中经常出现。 以下是一些示例,可让您基本了解哪个选项可以执行什么操作。
2字节长度的字段,偏移量为0,不剥离标题
在此示例中,长度字段的值为12(0x0C) ,代表“ HELLO,WORLD”的长度。 默认情况下,解码器假定length字段表示在length字段之后的字节数。 因此,可以使用简单的参数组合对其进行解码。
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = 0
initialBytesToStrip = 0 (= do not strip header)
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+
2个字节的长度字段,偏移量为0,带头
因为我们可以通过调用ByteBuf.readableBytes()获得内容的长度,所以您可能希望通过指定initialBytesToStrip来剥离长度字段。 在此示例中,我们指定2 ,它与length字段的长度相同,以剥离前两个字节。
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = 0
initialBytesToStrip = 2 (= the length of the Length field)
BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)
+--------+----------------+ +----------------+
| Length | Actual Content |----->| Actual Content |
| 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
+--------+----------------+ +----------------+
2个字节的长度字段,偏移量为0,不剥离标题,该长度字段表示整个消息的长度
在大多数情况下,长度字段仅表示消息正文的长度,如前面的示例所示。 但是,在某些协议中,长度字段表示整个消息的长度,包括消息头。 在这种情况下,我们指定一个非零的lengthAdjustment 。 因为此示例消息中的length值总是比主体长度大2 ,所以我们将-2指定为lengthAdjustment以进行补偿。
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = -2 (= the length of the Length field)
initialBytesToStrip = 0
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+
3个字节的长度字段位于5个字节的报头末尾,不剥离报头
以下消息是第一个示例的简单变体。 该消息之前会附加一个额外的标头值。 lengthAdjustment再次为零,因为解码器始终在帧长度计算过程中考虑前置数据的长度。
lengthFieldOffset = 2 (= the length of Header 1)
lengthFieldLength = 3
lengthAdjustment = 0
initialBytesToStrip = 0
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
+----------+----------+----------------+ +----------+----------+----------------+
| Header 1 | Length | Actual Content |----->| Header 1 | Length | Actual Content |
| 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" |
+----------+----------+----------------+ +----------+----------+----------------+
5字节标题开头的3字节长度字段,不剥离标题
这是一个高级示例,显示了在length字段和消息正文之间存在一个额外的标头的情况。 您必须指定一个正的lengthAdjustment,以便解码器将额外的标头计入帧长度计算中。
lengthFieldOffset = 0
lengthFieldLength = 3
lengthAdjustment = 2 (= the length of Header 1)
initialBytesToStrip = 0
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
+----------+----------+----------------+ +----------+----------+----------------+
| Length | Header 1 | Actual Content |----->| Length | Header 1 | Actual Content |
| 0x00000C | 0xCAFE | "HELLO, WORLD" | | 0x00000C | 0xCAFE | "HELLO, WORLD" |
+----------+----------+----------------+ +----------+----------+----------------+
2个字节的长度字段在4字节标题的中间偏移量1处,除去第一个标题字段和长度字段
这是以上所有示例的组合。 在length字段之前有前置报头,在length字段之后有多余的报头。 前置标头会影响lengthFieldOffset ,额外标头会影响lengthAdjustment 。 我们还指定了一个非零的initialBytesToStrip来从帧中剥离长度字段和前置标头。 如果不想剥离前置标头,则可以将initialBytesToSkip指定为0 。
lengthFieldOffset = 1 (= the length of HDR1)
lengthFieldLength = 2
lengthAdjustment = 1 (= the length of HDR2)
initialBytesToStrip = 3 (= the length of HDR1 + LEN)
BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
+------+--------+------+----------------+ +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+ +------+----------------+
2个字节的长度字段位于4字节标题的中间偏移量1处,去掉第一个标题字段和length字段,length字段代表整个消息的长度
让我们再对前面的示例进行一些修改。 与前一个示例的唯一区别是,长度字段表示整个消息的长度,而不是消息主体,就像第三个示例一样。 我们必须将HDR1的长度和Length计入lengthAdjustment中。 请注意,我们不需要考虑HDR2的长度,因为length字段已经包含了整个标头长度。
lengthFieldOffset = 1
lengthFieldLength = 2
lengthAdjustment = -3 (= the length of HDR1 + LEN, negative)
initialBytesToStrip = 3
BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
+------+--------+------+----------------+ +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+ +------+----------------+
也可以看看:
LengthFieldPrepender