异步编程NIO(Netty, Vert.x, Akka和Node.js)深入浅出Netty

Netty通用TCP黏包解决方案--LengthFieldBas

2018-10-07  本文已影响0人  L千年老妖

前言

TCP以流的方式进行数据传输,上层应用协议为了对消息进行区分,往往采用如下4种方式。

  1. 消息长度固定:累计读取到固定长度为LENGTH之后就认为读取到了一个完整的消息。然后将计数器复位,重新开始读下一个数据报文。
  2. 回车换行符作为消息结束符:在文本协议中应用比较广泛。
  3. 将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符。
  4. 通过在消息头中定义长度字段来标示消息的总长度。

netty中针对这四种场景均有对应的解码器作为解决方案,比如:

  1. 通过FixedLengthFrameDecoder 定长解码器来解决定长消息的黏包问题;
  2. 通过LineBasedFrameDecoder和StringDecoder来解决以回车换行符作为消息结束符的TCP黏包的问题;
  3. 通过DelimiterBasedFrameDecoder 特殊分隔符解码器来解决以特殊符号作为消息结束符的TCP黏包问题;
  4. 最后一种,也是本文的重点,通过LengthFieldBasedFrameDecoder 自定义长度解码器解决TCP黏包问题。

大多数的协议都会在协议头中携带长度字段,用于标识消息体或则整包消息的长度。LengthFieldBasedFrameDecoder通过指定长度来标识整包消息,这样就可以自动的处理黏包和半包消息,只要传入正确的参数,就可以轻松解决“黏包”的问题。

LengthFieldBasedFrameDecoder配置

  1. lengthFieldOffset: 长度字段的偏差
  2. lengthFieldLength: 长度字段占的字节数
  3. lengthAdjustment: 添加到长度字段的补偿值
  4. initialBytesToStrip: 从解码帧中第一次去除的字节数

范例1:2bytes长度字段+消息体,保留长度字段

ex1.png

在解码前字节缓冲区占了14个字节,其中前两个字节是标识长度的字节,length=12,表示后面12个字节的消息体长度。

lengthFieldOffset = 0  // 第一个字段就是长度字段,偏移为0
lengthFieldLength = 2  // 长度字段length为2bytes
lengthAdjustment = 0   // 不需要进行调整
initialBytesToStrip = 0  // 不忽略长度字段

范例2:2bytes长度字段+消息体,去除长度字段

ex2.png
因为我们可以通过调用ByteBuf#readableBytes()来获取内容的长度,因此便可以指定initialBytesToStrip去除长度字段。
lengthFieldOffset = 0  // 第一个字段就是长度字段,偏移为0
lengthFieldLength = 2  // 长度字段length为2bytes
lengthAdjustment = 0   // 不需要进行调整
initialBytesToStrip = 2  // 忽略长度字段(the length of the Length field)

范例3:2bytes长度字段(标识整个消息的长度)+消息体,保留长度字段

在大多数的应用场景中,长度字段仅用来标识消息体的长度,这类协议通常由消息长度字段+消息体组成,如上图所示的几个例子。但是,对于某些协议,长度字段还包含了消息头的长度。在这种应用场景中,往往需要使用lengthAdjustment进行修正。由于整个消息(包含消息头)的长度往往大于消息体的长度,所以,lengthAdjustment为负数。下图展示了通过指定lengthAdjustment字段来包含消息头的长度:

lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = -2
initialBytesToStrip = 0
ex3.png

范例4:两个请求头,长度字段偏移1byte+消息体,保留长度字段

由于协议的种类繁多,并不是所有的协议都将长度字段放在消息头的首位,当标识消息长度的字段位于消息头的中间或者尾部时,需要使用lengthFieldOffset字段进行标识,下面的参数组合给出了如何解决消息长度字段不在首位的问题:

lengthFieldOffset = 2
lengthFieldLength = 3
lengthAdjustment = 0
initialBytesToStrip = 0

由于消息头1的长度为2,所以长度字段的偏移量为2;消息长度字段Length为3,所以lengthFieldLength值为3。由于长度字段仅仅标识消息体的长度,所以lengthAdjustment和initialBytesToStrip都为0。


ex4.png

范例5:两个请求头,长度字段+第二个请求头+消息体,保留长度字段

长度字段和消息体之前还存在其他请求头,可以指定lengthAdjustment将额外的头长度计入帧长度计算:

lengthFieldOffset = 0
lengthFieldLength = 3
lengthAdjustment = 2(the length of Header 1)
initialBytesToStrip = 0
ex5.png

范例6:2byte长度字段位于两个2byte字段中间,去除第一个字段和长度字段

长度字段夹在两个消息头之间或者长度字段位于消息头的中间,前后都有其它消息头字段,在这种场景下如果想忽略长度字段以及其前面的其它消息头字段,通过initialBytesToStrip参数来跳过要忽略的字节长度,它的组合配置示意如下:
lengthFieldOffset = 1 (the length of HDR1)
lengthFieldLength = 2
lengthAdjustment = 1 (the length of HDR2)
initialBytesToStrip = 3 (the length of HDR1 + LEN)

image.png

