【网络编程】Netty编解码之LengthFieldPrepen

2023-03-07  本文已影响0人  程就人生

大家好,我是娟姐。

图1 图2

在实际应用中,我们经常会面对自定义格式的协议,比如图1和图2,该如何编解码,并防止粘包/拆包呢?

在 netty 框架里,刚好有 LengthFieldPrepender 编码器 和 LengthFieldBasedFrameDecoder 解码器,来处理上面两种协议的编解码。

LengthFieldPrepender 继承 MessageToMessageEncoder<ByteBuf> ,是一个长度前置编码器,它负责在消息的头部设置消息的长度。

图3

在图3中,第 37 行的示例为原始数据共12个字节;第 43 行的数据在消息头加入了数据长度,数据长度占用 2 个字节;第 51 行与 43 行的不同之处就是数据长度多了 2 个字节,这 2 字节是数据长度本身所占用的长度。

在 LengthFieldPrepender 中,有 4 个成员变量。

  1. byteOrder:设置字节序,默认大字端,在缓冲区处理数据是以大字端方式,还是以小字端方式;

  2. lengthFieldLength:数据长度所占用的字节数,没有默认值,必须设置;

  3. lengthIncludesLengthFieldLength:默认false,数据长度中是否包含数据长度本身的长度;

  4. lengthAdjustment:默认0,长度调整字节数,消息体的长度等于数据长度加上长度调整字节数。

在 LengthFieldPrepender 中,共 5 个构造函数,成员变量 lengthFieldLength 为必传参数,其他都有默认值。在第 5 个构造函数中完成所有成员变量的初始化。

图4

在图4中,lengthFieldLength 长度字段的值只能为 1,2,3,4,8,否则抛异常。为什么?因为:

下面看 LengthFieldPrepender 的核心方法encode方法,看看它是如何在消息的头部加入数据长度的。

图5

在图5中的 163 行,长度等于消息体的可读字节数加上长度调整字节数。

在164行,如果数据长度字节中包含长度本身的长度时,在165行把长度本身的长度也加上。

在168 行,对数据长度的合法性进行验证,长度肯定是不能小于 0 的,小于 0 就要抛 IllegalArgumentException 异常。

在170 行,对数据长度所占字节数进行判断。如果数据长度为 1 个字节时,数据长度的值不能大于等于256,因为一个字节的存储上限是255,等于或超过 256 在写入的时候会丢失。如果小于 256 时,就将数据长度在上下文 ctx 中分配一个字节的缓存区并写入该数据。数据长度为 2、3时判断同上。数据长度为 4、8 时直接开辟空间写入。

在 201 行,将传递过来的 msg 写入缓存,交给下一个编码器处理。

再来看看 LengthFieldBasedFrameDecoder 继承 ByteToMessageDecoder ,它是一个解码器,根据消息中的长度动态拆分 ByteBuf。对设置了数据长度的消息体解析特别有用。

图6

在 LengthFieldBasedFrameDecoder 中,有 8 个 final 类型的成员变量,有 3 个类型的非 final 类型的成员变量。

  1. byteOrder:字节序,默认大字端;

  2. maxFrameLength:一个数据包允许的最大长度,初始化时必须设置;

  3. lengthFieldOffset:数据长度所在位置偏移量,从第几位开始读数据长度;

  4. lengthFieldLength:数据长度所占用的字节数;

  5. lengthFieldEndOffset:默认值为 0,结束偏移量;

  6. lengthAdjustment:默认值为 0,长度调整字节数;

  7. initialBytesToStrip:默认值为0,要剥离的初始字节;

  8. failFast:快速失败,默认 true,如果为 true 时,不读完数据包就抛出异常,否则读完数据包再抛出异常;

  9. discardingTooLongFrame:是否跳过超出存储范围的字节,默认false;

  10. tooLongFrameLength:最长的包长;

  11. bytesToDiscard:需要跳过的字节数;

在 LengthFieldBasedFrameDecoder 类中,有 4 个构造函数,在第 4 个构造函数中,完成了所有 final 类型成员变量的赋值,其他非 final 类型的成员变量,则在需要的时候再去赋值使用。

图6

在图6的构造函数中,结束偏移量 lengthFieldEndOffset 是根据数据长度 lengthFieldLength 加上 数据长度偏移量 lengthFieldOffset 计算出来的。这些成员变量,到底代表什么含义,很不好理解,也容易混淆。在类的注释上,有很多范例,可以根据这些范例来理解这些成员变量是如何使用的。第一种协议格式:

