netty

使用LengthFieldBasedFrameDecoder解码

2019-05-13  本文已影响0人  老鼠AI大米_Java全栈

最近需要做一个长连接的设备管理,使用netty可以方便的做到,还可以配置心跳及解码器

自定义长度解码器

LengthFieldBasedFrameDecoder解码器自定义长度解决TCP粘包黏包问题。所以又称为: 自定义长度解码器

TCP粘包和黏包现象

  1. TCP粘包是指发送方发送的若干个数据包到接收方时粘成一个包。从接收缓冲区来看,后一个包数据的头紧接着前一个数据的尾。

  2. 当TCP连接建立后,Client发送多个报文给Server,TCP协议保证数据可靠性,但无法保证Client发了n个包,服务端也按照n个包接收。Client端发送n个数据包,Server端可能收到n-1或n+1个包。

为什么出现粘包现象?

  1. 发送方原因: TCP默认会使用Nagle算法。而Nagle算法主要做两件事:1)只有上一个分组得到确认,才会发送下一个分组;2)收集多个小分组,在一个确认到来时一起发送。所以,正是Nagle算法造成了发送方有可能造成粘包现象。

  2. 接收方原因: TCP接收方采用缓存方式读取数据包,一次性读取多个缓存中的数据包。自然出现前一个数据包的尾和后一个收据包的头粘到一起。

如何解决粘包现象

就是要选择相应的解码器

LengthFieldBasedFrameDecoder参数

自定义长度解码器,所以构造函数中6个参数,基本都围绕那个定义长度域,进行的描述。

  1. maxFrameLength - 发送的数据帧最大长度
  2. lengthFieldOffset - 定义长度域位于发送的字节数组中的下标。换句话说:发送的字节数组中下标为${lengthFieldOffset}的地方是长度域的开始地方
  3. lengthFieldLength - 用于描述定义的长度域的长度。换句话说:发送字节数组bytes时, 字节数组bytes[lengthFieldOffset, lengthFieldOffset+lengthFieldLength]域对应于的定义长度域部分
  4. lengthAdjustment - 满足公式: 发送的字节数组bytes.length - lengthFieldLength = bytes[lengthFieldOffset, lengthFieldOffset+lengthFieldLength] + lengthFieldOffset + lengthAdjustment
  5. initialBytesToStrip - 接收到的发送数据包,去除前initialBytesToStrip位
  6. failFast - true: 读取到长度域超过maxFrameLength,就抛出一个 TooLongFrameException。false: 只有真正读取完长度域的值表示的字节之后,才会抛出 TooLongFrameException,默认情况下设置为true,建议不要修改,否则可能会造成内存溢出
  7. ByteOrder - 数据存储采用大端模式或小端模式

举例解释参数如何写

客户端多次发送"HELLO, WORLD"字符串给服务端。"HELLO, WORLD"共12字节(12B)。长度域中的内容是16进制的值,如下:

  1. 0x000c -----> 12

  2. 0x000e -----> 14

场景1

数据包大小: 14B = 长度域2B + "HELLO, WORLD"


1.png

解释:

如上图,长度域的值为12B(0x000c)。希望解码后保持一样,根据上面的公式,参数应该为:

  1. lengthFieldOffset = 0

  2. lengthFieldLength = 2

  3. lengthAdjustment = 0 = 数据包长度(14) - lengthFieldOffset - lengthFieldLength - 长度域的值(12)

  4. initialBytesToStrip = 0 - 解码过程中,没有丢弃任何数据

场景2

数据包大小: 14B = 长度域2B + "HELLO, WORLD"


2.png

解释:

上图中,解码后,希望丢弃长度域2B字段,所以,只要initialBytesToStrip = 2即可。其他与场景1相同

  1. lengthFieldOffset = 0

  2. lengthFieldLength = 2

  3. lengthAdjustment = 0 = 数据包长度(14) - lengthFieldOffset - lengthFieldLength - 长度域的值(12)

  4. initialBytesToStrip = 2 解码过程中,丢弃2个字节的数据

