晓我课堂

netty粘包和拆包

2021-11-19  本文已影响0人  wavefreely

粘包和拆包是TCP网络编程中不可避免的,无论是服务端还是客户端,当我们读取或者发送消息的时候,都需要考虑TCP底层的粘包/拆包机制。

拆包与粘包同时发生在数据的发送方与接收方两方。

产生原因

发送方通过网络每发送一批二进制数据包,那么这次所发送的数据包就称为一帧,即

Frame。在进行基于 TCP 的网络传输时,TCP 协议会将用户真正要发送的数据根据当前缓存

的实际情况对其进行拆分或重组,变为用于网络传输的 Frame。在 Netty 中就是将 ByteBuf

中的数据拆分或重组为二进制的 Frame。而接收方则需要将接收到的 Frame 中的数据进行重

组或拆分,重新恢复为发送方发送时的 ByteBuf 数据。

具体场景描述:

个过程称为发送拆包;接收方在接收到需要将这些 Frame 进行合并,这个合并的过程称

为接收方粘包。

的 ByteBuf 合并为一个 Frame 进行传输,这个合并的过程称为发送方的粘包;接收方在

接收到这个 Frame 后需要进行拆包,拆分出多个原来的小的 ByteBuf,这个拆分的过程

称为接收方拆包。

ByteBuf 中的一部分入入到了一个 Frame 中,另一部分被放入到了另一个 Frame 中。这

个过程就是发送方拆包。但对于将这些 ByteBuf 放入到一个 Frame 的过程,就是发送方

粘包;当接收方在接收到两个 Frame 后,对于第一个 Frame 的最后部分,与第二个 Frame

的最前部分会进行合并,这个合并的过程就是接收方粘包。但在将 Frame 中的各个

ByteBuf 拆分出来的过程,就是接收方拆包。

具体情况如下图所示:

netty粘包拆包.png

解决方案

固定长度

对于使用固定长度的粘包和拆包场景,可以使用:

FixedLengthFrameDecoder:每次读取固定长度的消息,如果当前读取到的消息不足指定长度,那么就会等待下一个消息到达后进行补足。其使用也比较简单,只需要在构造函数中指定每个消息的长度即可。

bootstrap.group(parentGroup, childGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 1024)
        //接收套接字缓冲区大小
        .option(ChannelOption.SO_RCVBUF, 1024 * 1024)
        //发送套接字缓冲区大小
        .option(ChannelOption.SO_SNDBUF, 1024 * 1024)
        .option(ChannelOption.SO_KEEPALIVE, true)
        .option(ChannelOption.TCP_NODELAY, true)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // 这里将FixedLengthFrameDecoder添加到pipeline中,指定长度为100
                pipeline.addLast(new FixedLengthFrameDecoder(100));
                // StringEncoder:字符串编码器,将String编码为将要发送到Channel中的ByteBuf
                pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
                // StringDecoder:字符串解码器,将Channel中的ByteBuf数据解码为String
                pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
                //绑定处理器(可绑定多个)
                pipeline.addLast(new ServerHandler()); //处理业务
            }
        });
行拆包

LineBasedFrameDecoder:每个应用层数据包,都以换行符作为分隔符,进行分割拆分,LineBasedFrameDecoder依次遍历ByteBuf中的可读字节,判断是否有"\n"或者"\r\n",如果有就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行,它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度,如果连续读到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。这个使用也比较简单:

childHandler(new ChannelInitializer<SocketChannel>() {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // 这里将FixedLengthFrameDecoder添加到pipeline中,指定长度为100
        //pipeline.addLast(new FixedLengthFrameDecoder(100));
        //这里将LineBasedFrameDecoder添加到pipeline中,设置最大长度为1024
        pipeline.addLast(new LineBasedFrameDecoder(1024));
        // StringEncoder:字符串编码器,将String编码为将要发送到Channel中的ByteBuf
        pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
        // StringDecoder:字符串解码器,将Channel中的ByteBuf数据解码为String
        pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
        //绑定处理器(可绑定多个)
        pipeline.addLast(new ServerHandler()); //处理业务
    }
});
指定分隔符

