2021-08-29_Netty基于长度的变长解码器源码学习笔记

2021-08-29  本文已影响0人  kikop

20210829_netty基于长度的变长解码器源码学习笔记2

1概述

LengthFieldBasedFrameDecoder基于长度的变长解码器,其核心要点:消息结构体中要有指定消息体或整个消息的长度说明。

这里如果一个消息没有指定固定长度的话, Netty也给我们准备了一个编码器,即LengthFieldPrepender(消息发送之前预先长度编码),这两个类通常一起使用来解决半包与粘包的问题。

常见的应用场景:专用的客户端、服务器程序。

本机主要是结合代码分析LengthFieldBasedFrameDecoder解码时序过程。

1.1LengthFieldBasedFrameDecoder解码器

如果消息是通过长度进行区分(LengthFieldPrepender),此解码器可以处理粘包与半包问题。

1.1.1构造函数

public LengthFieldBasedFrameDecoder(
            int maxFrameLength,
            int lengthFieldOffset, // 长度偏移位置 
            int lengthFieldLength, // 长度占用的字节长度
            int lengthAdjustment,  // 消息内容读取结束标志调整,决定netty还要读多少个字节,就是一个完整的消息包
    int initialBytesToStrip) {     // 拿到一个完整的数据包之后向业务解码传递之前,应该跳过多少字节(有时想跳过指定的数据长度,或者屏蔽不感兴趣的数据)
        this(
                maxFrameLength,
                lengthFieldOffset, lengthFieldLength, lengthAdjustment,
                initialBytesToStrip, true);
    }

1.2pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <artifactId>technicaltools</artifactId>
        <groupId>com.kikop</groupId>
        <version>1.0-SNAPSHOT</version>
        <!--定义依赖的父pom文件-->
        <relativePath>../pom.xml</relativePath>
    </parent>

    <packaging>jar</packaging>
    <artifactId>mylengthfielddecoderdemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <description>mylengthfielddecoderdemo project</description>


    <properties>
        <junit.version>5.7.0</junit.version>
    </properties>

    <dependencies>

        <!--1.mytechcommon-->
        <dependency>
            <groupId>com.kikop</groupId>
            <artifactId>mytechcommon</artifactId>
            <version>2.0-SNAPSHOT</version>
        </dependency>

        <!--2.junit-->
        <!-- Common test dependencies -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-params</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.junit.vintage</groupId>
            <artifactId>junit-vintage-engine</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.1</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest-library</artifactId>
            <version>1.3</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.18.0</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-core</artifactId>
            <version>2.18.3</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>io.github.artsok</groupId>
            <artifactId>rerunner-jupiter</artifactId>
            <version>2.1.6</version>
            <scope>test</scope>
        </dependency>


        <!--3.netty-->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <!--<version>${netty.version}</version>-->
            <version>4.1.6.Final</version>
        </dependency>

    </dependencies>

    <build>

    </build>

</project>

2代码示例

2.1测试

package com.kikop;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.TooLongFrameException;
import org.junit.Assert;
import org.junit.Test;

/**
 * @author kikop
 * @version 1.0
 * @project Name: mynettydemo
 * @file Name: LengthFieldBasedFrameDecoderTest
 * @desc 基于长度的变长解码器测试
 * @date 2021/8/20
 * @time 9:30
 * @by IDE: IntelliJ IDEA
 */
public class LengthFieldBasedFrameDecoderTest {


