【网络编程】Netty编解码之 字头 + 数据长度 + 消息体的

2023-03-08  本文已影响0人  程就人生

在上一篇文章中介绍了 LengthFieldPrepender + StringEncoder 长度前置的文本编码器 和 LengthFieldBasedFrameDecoder + StringDecoder 基本长度解析的文本解码器组合的使用。

LengthFieldBasedFrameDecoder 解码器的功能还是比较强大的,可以解析 字头 + 数据长度 + 消息体 的协议格式,不止 6 种,还可以有很多变种。

在编码的时候,LengthFieldPrepender 长度前置编码类,就有些局限,只能添加数据长度,再添加一个字头就实现不了,今天就在 LengthFieldPrepender 的基础上改造一下,实现增加字头编码的功能。

做过物联网的朋友都知道,硬件那边使用的都是字节数组,很少有文本类型的信息,所以文本编码器 StringEncoder 会换成 ByteArrayEncoder 字节数组编码器,文本解码器 StringDecoder 会换成 ByteArrayDecoder 字节数组解码器。

在处理业务逻辑的时候,我们处理的是一组字节数组,在给客户端发送指令的时候,也发送一组字节数组,客户端发给服务器的时候,服务器接收到的也是一组字节数组。

在项目的历史版本中,服务器端使用拼接的方式,拼接字头,拼接数据长度,然后发给客户端。在这个版本中,我要把字头做成公共的,数据长度也做成公共的,在业务处理器中接收完整的数据包,去掉字头,根据数据长度获取消息体,毕竟消息体是不固定的,至于解析处理的字节代表什么意思,在此可以忽略不计。

还是老规矩,客户端给服务器端发送100条数据,服务器端反馈给客户端100条数据,查看运行结果是否会发生粘包/拆包问题。

服务器端代码:

package com.test.nio.stickyBag;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

/**
 * Netty服务器端
 * @author 程就人生
 * @date
 * @Description 
 *
 */
public class TestServer {

    public void bind(final int port){
        // 配置服务端的Nio线程组,boosGroup负责新客户端接入
        EventLoopGroup boosGroup = new NioEventLoopGroup();
        // workerGroup负责I/O消息处理
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try{            
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boosGroup, workerGroup)
            // 线程组设置为非阻塞
            .channel(NioServerSocketChannel.class)
            //连接缓冲池的大小
          .option(ChannelOption.SO_BACKLOG, 1024)
          //设置通道Channel的分配器
          .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            //设置长连接
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            // 采用匿名内部类的方式,声明hanlder
            .childHandler(new ChannelInitializer<SocketChannel>(){
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                  /**
                   * 包长最大长度不能超过65535
                   * 长度偏移量 1 个字节
                   * 长度占用2个字节
                   * 长度调整-3
                   * 剥离初始字节0
                   */
                  ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,1,2,-3,0));                  
                    // Bytebuf 转 byte数组
                    ch.pipeline().addLast(new ByteArrayDecoder());
                    
                    // 对要发出去的数据进行编码
                    /**
                     * 根据 LengthFieldPrepender解码类改写
                     * 长度占用2个字节
                     * 长度调整0
                     * 长度字段中包含长度本身占用的字节数,
                     * 字头为0XFE
                     * 字头占用1个字节
                     */
                    ch.pipeline().addLast(new LengthHeaderFieldPrepender(2, 0, true, 0XFE, 1));
                    // byte数组  转 Bytebuf
                  ch.pipeline().addLast(new ByteArrayEncoder());   
                  
                    // 事件处理绑定,具体的业务逻辑处理
                    ch.pipeline().addLast(new ServerHandler());
                }               
            });
            // 绑定端口
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            // 服务端启动监听事件
        channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
                public void operationComplete(Future<? super Void> future) throws Exception {
                  //启动成功后的处理
                    if (future.isSuccess()) {
                       System.out.println("服务器启动成功,Started Successed:" + port);
                    } else {
                      System.out.println("服务器启动失败,Started Failed:" + port);
                    }
                }
            });
        // 等待服务端监听端口关闭
        channelFuture.channel().closeFuture().sync();
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            // 优雅退出
            boosGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }        
    }
    
    public static void main(String[] argo){
        new TestServer().bind(8080);
    }
}

/**
 * 服务器端handler
 * @author 程就人生
 * @date
 * @Description 
 *
 */