对于通过分隔符进行粘包和拆包问题的处理,Netty提供了

DelimiterBasedFrameDecoder:通过用户指定的分隔符对数据进行粘包和拆包处理,用法如下:

childHandler(new ChannelInitializer<SocketChannel>() {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // 这里将FixedLengthFrameDecoder添加到pipeline中,指定长度为100
        // pipeline.addLast(new FixedLengthFrameDecoder(100));
        //这里将LineBasedFrameDecoder添加到pipeline中,设置最大长度为1024
        // pipeline.addLast(new LineBasedFrameDecoder(1024));

        //被按照$_$进行分隔,这里1024指的是分隔的最大长度,即当读取到1024个字节的数据之后,
        // 若还是未读取到分隔符,则舍弃当前数据段,因为其很有可能是由于码流紊乱造成的
        ByteBuf delimiter = copiedBuffer(Constants.MESSAGE_DELIMITER.getBytes(Charset.forName("UTF-8")));
        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));

        // StringEncoder:字符串编码器,将String编码为将要发送到Channel中的ByteBuf
        pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
        // StringDecoder:字符串解码器,将Channel中的ByteBuf数据解码为String
        pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
        //绑定处理器(可绑定多个)
        pipeline.addLast(new ServerHandler()); //处理业务
    }
});
基于数据包长度的拆包

LengthFieldBasedFrameDecoder:将应用层数据包的长度,作为接收端应用层数据包的拆分依据。按照应用层数据包的大小,拆包。这个拆包器,有一个要求,就是应用层协议中包含数据包的长度,应用如下:

childHandler(new ChannelInitializer<SocketChannel>() {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // 这里将LengthFieldBasedFrameDecoder添加到pipeline的首位,因为其需要对接收到的数据
        // 进行长度字段解码,这里也会对数据进行粘包和拆包处理
        pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
        // LengthFieldPrepender是一个编码器,主要是在响应字节数据前面添加字节长度字段
        pipeline.addLast(new LengthFieldPrepender(2));
        // StringEncoder:字符串编码器,将String编码为将要发送到Channel中的ByteBuf
        pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
        // StringDecoder:字符串解码器,将Channel中的ByteBuf数据解码为String
        pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
        //绑定处理器(可绑定多个)
        pipeline.addLast(new ServerHandler()); //处理业务
    }
});
自定义粘包拆包器

可以通过实现MessageToByteEncoderByteToMessageDecoder来实现自定义粘包和拆包处理的目的。

最后我们也可以自定义编码器MessageToMessageEncoder和自定义解码器MessageToMessageDecoder,来实现消息内容的转换,比如序列化成某个对象,处理器里面我们就可以不用再去转换对象,具体实现如下:

自定义解码器
/**
 * @Description:  自定义解码器
 * @author: dy
 */
public class CustomDecoder extends MessageToMessageDecoder<ByteBuf> {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        System.out.println("====1111111111===="+msg.toString(Charset.forName("UTF-8")));
        out.add(JSON.parseObject(msg.toString(Charset.forName("UTF-8")), Message.class));
    }
}

使用只需要在构造器里面加入我们自定义的编码器就可以了:

childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                   
                            // 这里将LengthFieldBasedFrameDecoder添加到pipeline的首位,因为其需要对接收到的数据
                            // 进行长度字段解码,这里也会对数据进行粘包和拆包处理
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
                            // LengthFieldPrepender是一个编码器,主要是在响应字节数据前面添加字节长度字段
                            pipeline.addLast(new LengthFieldPrepender(2));
                            // StringEncoder:字符串编码器,将message对象编码为将要发送到Channel中的ByteBuf
                            pipeline.addLast(new CustomDecoder());
                             // StringDecoder:字符串解码器,将Channel中的ByteBuf数据解码为String
                            pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
                            //绑定处理器(可绑定多个)
                            pipeline.addLast(new ServerHandler()); //处理业务
                        }
                    });
上一篇 下一篇

猜你喜欢

热点阅读