     @Test
    public void testDiscardTooLongFrame1()
    {
        // 1.构造ByteBuf
        // 默认大端格式,refCnt=1,长度为:256,readerIndex=0,writeIndex=0
        // 高位:byte[0]:0,低位byte[3]:32
        ByteBuf buf = Unpooled.buffer();

        /*
         * | 4 byte|
         * +-------+
         * |  32   |        length = 4 bytes
         * +-------+
         */
        buf.writeInt(32);

        /*
         * | 4 bytes|          32bytes       |
         * +--------+---*---+-----------+----+
         * |   32   | 1 | 2 | ...       | 32 |        length = 36 bytes
         * +--------+---+---+-----------+----+
         */
        for (int i = 0; i < 32; i++) {
            buf.writeByte(i);
        }

        /*
         * | 4 bytes|          32bytes       | 4 bytes|
         * +--------+---*---+-----------+----+--------+
         * |   32   | 1 | 2 | ......... | 32 |    1   |        length = 40 bytes
         * +--------+---+---+-----------+----+--------+
         */
        buf.writeInt(1);

        /*
         * | 4 bytes|          32bytes       | 4 bytes|1 bytes|
         * +--------+---*---+-----------+----+--------+-------+
         * |   32   | 1 | 2 | ......... | 32 |    1   |   a   |        length = 41 bytes
         * +--------+---+---+-----------+----+--------+-------+
         */
        buf.writeByte('a');
        // lengthFieldEndOffset:4
        EmbeddedChannel channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder(
                16, 0, 4));
        try {
            // 逻辑:先收到后判断,一下子收到41字节,超过 maxFrameLength,所以抛弃:4+32内容
            // 紧接着还剩:5字节(1个int 和 1个byte)
            // 2.字节数据写入通道
            channel.writeInbound(buf);
            fail();
        } catch (TooLongFrameException e) {
            // expected
            e.printStackTrace();
        }
        assertTrue(channel.finish());

        // 3.从通道中读取数据
        ByteBuf b = channel.readInbound();
        assertEquals(5, b.readableBytes());
        assertEquals(1, b.readInt());
        assertEquals('a', b.readByte());
        b.release();

        assertNull(channel.readInbound());
        channel.finish();
    }

}

2.2Netty变长解码器源码分析之decode

解码时,用到了设计模式中的构子函数 ByteToMessageDecoder::decode

2.2.1LengthFieldBasedFrameDecoder类继承关系

image-20210829154720769.png
// 入站处理器 LengthFieldBasedFrameDecoder:ChannelInboundHandler
public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
    public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
        public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
            public abstract class ChannelHandlerAdapter implements ChannelHandler {
                public interface ChannelInboundHandler extends ChannelHandler {
                    public interface ChannelHandler {

2.2.2AttributeMap如何破线程安全

/**
 * Holds {@link Attribute}s which can be accessed via {@link AttributeKey}.
 *
 * Implementations must be Thread-safe.
 */
public interface AttributeMap {
    /**
     * Get the {@link Attribute} for the given {@link AttributeKey}. This method will never return null, but may return
     * an {@link Attribute} which does not have a value set yet.
     */
    <T> Attribute<T> attr(AttributeKey<T> key);

    /**
     * Returns {@code} true if and only if the given {@link Attribute} exists in this {@link AttributeMap}.
     */
    <T> boolean hasAttr(AttributeKey<T> key);
}

2.2.2.1DefaultAttributeMap

@SuppressWarnings("unchecked")
@Override
public <T> Attribute<T> attr(AttributeKey<T> key) {
        synchronized (head) {
            DefaultAttribute<?> curr = head;
public <T> boolean hasAttr(AttributeKey<T> key) {
    if (key == null) {
        throw new NullPointerException("key");
    }
    AtomicReferenceArray<DefaultAttribute<?>> attributes = this.attributes;
    if (attributes == null) {
        // no attribute exists
        return false;
    }

    int i = index(key);
    DefaultAttribute<?> head = attributes.get(i);
    if (head == null) {
        // No attribute exists which point to the bucket in which the head should be located
        return false;
    }

    // We need to synchronize on the head.
    synchronized (head) {
        // Start with head.next as the head itself does not store an attribute.
        DefaultAttribute<?> curr = head.next;
        while (curr != null) {
            if (curr.key == key && !curr.removed) {
                return true;
            }
            curr = curr.next;
        }
        return false;
    }
}

2.2.3DefaultChannelPipeline

/**
 * The default {@link ChannelPipeline} implementation.  It is usually created
 * by a {@link Channel} implementation when the {@link Channel} is created.
 * 通道创建时,链表已经创建完成
 */
public class DefaultChannelPipeline implements ChannelPipeline {
        final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    

2.2.3.1HeadContext

// HeadContext.class extends AbstractChannelHandlerContext
final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {

    private final Unsafe unsafe;

    HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, HEAD_NAME, false, true);
        unsafe = pipeline.channel().unsafe();
        setAddComplete();
    }

    @Override
    public ChannelHandler handler() {
        return this;
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // NOOP
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // NOOP
    }

    @Override
    public void bind(
            ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
            throws Exception {
        unsafe.bind(localAddress, promise);
    }

    @Override
    public void connect(
            ChannelHandlerContext ctx,
            SocketAddress remoteAddress, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        unsafe.connect(remoteAddress, localAddress, promise);
    }

    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.disconnect(promise);
    }

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.close(promise);
    }

    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.deregister(promise);
    }

    @Override
    public void read(ChannelHandlerContext ctx) {
        unsafe.beginRead();
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }

    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        unsafe.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        invokeHandlerAddedIfNeeded();
        ctx.fireChannelRegistered();
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();

        // Remove all handlers sequentially if channel is closed and unregistered.
        if (!channel.isOpen()) {
            destroy();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();

        readIfIsAutoRead();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();

        readIfIsAutoRead();
    }

    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }
}

2.2.3.2触发顺序

2.2.3.2.1DefaultChannelPipeline.class
// 1.DefaultChannelPipeline.class
@Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }
2.2.3.2.2AbstractChannelHandlerContext.class
// 2.AbstractChannelHandlerContext.class
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                // 获取入站处理器,DefaultChannelPipeline$HeadContext#0
                // 并调用channelRead事件,数据:msg
                
                // DefaultChannelPipeline$HeadContext#0::channelRead
                //-->AbstractChannelHandlerContext::channelRead
                //---->LengthFieldBasedFrameDecoder#0::channelRead
                //------>又回到该invokeChannelRead函数
                //-------->最终到这里,形成一个大大的环
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
                
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }
2.2.3.2.3AbstractChannelHandlerContext.class(HeadContext)
// 3.DefaultChannelPipeline.class
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // ctx:DefaultChannelPipeline$HeadContext#0
            // final class HeadContext extends AbstractChannelHandlerContext
            ctx.fireChannelRead(msg);
        }