class ServerHandler extends ChannelInboundHandlerAdapter{
    // 对接收的消息进行计数
    private static int counter;
    // I/O消息的接收处理
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg){
        try{
           // 把接收到的内容输出到控制台
            byte[] data = (byte[]) msg;
           int dataLength = data.length;
           ByteBuf buf = Unpooled.buffer(dataLength);
           buf.writeBytes(data);
           System.out.println("这里是服务器端控制台:" + ByteBufUtil.hexDump(buf).toUpperCase() + "计数:" + ++counter);
           
           buf = Unpooled.buffer(15);
           buf.writeByte(0X08);
           buf.writeByte(0XD4);
           buf.writeByte(0X9B);
           buf.writeByte(0X06);
           buf.writeByte(0XB6);
           buf.writeByte(0X01);
           buf.writeByte(0X11);  
           buf.writeByte(0X01);
           buf.writeByte(0X01);
           buf.writeByte(0X00);
           buf.writeByte(0X12);
           buf.writeByte(0X02);
           buf.writeByte(0XF4);
           buf.writeByte(0X01);
           buf.writeByte(0X60);
           // 返回信息给客户端
           ctx.writeAndFlush(buf.array());
        }catch(Exception e){
            e.printStackTrace();
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       // 遇到异常时关闭ChannelHandlerContext 
       ctx.close();
    }
}

在 59 行,我们对进来的数据先使用 LengthFieldBasedFrameDecoder 类进行解码,数据包长最大不能超过 65535,长度偏移量 1 个字节(字头的长度),长度占用2个字节,长度调整 -3 (字头占用的字节 + 数据长度占用的字节),剥离初始字节 0 个字节 (解析后还需要完整的数据包)。

经过 LengthFieldBasedFrameDecoder 解码后,在 61 行再经过 ByteArrayDecoder 字节数组解码器解码成字节数组,最后传递给 ServerHandler 进行具体的业务逻辑处理。

在业务处理类 ServerHandler 中的 channelRead 方法中,我们将接收到的字节数组在控制台打印,并给客户端发送消息体长度为 15 个字节的字节数组,加上消息头和数据长度,客户端收到的应该是 18 个字节的字节数组。在 128 行,使用工具类 ByteBufUtil.hexDump(buf).toUpperCase() 打印客户端发过来的数据到控制台。

在 72 行,使用 LengthHeaderFieldPrepender 编码类对将要发出的字节数组进行加工,加上字头和数据长度,最后交给 ByteArrayEncoder 编码类将数据写入到 Channel 通道中,发送给客户端。

根据 LengthFieldPrepender 改写的 LengthHeaderFieldPrepender 代码:

import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
import java.nio.ByteOrder;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.internal.ObjectUtil;
/**
 * 根据 LengthFieldPrepender 类改写
 * @author 程就人生
 * @Date
 * 
 * 解码前:                                                                  解码后:                     
 * +--------+--------+----------------+        +--------+--------+----------------+
 * |  Header| Length | Actual Content |  ----->| Header | Length | Actual Content | 
 * |  0xFE  | 0x0006 | OX00 0X01 0X11 |        |  0xFE  | 0x0006 | OX00 0X01 0X11 |  
 * +------ -+--------+----------------+        +--------+--------+----------------+
 */
@Sharable
public class LengthHeaderFieldPrepender extends MessageToMessageEncoder<ByteBuf> {

    private final ByteOrder byteOrder;
    private final int lengthFieldLength;
    private final boolean lengthIncludesLengthFieldLength;
    private final int lengthAdjustment;    
    private final int header;
    private final int headerLength;

    /**
     * Creates a new instance.
     *
     * @param lengthFieldLength the length of the prepended length field.
     *                          Only 1, 2, 3, 4, and 8 are allowed.
     *
     * @throws IllegalArgumentException
     *         if {@code lengthFieldLength} is not 1, 2, 3, 4, or 8
     */
    public LengthHeaderFieldPrepender(int lengthFieldLength) {
        this(lengthFieldLength, false);
    }

    /**
     * Creates a new instance.
     *
     * @param lengthFieldLength the length of the prepended length field.
     *                          Only 1, 2, 3, 4, and 8 are allowed.
     * @param lengthIncludesLengthFieldLength
     *                          if {@code true}, the length of the prepended
     *                          length field is added to the value of the
     *                          prepended length field.
     *
     * @throws IllegalArgumentException
     *         if {@code lengthFieldLength} is not 1, 2, 3, 4, or 8
     */
    public LengthHeaderFieldPrepender(int lengthFieldLength, boolean lengthIncludesLengthFieldLength) {
        this(lengthFieldLength, 0, lengthIncludesLengthFieldLength);
    }