图 7

在图7中,第一个红色框框是解码前的协议,数据长度占用 2 个字节,消息体为 "HELLO, WORLD" 字符串,消息体的长度为 12 个字节,因此数据长度位置填充12 的十六进制数据 0x000C。第二个红色框框是解码后的协议,解码后的协议基本和解析前的协议保持一致。

8 个 final 类型的成员变量,其中:

第二种协议格式:

图 8

在图8 中,解码后的数据长度不见了,只剩消息体。这是如何设置的呢,其实很简单,只需要把要跳过的初始字节 initialBytesToStrip 设置为数据长度所占用的字节数即可。

8 个 final 类型的成员变量,其中:

第三种协议格式:

图 9

在图9中,解码前和解码后的数据格式是一样的,唯一的不同是数据位的长度。消息体的长度只有 12 个字节,这里是 14 个字节,多了2个字节,多出的这2个字节是数据长度本身所占用的字节。也就是说,数据长度位置的数据包含了数据长度本身所占用的字节数,数据长度位置的数据是整个数据包的数据长度,而不单单是消息体的长度。在设置参数时,除了设置数据长度 lengthFieldLength 所占用的字节数,还需要将数据长度调整参数 lengthAdjustment 设置为 -2 。

8 个 final 类型的成员变量,其中:

在设置最大包长时,如果数据长度中包含了数据长度本身的长度,那么消息体的最大长度就需要扣除数据长度所占的长度。第四种协议格式:

图 10

在图10中,在数据长度的前面增加了字头,解码前的协议和解码后的协议格式保持一致。其中,字头 header 占用 2 个字节,数据长度 length 占用 3 个字节。

8 个 final 类型的成员变量,其中:

第五种协议格式:

图11

在图11中,数据长度和字头调了个位置,真是什么样的协议格式都有,对于这样的协议格式参数该如何设置呢?

8 个 final 类型的成员变量,其中:

第六种协议格式:

图12

在图12中,协议更加复杂了,出现了双字头。字头1和字头2分别占用 1 个字节。数据长度占用2个字节。对于这样的协议参数该如何设置呢?

8 个 final 类型的成员变量,其中:

第七种协议格式:

图13

在图13中,协议格式又有了新变化,数据长度包含的不仅仅是消息体的长度,还包括双字头的长度以及数据长度所占用的字节数。这么复杂的协议,它的参数是如何设置的呢?

8 个 final 类型的成员变量,其中:

接下来看 LengthFieldBasedFrameDecoder 的核心方法 decode 方法。

@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    Object decoded = decode(ctx, in);
    if (decoded != null) {
        out.add(decoded);
    }
}

/**
 * Create a frame out of the {@link ByteBuf} and return it.
 *
 * @param   ctx             the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
 * @param   in              the {@link ByteBuf} from which to read data
 * @return  frame           the {@link ByteBuf} which represent the frame or {@code null} if no frame could
 *                          be created.
 */
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
    if (discardingTooLongFrame) {
        discardingTooLongFrame(in);
    }

    if (in.readableBytes() < lengthFieldEndOffset) {
        return null;
    }

    int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
    long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);

    if (frameLength < 0) {
        failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
    }

    frameLength += lengthAdjustment + lengthFieldEndOffset;

    if (frameLength < lengthFieldEndOffset) {
        failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
    }

    if (frameLength > maxFrameLength) {
        exceededFrameLength(in, frameLength);
        return null;
    }

    // never overflows because it's less than maxFrameLength
    int frameLengthInt = (int) frameLength;
    if (in.readableBytes() < frameLengthInt) {
        return null;
    }

    if (initialBytesToStrip > frameLengthInt) {
        failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);
    }
    in.skipBytes(initialBytesToStrip);

    // extract frame
    int readerIndex = in.readerIndex();
    int actualFrameLength = frameLengthInt - initialBytesToStrip;
    ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
    in.readerIndex(readerIndex + actualFrameLength);
    return frame;
}

在第3行,通过方法重写的方式对进来的 ByteBuf 进行解码处理。在第4行判断解码后的对象是否为 null,如果不为 null,在第5行,将数据传递给下一个解码器处理。

从第 17 行开始重写 decode 方法。

在 18 行判断 discardingTooLongFrame 参数是否为true,初次进到这个方法时,discardingTooLongFrame 默认值是false,直到进入 40 行的 exceededFrameLength 方法,要销毁的数据长度大于 0 时,才会被设置为 true。