2.2.3.2.4AbstractChannelHandlerContext.class(HeadContext)
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    // findContextInbound:查找下一个入站处理器
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }

2.2.3.2.5ByteToMessageDecoder
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) { 
            // 循环读取消息,知道无消息可读
                // 第一次读取处理了4+32=36个字节
                // 第二次读取剩下的5个字节
                
                int outSize = out.size();

                if (outSize > 0) {
                    fireChannelRead(ctx, out, outSize);
                    out.clear();

                    // Check if this handler was removed before continuing with decoding.
                    // If it was removed, it is not safe to continue to operate on the buffer.
                    //
                    // See:
                    // - https://github.com/netty/netty/issues/4635
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }

                int oldInputLength = in.readableBytes();
                // 注意这里 decode被 LengthFieldBasedFrameDecoder 重写
                decode(ctx, in, out);
    /**
     * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
     * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
     * {@link ByteBuf}.
     *
     * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
     * @param in            the {@link ByteBuf} from which to read data
     * @param out           the {@link List} to which decoded messages should be added
     * @throws Exception    is thrown if an error accour
     */
    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

2.2.4LengthFieldBasedFrameDecoder.decode解码分析

// 先回忆一下读取的ByteBuf
/*
         * | 4 bytes|          32bytes       | 4 bytes|1 bytes|
         * +--------+---*---+-----------+----+--------+-------+
         * |   32   | 1 | 2 | ......... | 32 |    1   |   a   |        length = 41 bytes
         * +--------+---+---+-----------+----+--------+-------+
         */
image-20210829155608890.png

DefaultChannelPipeline$HeadContext#0:inbound=false,outbound=true

LengthFieldBasedFrameDecoder#0:inbound=true,outbound=false

DefaultChannelPipeline$TailContext#0:inbound=true,outbound=false

/**
     * Create a frame out of the {@link ByteBuf} and return it.
     *
     * @param   ctx             the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
     * @param   in              the {@link ByteBuf} from which to read data
     * @return  frame           the {@link ByteBuf} which represent the frame or {@code null} if no frame could
     *                          be created.
     */
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (discardingTooLongFrame) {
            long bytesToDiscard = this.bytesToDiscard;
            int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
            in.skipBytes(localBytesToDiscard);
            bytesToDiscard -= localBytesToDiscard;
            this.bytesToDiscard = bytesToDiscard;

            failIfNecessary(false);
        }
        // 1.in.readableBytes 可读字节长度:41,肯定要大于读取的长度,这是基本要求
        if (in.readableBytes() < lengthFieldEndOffset) {
            return null; // 什么意思
        }

        // 2.开始需要读取的长度Length偏移位置:
        // 第一次预期actualLengthFieldOffset:in.readerIndex():0+0==>0
        // 第二次预期actualLengthFieldOffset:36+0==>36
        int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
        
        // 3.获取字节in中长度headLen值:
        // 第一次lengthFieldLength:=4,frameLength=headLen值为:32
        // 第二次lengthFieldLength:=4,frameLength=headLen值为:1
        long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);

        if (frameLength < 0) {
            in.skipBytes(lengthFieldEndOffset);
            throw new CorruptedFrameException(
                    "negative pre-adjustment length field: " + frameLength);
        }
        // 4.按照数据结构
        // 第一次预期的完整消息长度(headLen+content)
        // 增加长度纠偏lengthAdjustment+lengthFieldEndOffset:32+0+4=36-->frameLength
        
        // 第二次预期的完整消息长度(headLen+content)
        // 增加长度纠偏lengthAdjustment+lengthFieldEndOffset:1+0+4=5-->frameLength
        frameLength += lengthAdjustment + lengthFieldEndOffset;

        if (frameLength < lengthFieldEndOffset) {
            in.skipBytes(lengthFieldEndOffset);
            throw new CorruptedFrameException(
                    "Adjusted frame length (" + frameLength + ") is less " +
                    "than lengthFieldEndOffset: " + lengthFieldEndOffset);
        }

        if (frameLength > maxFrameLength) {
            // 4.1.丢弃的长度 36-41=-5
            long discard = frameLength - in.readableBytes();
            tooLongFrameLength = frameLength;

            if (discard < 0) {
                // buffer contains more bytes then the frameLength so we can discard all now
                
                // 4.2.直接丢弃前面的字节,跳过完整消息报frameLength个长度
                // 继续下一轮回的decode解码器
                // 第一次后,此时buf中in 读写索引变化情况如下:
                // readIndex变化情况:0-->36
                // writerIndex:41-->41(缓存字节没变)
                in.skipBytes((int) frameLength);
            } else {
                // Enter the discard mode and discard everything received so far.
                discardingTooLongFrame = true;
                bytesToDiscard = discard;
                // 直接丢弃前面的字节
                in.skipBytes(in.readableBytes());
            }
            failIfNecessary(true);
            // 继续下一次 while ByteToMessageDecode解码
            return null;
        }

        // 5.完整的消息包
        // never overflows because it's less than maxFrameLength
        int frameLengthInt = (int) frameLength;
        
        // 第一次 in.readableBytes():41
        // 第二次 in.readableBytes():5
        if (in.readableBytes() < frameLengthInt) {
            return null;
        }

        if (initialBytesToStrip > frameLengthInt) {
            in.skipBytes(frameLengthInt);
            throw new CorruptedFrameException(
                    "Adjusted frame length (" + frameLength + ") is less " +
                    "than initialBytesToStrip: " + initialBytesToStrip);
        }
        in.skipBytes(initialBytesToStrip);

        // extract frame
        // 6.组装最终ByteBuf:frame
        // in.readerIndex():36
        int readerIndex = in.readerIndex();
        
        int actualFrameLength = frameLengthInt - initialBytesToStrip; // 5-0(部分场景需要)
        
        // in:io.netty.buffer.UnpooledUnsafeHeapByteBuf
        // frame:io.netty.buffer.UnpooledSlicedByteBuf        
  
        // extractFrame具体动作:buffer.retainedSlice(index, length):构造新的 ByteBuf,refCnt:2与in一样,同一个指针引用
        ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
        // 修改in.readerIndex下标
        in.readerIndex(readerIndex + actualFrameLength);
        return frame;
    }