范例7:2byte长度字段(标识整个消息的长度)位于两个2byte字段中间,去除第一个字段和长度字段

和范例6相似,只不过长度字段为整个消息的长度,因此我们在去除前两个字段时不需要考虑HDR2的长度,只需要调整HDR1 + LEN的字节长度。

lengthFieldOffset = 1
lengthFieldLength = 2
lengthAdjustment = -3(the length of HDR1 + LEN)
initialBytesToStrip = 3
ex7.png

事实上,通过4个参数的不同组合,可以达到不同的解码效果,用户在使用过程中可以根据业务的实际情况进行灵活调整。

实战[nodejs客户端,netty服务端]自定义编解码

自定义消息 AccessReq:

/*--- 消息头 ---*/
// 消息类型,用来区分协议
private byte flag;
// 消息主体总长度
private int len;

/*--- 消息主体 ---*/
// 消息名长度
private int msgNameLen;
// 消息名
private String msgName;
// 消息体
private String body;

长度字段前有一个1字节falg,lengthFieldOffset=1,长度字段4字节,lengthFieldLength=4,要保留整个消息,lengthAdjustment,initialBytesToStrip都为0

服务端解码并回消息:

ByteBuf buf = (ByteBuf) msg;
flag = buf.readByte();
len = buf.readInt();
msgNameLen = buf.readInt();
msgName = buf.readCharSequence(msgNameLen, CharsetUtil.UTF_8).toString();
body = buf.readCharSequence(buf.readableBytes(), CharsetUtil.UTF_8).toString();

```
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

    AccessReq req = new AccessReq(msg);

    log.info("[SOCKET REQ-{}]:{}", req.getMsgName(), req.getBody());

    String obj = deelMsg(req);
    ByteBuf buf = Unpooled.copiedBuffer(obj, CharsetUtil.UTF_8);
    ctx.writeAndFlush(buf);
}
private String deelMsg(AccessReq req) {
    return "hello netty";
}

nodejs编码并发送

const net = require("net");
const socket = new net.Socket();
const client = socket.connect(
   4200,
   "127.0.0.1",
   function() {
       console.log("connect success");
       const name = "req";
       const body = "json";
       const nl = Buffer.byteLength(name);
       const bl = Buffer.byteLength(body);
       const buf = Buffer.alloc(bl + nl + 9);

       // 消息头
       buf.write("G"); // 消息类型  默认G,scoket连接
       buf.writeInt32BE(bl + nl + 4, 1); // 消息体总长度
       // 消息体
       buf.writeInt32BE(nl, 5); // 消息名长度
       buf.write(name, 9); // 消息名
       buf.write(body, nl + 9); // 消息体
       client.write(buf);
       console.log(buf);
    
   }
);
client.on("data", data => {
    console.log("[RECV] " + data);
});

打印结果

server:

2018-08-01 11:25:12.525  INFO 3884 --- [ntLoopGroup-5-1] c.c.g.a.h.n.NativeSocketHandler          : - U: [SOCKET REQ-req]:json

client:

chaoyer:netty-client apple$ node client.js
connect success
<Buffer 47 00 00 00 0b 00 00 00 03 72 65 71 6a 73 6f 6e>
[RECV] hello netty

LengthFieldBasedFrameDecoder源码解析

下面我们就来看看基于消息长度的半包解码器,首先看看入口方法:

protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
      
    Object decoded = decode(ctx, in);
    if (decoded != null) {
        out.add(decoded);
    }
}

内部调用decode(ChannelHandlerContext ctx, ByteBuf in) 如果解码成功,就将其加入到输出的List out列表中。该函数较长我们还是分几部分来分析:
(1)判断discardingTooLongFrame标识,看是否需要丢弃当前可读的字节缓冲区,如果为真,则执行求其操作。

if (discardingTooLongFrame) {
    //获取需要丢弃的长度
    long bytesToDiscard = this.bytesToDiscard;
    //丢弃的长度不能超过当前缓冲区可读的字节数
    int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
    //跳过需要忽略的字节长度
    in.skipBytes(localBytesToDiscard);
    //bytesToDiscard减去已经忽略的字节长度
    bytesToDiscard -= localBytesToDiscard;
    this.bytesToDiscard = bytesToDiscard;
    failIfNecessary(false);
}

(2)对当前缓冲区中可读字节数和长度偏移量进行对比,如果小于偏移量,谁明缓冲区数据报不够,直接返回null.

//数据报内数据不够,返回null,由IO线程继续读取数据。  
if (in.readableBytes() < lengthFieldEndOffset) {
    return null;
}

int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);

if (frameLength < 0) {
    in.skipBytes(lengthFieldEndOffset);
    throw new CorruptedFrameException("negative pre-adjustment length field: " + frameLength);
}

frameLength += lengthAdjustment + lengthFieldEndOffset;

if (frameLength < lengthFieldEndOffset) {
    in.skipBytes(lengthFieldEndOffset);
    throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less than lengthFieldEndOffset: " + lengthFieldEndOffset);
}

其实核心就是:对消息进行解码,解码之后将解码后的字节数据放到一个新的ByteBuf中返回,并更新原来的消息msg对象的读写索引值。

上一篇下一篇

猜你喜欢

热点阅读