    /**
     * Creates a new instance.
     *
     * @param lengthFieldLength the length of the prepended length field.
     *                          Only 1, 2, 3, 4, and 8 are allowed.
     * @param lengthAdjustment  the compensation value to add to the value
     *                          of the length field
     *
     * @throws IllegalArgumentException
     *         if {@code lengthFieldLength} is not 1, 2, 3, 4, or 8
     */
    public LengthHeaderFieldPrepender(int lengthFieldLength, int lengthAdjustment) {
        this(lengthFieldLength, lengthAdjustment, false);
    }

    /**
     * Creates a new instance.
     *
     * @param lengthFieldLength the length of the prepended length field.
     *                          Only 1, 2, 3, 4, and 8 are allowed.
     * @param lengthAdjustment  the compensation value to add to the value
     *                          of the length field
     * @param lengthIncludesLengthFieldLength
     *                          if {@code true}, the length of the prepended
     *                          length field is added to the value of the
     *                          prepended length field.
     *
     * @throws IllegalArgumentException
     *         if {@code lengthFieldLength} is not 1, 2, 3, 4, or 8
     */
    public LengthHeaderFieldPrepender(int lengthFieldLength, int lengthAdjustment, boolean lengthIncludesLengthFieldLength) {
        this(ByteOrder.BIG_ENDIAN, lengthFieldLength, lengthAdjustment, lengthIncludesLengthFieldLength, 0, 0);
    }
    
    public LengthHeaderFieldPrepender(int lengthFieldLength, int lengthAdjustment, boolean lengthIncludesLengthFieldLength, int header, int headerLength) {
        this(ByteOrder.BIG_ENDIAN, lengthFieldLength, lengthAdjustment, lengthIncludesLengthFieldLength, header, headerLength);
    }

    /**
     * Creates a new instance.
     *
     * @param byteOrder         the {@link ByteOrder} of the length field
     * @param lengthFieldLength the length of the prepended length field.
     *                          Only 1, 2, 3, 4, and 8 are allowed.
     * @param lengthAdjustment  the compensation value to add to the value
     *                          of the length field
     * @param lengthIncludesLengthFieldLength
     *                          if {@code true}, the length of the prepended
     *                          length field is added to the value of the
     *                          prepended length field.
     *
     * @throws IllegalArgumentException
     *         if {@code lengthFieldLength} is not 1, 2, 3, 4, or 8
     */
    public LengthHeaderFieldPrepender(
            ByteOrder byteOrder, int lengthFieldLength,
            int lengthAdjustment, boolean lengthIncludesLengthFieldLength, int header, int headerLength) {
        if (lengthFieldLength != 1 && lengthFieldLength != 2 &&
            lengthFieldLength != 3 && lengthFieldLength != 4 &&
            lengthFieldLength != 8) {
            throw new IllegalArgumentException(
                    "lengthFieldLength must be either 1, 2, 3, 4, or 8: " +
                    lengthFieldLength);
        }
        ObjectUtil.checkNotNull(byteOrder, "byteOrder");

        this.byteOrder = byteOrder;
        this.lengthFieldLength = lengthFieldLength;
        this.lengthIncludesLengthFieldLength = lengthIncludesLengthFieldLength;
        this.lengthAdjustment = lengthAdjustment;
        this.header = header;
        this.headerLength = headerLength;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        int length = msg.readableBytes() + lengthAdjustment + headerLength;
        if (lengthIncludesLengthFieldLength) {
            length += lengthFieldLength;
        }
        // 写字头(新加)
        out.add(ctx.alloc().buffer(1).order(byteOrder).writeByte((byte) header));
        
        checkPositiveOrZero(length, "length");
        
        switch (lengthFieldLength) {
        case 1:
            if (length >= 256) {
                throw new IllegalArgumentException(
                        "length does not fit into a byte: " + length);
            }
            out.add(ctx.alloc().buffer(1).order(byteOrder).writeByte((byte) length));
            break;
        case 2:
            if (length >= 65536) {
                throw new IllegalArgumentException(
                        "length does not fit into a short integer: " + length);
            }
            out.add(ctx.alloc().buffer(2).order(byteOrder).writeShort((short) length));
            break;
        case 3:
            if (length >= 16777216) {
                throw new IllegalArgumentException(
                        "length does not fit into a medium integer: " + length);
            }
            out.add(ctx.alloc().buffer(3).order(byteOrder).writeMedium(length));
            break;
        case 4:
            out.add(ctx.alloc().buffer(4).order(byteOrder).writeInt(length));
            break;
        case 8:
            out.add(ctx.alloc().buffer(8).order(byteOrder).writeLong(length));
            break;
        default:
            throw new Error("should not reach here");
        }        
        out.add(msg.retain());
    }
}