// 获取具体消息体buf中 headLen长度的值
protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
    buf = buf.order(order);
    long frameLength;
    switch (length) {
    case 1:
        frameLength = buf.getUnsignedByte(offset);
        break;
    case 2:
        frameLength = buf.getUnsignedShort(offset);
        break;
    case 3:
        frameLength = buf.getUnsignedMedium(offset);
        break;
    case 4:
        frameLength = buf.getUnsignedInt(offset);
        break;
    case 8:
        frameLength = buf.getLong(offset);
        break;
    default:
        throw new DecoderException(
                "unsupported lengthFieldLength: " + lengthFieldLength + " (expected: 1, 2, 3, 4, or 8)");
    }
    return frameLength;
}

3总结

3.1项目应用

3.1.1自定义消息对应的变长参数

@MessageDecodeAnnotattion(
        maxFrameLength = 1024,
        lengthFieldOffset = 5, // 跳过前面5个字节就到长度字段
        lengthFieldLength = 1, // 消息内容数据长度
        lengthAdjustment = 0,  // 完整包的进行长度微调
        initialBytesToStrip = 0, // 数据全要 ok
        failFast = true, readerIdleTimeSeconds = 30, writerIdleTimeSeconds = 0, allIdleTimeSeconds = 0)

3.1.2自定义消息结构