在 22 行判断 ByteBuf 里面的可读字节数是否小于结束偏移量 lengthFieldEndOffset,如果小于就返回null 。

第 26 行,计算实际的数据长度偏移量 actualLengthFieldOffset,它等于当前可读索引加上数据长度偏移量。

第 27 行,根据实际的数据长度偏移量 actualLengthFieldOffset 和数据长度所占字节数 lengthFieldLength,获取协议中的数据长度 frameLength。

第 29 行,判断数据长度 frameLength 是否小于 0,小于 0 时从 ByteBuf 中跳过结束偏移量 lengthFieldEndOffset 的字节数,并抛出 CorruptedFrameException 异常。

第 33 行,重新计算数据长度 frameLength,它等于当前数据长度 frameLength 加上长度调整字节数 lengthAdjustment 加上结束偏移量 lengthFieldEndOffset 。

第 35 行,如果数据长度小于结束偏移量时,从 ByteBuf 中跳过结束偏移量数量的字节数,并抛出 CorruptedFrameException 异常。

第 39 行,如果数据长度 frameLength 大于数据包的最大长度 maxFrameLength 时,会跳过数据长度 frameLength 的字节数,并返回 null。

第 46 行,如果可读数量小于最大长度 frameLengthInt 时,返回 null。

第 50 行,要跳过的初始字节数 initialBytesToStrip 大于数据长度 frameLengthInt 时,返回 null。

第 53 行,从 ByteBuf 中跳过初始字节 initialBytesToStrip。

第 56 行,获取读索引。

第 57 行,计算数据包的实际长度 actualFrameLength ,用数据长度 frameLengthInt 减去初始字节 initialBytesToStrip。

第 58 行,根据读索引 readerIndex 和实际数据长度 actualFrameLength,从 ByteBuf 缓冲区中切出一个ByteBuf的数据片来。

第 59 行,设置当前 ByteBuf 缓冲区的读索引为读索引 readerIndex 加上实际数据包的长度 actualFrameLength。也就是说被分离出去的数据读过了,不要再读了。

第 60 行,将 58 行切出的 ByteBuf 返回。

编码器和解码器的源码已经过了一遍了,接下来就拿图1中的协议实战一下。

服务器端编码:


import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

/**
 * Netty服务器端
 * @author 程就人生
 * @date 2023年01月07日
 * @Description 
 *
 */
public class TestServer {

    public void bind(final int port){
        // 配置服务端的Nio线程组,boosGroup负责新客户端接入
        EventLoopGroup boosGroup = new NioEventLoopGroup();
        // workerGroup负责I/O消息处理
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try{            
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boosGroup, workerGroup)
            // 线程组设置为非阻塞
            .channel(NioServerSocketChannel.class)
            //连接缓冲池的大小
          .option(ChannelOption.SO_BACKLOG, 1024)
          //设置通道Channel的分配器
          .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            //设置长连接
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            // 采用匿名内部类的方式,声明hanlder
            .childHandler(new ChannelInitializer<SocketChannel>(){
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                  // 数据包最大长度65535, 长度偏移量 0,长度2个字节,长度调整 0 个字节,要剥离2个初始字节
                  ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
                  // ByteBuf 转 字符串
                  ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                  
                  // 数据长度占用两个字节,数据长度字段中不包含数据长度本身所占用的字节数
                  ch.pipeline().addLast(new LengthFieldPrepender(2)); 
                  // 字符串 转 ByteBuf
                  ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));                  
                    // 事件处理绑定
                    ch.pipeline().addLast(new ServerHandler());
                }               
            });
            // 绑定端口
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            // 服务端启动监听事件
        channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
                public void operationComplete(Future<? super Void> future) throws Exception {
                  //启动成功后的处理
                    if (future.isSuccess()) {
                       System.out.println("服务器启动成功,Started Successed:" + port);
                    } else {
                      System.out.println("服务器启动失败,Started Failed:" + port);
                    }
                }
            });
        // 等待服务端监听端口关闭
        channelFuture.channel().closeFuture().sync();
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            // 优雅退出
            boosGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }        
    }
    
    public static void main(String[] argo){
        new TestServer().bind(8080);
    }
}

/**
 * 服务器端handler
 * @author 程就人生
 * @date 2023年01月07日
 * @Description 
 *
 */