场景3

数据包大小: 14B = 长度域2B + "HELLO, WORLD"。与场景1不同的是:场景3中长度域的值为14(0x000E)


3.png

解释:

如上图,长度域的值为14(0x000E)。希望解码后保持一样,根据上面的公式,参数应该为:

  1. lengthFieldOffset = 0

  2. lengthFieldLength = 2

  3. lengthAdjustment = -2 = 数据包长度(14) - lengthFieldOffset - lengthFieldLength - 长度域的值(14)

  4. initialBytesToStrip = 0 - 解码过程中,没有丢弃任何数据

场景4

场景4在长度域前添加2个字节的Header。长度域的值(0x00000C) = 12。总数据包长度: 17=Header(2B) + 长度域(3B) + "HELLO, WORLD"


4.png

解释

如上图。编码解码后,长度保持一致,所以initialBytesToStrip = 0。参数应该为:

  1. lengthFieldOffset = 2

  2. lengthFieldLength = 3

  3. lengthAdjustment = 0 = 数据包长度(17) - lengthFieldOffset(2) - lengthFieldLength(3) - 长度域的值(12)

  4. initialBytesToStrip = 0 - 解码过程中,没有丢弃任何数据

场景5

与场景4不同的地方是: Header与长度域的位置换了。总数据包长度: 17=长度域(3B) + Header(2B) + "HELLO, WORLD"


5.png

解释

如上图。编码解码后,长度保持一致,所以initialBytesToStrip = 0。参数应该为:

  1. lengthFieldOffset = 0

  2. lengthFieldLength = 3

  3. lengthAdjustment = 2 = 数据包长度(17) - lengthFieldOffset(0) - lengthFieldLength(3) - 长度域的值(12)

  4. initialBytesToStrip = 0 - 解码过程中,没有丢弃任何数据

场景6

如下图,"HELLO, WORLD"域前有多个字段。总数据长度: 16 = HEADER1(1) + 长度域(2) + HEADER2(1) + "HELLO, WORLD"


6.png
  1. lengthFieldOffset = 1

  2. lengthFieldLength = 2

  3. lengthAdjustment = 1 = 数据包长度(16) - lengthFieldOffset(1) - lengthFieldLength(2) - 长度域的值(12)

  4. initialBytesToStrip = 0 - 解码过程中,没有丢弃任何数据

自定义协议

很多时候并不能按照以上参数的方式去解析数据,所以需要自定义协议,LengthFieldBasedFrameDecoder解码器自定义协议.通常,协议的格式如下:

协议格式.png
通常来说,使用ByteToMessageDocoder这个编码器,我们要分别解析出Header,length,body这几个字段.而使用LengthFieldBasedFrameDecoder,我们就可以直接接收想要的一部分,相当于在原来的基础上包上了一层,有了这层之后,我们可以控制我们每次只要读想读的字段,这对于自定义协议来说十分方便.
  1. MyProtocolDecoder的定义
public class MyProtocolDecoder extends LengthFieldBasedFrameDecoder {
    private static final int HEADER_SIZE = 6;
    /**
     *
     * @param maxFrameLength  帧的最大长度
     * @param lengthFieldOffset length字段偏移的地址
     * @param lengthFieldLength length字段所占的字节长
     * @param lengthAdjustment 修改帧数据长度字段中定义的值,可以为负数 因为有时候我们习惯把头部记入长度,若为负数,则说明要推后多少个字段
     * @param initialBytesToStrip 解析时候跳过多少个长度
     * @param failFast 为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异
     */
    public MyProtocolDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        //在这里调用父类的方法,实现指得到想要的部分,我在这里全部都要,也可以只要body部分
        in = (ByteBuf) super.decode(ctx,in);  

