深入浅出Netty源码剖析Netty学习系列netty

netty学习系列八:拆包器

2017-04-17  本文已影响723人  益文的圈

一、粘包与拆包

1、发送时的粘包与拆包

TCP连接维护了一个发送缓存区。将要发送给对端的数据会由socket API写入该发送缓存区。
TCP每次发送的报文段大小有限制,MSS就是单个TCP数据包能够承载的最大数据分段大小。
TCP为了考虑数据传输效率,会采用如下发送策略:

正是由于MSS和发送策略,会有以下情况:

2、接收时的粘包与拆包

由于TCP在发送数据时会发生粘包/拆包。所以接收过程也需要进行对应的粘包/拆包,以便将接收到的TCP数据包重新组装为发送端发来的原始用户数据包,并进行后续业务处理。

3、接收端进行粘包/拆包的原理

基本原理就是不断从TCP缓冲区中读取数据,并将新读取到的数据向后追加到 本地消息缓存 中,然后进行解码处理:

二、netty中的拆包器

0、总述

netty 中的拆包过程原理同上,拆包器基类为ByteToMessageDecoder,其内部有一个 累加器 ,将每次新读取到的数据不断累加到本地字节容器,然后尝试对累加后的本地字节容器中的数据进行拆包,拆成一个完整的业务数据包。

netty拆包过程

1、累加器

netty通过累加器实现将每次新读取到的数据不断累积到本地字节容器的操作。
ByteToMessageDecoder 中定义了两个累加器:

public static final Cumulator MERGE_CUMULATOR = ...;
public static final Cumulator COMPOSITE_CUMULATOR = ...;

默认情况下使用简单的MERGE_CUMULATOR累加器,原理是每次都将读取到的数据通过内存拷贝的方式,拼接到一个大的字节容器中,这个大的字节容器即为ByteToMessageDecoder中的cumulation。

private Cumulator cumulator = MERGE_CUMULATOR;

累加器的累加操作实现

        @Override
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            ByteBuf buffer;
            if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                    || cumulation.refCnt() > 1) {
                buffer = expandCumulation(alloc, cumulation, in.readableBytes());
            } else {
                buffer = cumulation;
            }
            buffer.writeBytes(in);
            in.release();
            return buffer;
        }

ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable);

2、数据读取与粘包/拆包

1、 代码入口
2、 累加新读取的数据到本地自己容器中
3、 将本地字节容器中的数据传递给业务拆包器拆包
4、 清理字节容器
5、 传递业务数据包给业务解码器处理

1)代码入口
a、NioEventLoop线程在处理IO事件的代码processSelectedKey(SelectionKey k, AbstractNioChannel ch)中,对于OP_READ事件会调用相应Channel的NioByteUnsafe.read()进行处理。
b、NioByteUnsafe.read()会分配一个ByteBuf byteBuf并将TCP接收缓存中的数据读取到byteBuf中,最后触发channelRead事件将byteBuf传递给pipeline中的ByteToMessageDecoder回调处理。

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                }

                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

上述代码分为如下四个步骤:
1、 累加新读取的数据到本地自己容器中
2、 将本地字节容器中的数据传递给业务拆包器拆包
3、 清理字节容器
4、 传递业务数据包给业务解码器处理

2)累加新读取的数据到本地自己容器中

                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }

3)将本地字节容器中的数据传递给业务拆包器拆包

CodecOutputList out = CodecOutputList.newInstance();
callDecode(ctx, cumulation, out);

1、 到这一步,本地字节容器中的数据是目前未经拆包的所有数据;
2、 callDecode 将尝试将本地字节容器的数据拆分成业务数据包,并放入业务数据包容器CodecOutputList out中;
3、 对于业务数据包容器out,遍历其中的业务数据包,通过ctx.fireChannelRead(msg);将每个业务数据包传递给后续处理器进行业务处理

具体的拆包工作由ByteToMessageDecode的抽象方法定义:

protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

netty中对各种用户协议的支持就体现在这个抽象方法中,所有的拆包器最终都实现了该抽象方法。

decode后,如果发现并没有拆到一个完整的数据包

                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }

4)清理字节容器
NioByteUnsafe.read()每次处理OP_READ事件读取完数据,都会触发一次channelReadComplete事件。
ByteToMessageDecoder.channelReadComplete(ChannelHandlerContext ctx)方法中实现了对本地字节容器的清理逻辑:

discardSomeReadBytes();

另外,为防止发送端发送数据过快ByteToMessageDecoder.channelRead中在每次拆包过后都会做一次判断,如果读取到的数据量过多也会主动执行本地字节容器的清理逻辑:

if (++ numReads >= discardAfterReads) {
                    numReads = 0;
                    discardSomeReadBytes();
                }

discardSomeReadBytes()之前,本地字节容器中的数据分布:

+--------------+----------+----------+
|   readed     | unreaded | writable | 
+--------------+----------+----------+

discardSomeReadBytes()之后,本地字节容器中的数据分布:

+----------+-------------------------+
| unreaded |      writable           | 
+----------+-------------------------+

**5)传递业务数据包给业务解码器处理**
经过上面几个步骤完成之后,就可以将拆成的业务数据包交给后续业务处理器处理了:

fireChannelRead(ctx, out, size);

static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
    for (int i = 0; i < numElements; i ++) {
        ctx.fireChannelRead(msgs.getUnsafe(i));
    }
}

#三、拆包器的具体实现类
LineBasedFrameDecoder  ->  根据换行符\n或\r\n进行拆包 
DelimiterBasedFrameDecoder  ->  根据用户定义的标识符进行拆包 
LengthFieldBasedFrameDecoder  ->  根据包头长度进行拆包,适用于私有协议解码
上一篇下一篇

猜你喜欢

热点阅读