class ServerHandler extends ChannelInboundHandlerAdapter{
    // 对接收的消息进行计数
    private static int counter;
    // I/O消息的接收处理
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg){
        try{
            // 把接收到的内容输出到控制台
            System.out.println("这里是服务器端控制台:" + msg + ",计数:" + ++counter);
            // 1.2、发送字符串
            String resp = "来自服务器端的消息~!@@来自服务器端的消息S~!";            
            // 返回信息给客户端
            ctx.writeAndFlush(resp);            
        }catch(Exception e){
            e.printStackTrace();
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       // 遇到异常时关闭ChannelHandlerContext 
       ctx.close();
    }
}

在 53 行,先使用 LengthFieldBasedFrameDecoder 解码类对进来的 buffer ,根据传递的参数进行进行解码,这里的参数设置的非常简单,最大包长 65535,数据长度占用 2个字节,要剥离的初始偏移量为 2 个字节,因为我们不需要数据长度只需要消息体,其他参数设置为0。

在 54 行,再使用 StringDecoder 对进来的数据进行解码,从 ByteBuf 类型解码成 String 字符串。

在 59 行,对要发出去的数据进行编码,先将 String 字符串类型的数据编码为 ByteBuf。

在 57 行,对要出去的数据再次包装,在字头添加数据长度,这里设置数据长度占用2个字节。

在这里有一点需要注意,那就是编码的顺序和解码的顺序是反着的,这个位置不能放错。

客户端编码:


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

/**
 * netty客户端
 * @author 程就人生
 * @date 2023年01月07日
 * @Description 
 *
 */
public class TestClient {
  
    public void connect(int port, String host){
        // 客户端Nio线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try{   
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
            // 线程组设置为非阻塞
            .channel(NioSocketChannel.class)
          .option(ChannelOption.SO_KEEPALIVE, true)
            .handler(new ChannelInitializer<SocketChannel>(){
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {                   
                  // 数据包最大长度65535, 长度偏移量 0,长度2个字节,长度调整 0 个字节,要剥离2个初始字节
                  ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
                  // ByteBuf 转 字符串
                  ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                  
                  // 数据长度占用两个字节,数据长度字段中不包含数据长度本身所占用的字节数
                  ch.pipeline().addLast(new LengthFieldPrepender(2)); 
                  // 字符串 转 ByteBuf
                  ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));                  
                    // 事件处理绑定
                    ch.pipeline().addLast(new ClientHandler());
                }
            });            
            // 建立连接
            ChannelFuture channelFuture = bootstrap.connect(host, port);
            // 等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            // 优雅退出
            group.shutdownGracefully();
        }
    }
    
    public static void main(String[] argo){
        new TestClient().connect(8080, "localhost");
    }
}

/**
 * 客户端处理handler
 * @author 程就人生
 * @date 2023年01月07日
 * @Description 
 *
 */
class ClientHandler extends ChannelInboundHandlerAdapter{
    // 对接收的消息次数进行计数
    private static int counter;
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
      // 1.2、发送字符串
        String req = "来自客户端的消息~!@@来自客户端的消息clent~!";
        // 连接成功后,发送消息,连续发送100次,模拟数据交互的频繁
        for(int i = 0;i<100;i++){
            ctx.writeAndFlush(req);
        }
        
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg){
        try{
            System.out.println("这里是客户端控制台:" + msg + ",计数:" + ++counter);
        }catch(Exception e){
            e.printStackTrace();
        }        
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //释放资源
        ctx.close();
    }
}

客户端的编解码和服务器端的编解码保持一致。服务器端运行结果:

客户端运行结果:

服务器端和客户端输出了完整的数据,没有发生粘包/拆包问题。

如果在数据长度中加入数据长度本身的数据,只需把服务器的编解码参数和客户端的编解码参数调整一下即可:

// 数据包最大长度65535, 长度偏移量 0,长度2个字节,长度调整 -2 个字节,要剥离2个初始字节
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, -2, 2));
// ByteBuf 转 字符串
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));

// 数据长度占用两个字节,数据长度字段中包含数据长度本身所占用的字节数
ch.pipeline().addLast(new LengthFieldPrepender(2, true)); 
// 字符串 转 ByteBuf
ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));

LengthFieldBasedFrameDecoder 解码类可以解析带有字头的协议,但是LengthFieldPrepender 编码类,只能在消息体前面加入数据长度,如果要加入字头就不能满足这个需求了。这对组合比较适合文中的前三种协议结构。

以上便是 LengthFieldPrepender 编码类和 LengthFieldBasedFrameDecoder 解码类的固定搭配。

上一篇 下一篇

猜你喜欢

热点阅读