        if(in == null){
            return null;
        }
        if(in.readableBytes()<HEADER_SIZE){
            throw new Exception("字节数不足");
        }
        //读取type字段
        byte type = in.readByte();
        //读取flag字段
        byte flag = in.readByte();
        //读取length字段
        int length = in.readInt();
        
        if(in.readableBytes()!=length){
            throw new Exception("标记的长度不符合实际长度");
        }
        //读取body
        byte []bytes = new byte[in.readableBytes()];
        in.readBytes(bytes);
        return new MyProtocolBean(type,flag,length,new String(bytes,"UTF-8"));
    }
}

在上述的代码中,调用父类的方法,实现截取到自己想要的字段,如可以判断数据必须以xx开头。

  1. 协议实体的定义
public class MyProtocolBean {
    //类型  系统编号 0xA 表示A系统,0xB 表示B系统
    private byte type;
    //信息标志  0xA 表示心跳包    0xC 表示超时包  0xC 业务信息包
    private byte flag;
    //内容长度
    private int length;
    //内容
    private String content;

    public MyProtocolBean(byte flag, byte type, int length, String content) {
        this.flag = flag;
        this.type = type;
        this.length = length;
        this.content = content;
    }
}

3.服务端的实现

public class Server {

    private static final int MAX_FRAME_LENGTH = 1024 * 1024;  //最大长度
    private static final int LENGTH_FIELD_LENGTH = 4;  //长度字段所占的字节数
    private static final int LENGTH_FIELD_OFFSET = 2;  //长度偏移
    private static final int LENGTH_ADJUSTMENT = 0;
    private static final int INITIAL_BYTES_TO_STRIP = 0;

    private int port;

    public Server(int port) {
        this.port = port;
    }

    public void start(){
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sbs = new ServerBootstrap().group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new MyProtocolDecoder(MAX_FRAME_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_FIELD_LENGTH,LENGTH_ADJUSTMENT,INITIAL_BYTES_TO_STRIP,false));
                            ch.pipeline().addLast(new ServerHandler());
                        };

                    }).option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            // 绑定端口,开始接收进来的连接
            ChannelFuture future = sbs.bind(port).sync();

            System.out.println("Server start listen at " + port );
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new Server(port).start();
    }
}
  1. 服务端Hanlder
public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MyProtocolBean myProtocolBean = (MyProtocolBean)msg;  //直接转化成协议消息实体
        System.out.println(myProtocolBean.getContent());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }
}

服务端Handler没什么特别的地方,只是输出接收到的消息

  1. 客户端
public class Client {
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

    public static void main(String[] args) throws Exception {
        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new MyProtocolEncoder());
                            ch.pipeline().addLast(new ClientHandler());
                        }
                    });

            ChannelFuture future = b.connect(HOST, PORT).sync();
            future.channel().writeAndFlush("Hello Netty Server ,I am a common client");
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

}
  1. 客户端Handler
public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        MyProtocolBean myProtocolBean = new MyProtocolBean((byte)0xA, (byte)0xC, "Hello,Netty".length(), "Hello,Netty");
        ctx.writeAndFlush(myProtocolBean);
    }
}

客户端Handler实现发送消息.

  1. 客户端编码器
public class MyProtocolEncoder extends MessageToByteEncoder<MyProtocolBean> {

    @Override
    protected void encode(ChannelHandlerContext ctx, MyProtocolBean msg, ByteBuf out) throws Exception {
        if(msg == null){
            throw new Exception("msg is null");
        }
        out.writeByte(msg.getType());
        out.writeByte(msg.getFlag());
        out.writeInt(msg.getLength());
        out.writeBytes(msg.getContent().getBytes(Charset.forName("UTF-8")));
    }
}

编码的时候,只需要按照定义的顺序依次写入到ByteBuf中.

小结
若是上面的参数直接可以满足要求,可以直接使用参数,若不可以则通过自定义的方式去实现,更加灵活。

上一篇下一篇

猜你喜欢

热点阅读