netty学习系列八:拆包器
一、粘包与拆包
1、发送时的粘包与拆包
TCP连接维护了一个发送缓存区。将要发送给对端的数据会由socket API写入该发送缓存区。
TCP每次发送的报文段大小有限制,MSS就是单个TCP数据包能够承载的最大数据分段大小。
TCP为了考虑数据传输效率,会采用如下发送策略:
- 发送缓存区中存放的数据达到MSS字节时,就组装一个TCP报文段发送出去;
- 由发送方的应用进程指明要求立即发送报文段;
- 发送计时器到期,这时把缓存区中已有的数据装入报文段(但长度不能超过MSS)发送出去;
正是由于MSS和发送策略,会有以下情况:
- 用户数据包超过了mss,那么这个用户数据包在发送的时候必须拆分成多个TCP数据包,即发生拆包。
- 用户数据包有效载荷非常低,TCP的发送策略会将多个用户数据包合并为一个TCP数据包进行发送,即发生粘包。
2、接收时的粘包与拆包
由于TCP在发送数据时会发生粘包/拆包。所以接收过程也需要进行对应的粘包/拆包,以便将接收到的TCP数据包重新组装为发送端发来的原始用户数据包,并进行后续业务处理。
3、接收端进行粘包/拆包的原理
基本原理就是不断从TCP缓冲区中读取数据,并将新读取到的数据向后追加到 本地消息缓存 中,然后进行解码处理:
- 如果当前本地消息缓存中不足以拼接成一个业务数据包,那就保留数据,继续从tcp缓冲区中读取数据;
- 如果当前本地消息缓存中能够拼接成一个业务数据包,那就将对应数据解码成一个完整的业务数据包并传递给业务逻辑处理,本地消息缓存中剩余的多余数据仍然保留,以便和下次读到的数据尝试拼接。
二、netty中的拆包器
0、总述
netty 中的拆包过程原理同上,拆包器基类为ByteToMessageDecoder
,其内部有一个 累加器 ,将每次新读取到的数据不断累加到本地字节容器,然后尝试对累加后的本地字节容器中的数据进行拆包,拆成一个完整的业务数据包。
1、累加器
netty通过累加器实现将每次新读取到的数据不断累积到本地字节容器的操作。
ByteToMessageDecoder
中定义了两个累加器:
public static final Cumulator MERGE_CUMULATOR = ...;
public static final Cumulator COMPOSITE_CUMULATOR = ...;
默认情况下使用简单的MERGE_CUMULATOR
累加器,原理是每次都将读取到的数据通过内存拷贝的方式,拼接到一个大的字节容器中,这个大的字节容器即为ByteToMessageDecoder
中的cumulation。
private Cumulator cumulator = MERGE_CUMULATOR;
累加器的累加操作实现
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
ByteBuf buffer;
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1) {
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
buffer.writeBytes(in);
in.release();
return buffer;
}
- 若当前cumulation空间不足容纳新读取到的数据,则进行扩容;
ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable);
- 使用
ByteBuf.writeBytes(in);
将新数据累加到字节容器cumulation中;
2、数据读取与粘包/拆包
1、 代码入口
2、 累加新读取的数据到本地自己容器中
3、 将本地字节容器中的数据传递给业务拆包器拆包
4、 清理字节容器
5、 传递业务数据包给业务解码器处理
1)代码入口
a、NioEventLoop线程在处理IO事件的代码processSelectedKey(SelectionKey k, AbstractNioChannel ch)
中,对于OP_READ事件会调用相应Channel的NioByteUnsafe.read()
进行处理。
b、NioByteUnsafe.read()
会分配一个ByteBuf byteBuf
并将TCP接收缓存中的数据读取到byteBuf中,最后触发channelRead事件将byteBuf传递给pipeline中的ByteToMessageDecoder
回调处理。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
上述代码分为如下四个步骤:
1、 累加新读取的数据到本地自己容器中
2、 将本地字节容器中的数据传递给业务拆包器拆包
3、 清理字节容器
4、 传递业务数据包给业务解码器处理
2)累加新读取的数据到本地自己容器中
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
- 若当前累加器中没有数据(cumulation==null),则直接跳过内存拷贝,将字节容器cumulation的指针指向新读取的数据;
- 若当前累加器中有数据,调用累加器cumulation的```cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in)``方法累加新读取到的数据到本地字节容器。
3)将本地字节容器中的数据传递给业务拆包器拆包
CodecOutputList out = CodecOutputList.newInstance();
callDecode(ctx, cumulation, out);
1、 到这一步,本地字节容器中的数据是目前未经拆包的所有数据;
2、 callDecode
将尝试将本地字节容器的数据拆分成业务数据包,并放入业务数据包容器CodecOutputList out
中;
3、 对于业务数据包容器out,遍历其中的业务数据包,通过ctx.fireChannelRead(msg);
将每个业务数据包传递给后续处理器进行业务处理
具体的拆包工作由ByteToMessageDecode
的抽象方法定义:
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
netty中对各种用户协议的支持就体现在这个抽象方法中,所有的拆包器最终都实现了该抽象方法。
decode后,如果发现并没有拆到一个完整的数据包
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
- 若拆包器什么数据也没读取
oldInputLength == in.readableBytes()
,可能数据还不够业务拆包器处理,直接break等待新的数据; - 若拆包器已读取部分数据,说明解码器仍然在工作,继续循环解码。
4)清理字节容器
NioByteUnsafe.read()
每次处理OP_READ事件读取完数据,都会触发一次channelReadComplete事件。
ByteToMessageDecoder.channelReadComplete(ChannelHandlerContext ctx)
方法中实现了对本地字节容器的清理逻辑:
discardSomeReadBytes();
另外,为防止发送端发送数据过快ByteToMessageDecoder.channelRead
中在每次拆包过后都会做一次判断,如果读取到的数据量过多也会主动执行本地字节容器的清理逻辑:
if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
discardSomeReadBytes()之前,本地字节容器中的数据分布:
+--------------+----------+----------+
| readed | unreaded | writable |
+--------------+----------+----------+
discardSomeReadBytes()之后,本地字节容器中的数据分布:
+----------+-------------------------+
| unreaded | writable |
+----------+-------------------------+
**5)传递业务数据包给业务解码器处理**
经过上面几个步骤完成之后,就可以将拆成的业务数据包交给后续业务处理器处理了:
fireChannelRead(ctx, out, size);
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
for (int i = 0; i < numElements; i ++) {
ctx.fireChannelRead(msgs.getUnsafe(i));
}
}
#三、拆包器的具体实现类
LineBasedFrameDecoder -> 根据换行符\n或\r\n进行拆包
DelimiterBasedFrameDecoder -> 根据用户定义的标识符进行拆包
LengthFieldBasedFrameDecoder -> 根据包头长度进行拆包,适用于私有协议解码