在该编码类中,增加了两个成员变量,一个是字头 header,另一个是字头长度 headerLength,分别用在了 137 行和 142 行,计算消息体长度的时候用到了字头长度 headerLength,写入字头的时候用到了字头 header。其实在写入字头的时候也应该像 146 行一样,根据字头长度,写入不同数据类型的字头,这里先免了吧,严谨一点还是要加的。

客户端代码:

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;

/**
 * netty客户端
 * @author 程就人生
 * @date
 * @Description 
 *
 */
public class TestClient {
  
    public void connect(int port, String host){
        // 客户端Nio线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try{   
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
            // 线程组设置为非阻塞
            .channel(NioSocketChannel.class)
          .option(ChannelOption.SO_KEEPALIVE, true)
            .handler(new ChannelInitializer<SocketChannel>(){
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {    
                  
                  /**
                   * 包长最大长度不能超过65535
                   * 长度偏移量 1 个字节
                   * 长度占用2个字节
                   * 长度调整-3
                   * 剥离初始字节0
                   */
                  ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,1,2,-3,0));                  
                    // Bytebuf 转 byte数组
                    ch.pipeline().addLast(new ByteArrayDecoder());
                    
                    // 对要发出去的数据进行编码
                    /**
                     * 根据 LengthFieldPrepender解码类改写
                     * 长度占用2个字节
                     * 长度调整0
                     * 长度字段中包含长度本身占用的字节数,
                     * 字头为0XFE
                     * 字头占用1个字节
                     */
                    ch.pipeline().addLast(new LengthHeaderFieldPrepender(2, 0, true, 0XFE, 1));
                    // byte数组  转 Bytebuf
                  ch.pipeline().addLast(new ByteArrayEncoder());    
                  
                    // 事件处理绑定,具体的业务逻辑处理
                    ch.pipeline().addLast(new ClientHandler());
                }
            });            
            // 建立连接
            ChannelFuture channelFuture = bootstrap.connect(host, port);
            // 等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            // 优雅退出
            group.shutdownGracefully();
        }
    }
    
    public static void main(String[] argo){
        new TestClient().connect(8080, "localhost");
    }
}

/**
 * 客户端处理handler
 * @author 程就人生
 * @date 
 * @Description 
 *
 */
class ClientHandler extends ChannelInboundHandlerAdapter{
    // 对接收的消息次数进行计数
    private static int counter;
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // 连接成功后,发送消息,连续发送100次,模拟数据交互的频繁
        ByteBuf firstMessage = null;
        for(int i = 0;i<100;i++){
            firstMessage = Unpooled.buffer(7);
            firstMessage.writeByte(0X08);
            firstMessage.writeByte(0X87);
            firstMessage.writeByte(0X9A);
            firstMessage.writeByte(0X01);
            firstMessage.writeByte(0X5D);
            firstMessage.writeByte(0X01);
            firstMessage.writeByte(0X90);     
            ctx.writeAndFlush(firstMessage.array());
        }        
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg){
        try{
          byte[] data = (byte[]) msg;
            int dataLength = data.length;
            ByteBuf buf = Unpooled.buffer(dataLength);
            buf.writeBytes(data);
            System.out.println("这里是客户端控制台:" + ByteBufUtil.hexDump(buf).toUpperCase() + ";计数: " + ++counter);
            // 释放资源
            ReferenceCountUtil.release(buf);
        }catch(Exception e){
            e.printStackTrace();
        }        
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //释放资源
        ctx.close();
    }
}

客户端在使用匿名内部类设置编解码规则时,需要和服务端保持一致。

在 97 行的 channelActive 方法中,客户端向服务器发送 100 次数据,每条数据不包含字头数据长度占用7个字节,加上字头数据长度,要占用10个字节。服务器端输出的也应该是10个字节的数据。

在客户端业务逻辑处理器 ClientHandler 中的 channelRead 方法中,使用工具类 ByteBufUtil.hexDump(buf).toUpperCase() 打印服务器端发过来的数据到控制台,并使用 ReferenceCountUtil.release(buf) 方法对资源进行释放。

服务器端运行结果:

客户端运行结果:

以上便是 LengthFieldBasedFrameDecoder + ByteArrayDecoder 解码类和 LengthHeaderFieldPrepender + ByteArrayEncoder 编码类组合使用的示例。

上一篇下一篇

猜你喜欢

热点阅读