@OffsetAnnoattion(position = 0, datType = EnumDataType.ByteArray, DataLength = 2)
private byte[] head;        // 数组,2长度。头部标识 0xAAAA

@OffsetAnnoattion(position = 2, datType = EnumDataType.Byte)
private byte addr;          // 地址,1表示设备收

@OffsetAnnoattion(position = 3, datType = EnumDataType.Byte)
private byte sys;           // 系统号,0表示系统1或通道1或射频1,1表示系统2或通道2或射频2

@OffsetAnnoattion(position = 4, datType = EnumDataType.Byte)
private byte type;          // 消息类型

@OffsetAnnoattion(position = 5, datType = EnumDataType.Byte)
private byte data_length;   // 消息数据长度,紧随其后的是消息ID和消息数据长度的总和,如果这个值是:00x10且lengthAdjustment = 0,表示还要读16个字节,netty才能任务是一个完整的包

@OffsetAnnoattion(position = 6, datType = EnumDataType.Short)
private short messageid;    // 消息ID [是上位机每发一个消息的递增序号,可以总是填充全0]

@OffsetAnnoattion(position = 8, datType = EnumDataType.ByteArray, DataLength = -1)
private byte[] content_data; // 消息数据[254]

3.1.3获取指定属性名称的字节内容

涉及知识点:.capacity()、.position(XXX)、get(dest,0,length)

/**
 * 获取指定属性名称的字节数组
 @param propertyName 属性名称,如content_data(数组类型)
 * @return 字节数据
 */
protected byte[] GetBytes(String propertyName) {
    
    // 1.获取OffsetAnnoattion字段注解参数
    PropertyInfo info = ParseOffsetAnnoattion(propertyName);
    OffsetAnnoattion annoation = info.getOffset();
    int dataLength = annoation.DataLength();
    // 2.如果长度没有指定,则直接当前字节流去掉前面的已知长度
    int position = annoation.position();
    if (dataLength < 0) {
        // java.nio.ByteBuffer.m_buffer:完整消息包
        dataLength = this.m_buffer.capacity() - position;
    }
    // 3.构建指定长度的数组
    byte[] dest = new byte[dataLength];
    // 4.数组校验
    if (annoation.datType() != EnumDataType.ByteArray) {
        return null;
    }
    // 5.填充dest字节数组
    ((ByteBuffer) m_buffer.position(position)).get(dest, 0, dataLength);
    // 6.返回dest
    return dest;
}

参考

1.1【看完就会】Netty的LengthFieldBasedFrameDecoder的用法详解

https://blog.csdn.net/zougen/article/details/79037675

1.2【Netty】decoder相关(四):长度域解码器LengthFieldBasedFrameDecoder

https://blog.csdn.net/qq_33347239/article/details/104337384

1.3netty 中LengthFieldPrepender与LengthFieldBasedFrameDecoder

https://blog.csdn.net/cgj296645438/article/details/90667419

1.4Netty整合MessagePack、LengthFieldBasedFrameDecoder解决粘包/拆包问题

https://www.jianshu.com/p/2f4b9ecf2fdf

1.5Netty-源码分析ByteBuf-readSlice和readRetainedSlice使用细节

https://blog.csdn.net/nimasike/article/details/103462546

上一篇下一篇

猜你喜欢

热点阅读