2021-08-29_Study notes on the source code of Netty's length-based variable-length decoder
2021-08-29
kikop
20210829_Study notes on the source code of Netty's length-based variable-length decoder 2
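1 Overview
LengthFieldBasedFrameDecoder is a length-based variable-length decoder. Its key requirement: the message structure must contain a field that states the length of the message body or of the whole message.
If messages do not have a fixed length, Netty also provides a matching encoder, LengthFieldPrepender, which prepends the length to each message before it is sent. The two classes are usually used together to solve the half-packet and sticky-packet problems.
Typical use case: dedicated client and server programs.
This article walks through the decoding sequence of LengthFieldBasedFrameDecoder with the help of code.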
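To make the idea of a length prefix concrete, here is a minimal sketch of what "prepend the length before sending" means. It uses only java.nio.ByteBuffer rather than Netty's LengthFieldPrepender, and the class and method names (LengthPrefixSketch, encode) are hypothetical, chosen for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class LengthPrefixSketch {
    /** Encodes one message as a 4-byte big-endian length followed by the body. */
    static byte[] encode(byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(4 + body.length); // ByteBuffer is big-endian by default
        buf.putInt(body.length);
        buf.put(body);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] frame = encode("hello".getBytes(StandardCharsets.US_ASCII));
        System.out.println(frame.length);                    // 9: 4-byte length + 5-byte body
        System.out.println(ByteBuffer.wrap(frame).getInt()); // 5
    }
}
```

Because every frame announces its own size, a receiver can tell where one message ends and the next begins even when several frames arrive in one read or one frame arrives in pieces.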
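1.1 LengthFieldBasedFrameDecoder decoder
If messages are delimited by a length field (for example, one written by LengthFieldPrepender), this decoder handles the sticky-packet and half-packet problems.
1.1.1 Constructor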
public LengthFieldBasedFrameDecoder(
int maxFrameLength,
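int lengthFieldOffset, // offset of the length field
int lengthFieldLength, // number of bytes the length field occupies
int lengthAdjustment, // correction added to the length field value; determines how many more bytes Netty must read to have a complete message
int initialBytesToStrip) { // number of leading bytes to skip from a complete frame before passing it to the business decoder (e.g. to drop a header or data of no interest)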
this(
maxFrameLength,
lengthFieldOffset, lengthFieldLength, lengthAdjustment,
initialBytesToStrip, true);
}
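The way these parameters combine can be sketched as plain arithmetic. The formula below follows the decode source analyzed later in these notes (frameLength += lengthAdjustment + lengthFieldEndOffset); the class and method names are hypothetical.

```java
final class FrameLengthSketch {
    /** Total frame length: length field value + lengthAdjustment + lengthFieldEndOffset. */
    static long frameLength(long lengthFieldValue, int lengthFieldOffset,
                            int lengthFieldLength, int lengthAdjustment) {
        int lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
        return lengthFieldValue + lengthAdjustment + lengthFieldEndOffset;
    }

    /** Bytes handed to the next handler after the leading initialBytesToStrip bytes are skipped. */
    static long deliveredLength(long frameLength, int initialBytesToStrip) {
        return frameLength - initialBytesToStrip;
    }

    public static void main(String[] args) {
        // Length field at offset 0, 4 bytes wide, value 32, no adjustment: 32 + 0 + 4.
        long frame = frameLength(32, 0, 4, 0);
        System.out.println(frame);                     // 36
        // Strip the 4-byte length field so only the 32-byte body is passed on.
        System.out.println(deliveredLength(frame, 4)); // 32
        // If the length field counts the whole message (header included), adjust by -4.
        System.out.println(frameLength(36, 0, 4, -4)); // 36
    }
}
```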
1.2 pom dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>technicaltools</artifactId>
<groupId>com.kikop</groupId>
<version>1.0-SNAPSHOT</version>
<!-- path to the parent pom this module depends on -->
<relativePath>../pom.xml</relativePath>
</parent>
<packaging>jar</packaging>
<artifactId>mylengthfielddecoderdemo</artifactId>
<version>1.0-SNAPSHOT</version>
<description>mylengthfielddecoderdemo project</description>
<properties>
<junit.version>5.7.0</junit.version>
</properties>
<dependencies>
<!--1.mytechcommon-->
<dependency>
<groupId>com.kikop</groupId>
<artifactId>mytechcommon</artifactId>
<version>2.0-SNAPSHOT</version>
</dependency>
<!--2.junit-->
<!-- Common test dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.18.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.18.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.github.artsok</groupId>
<artifactId>rerunner-jupiter</artifactId>
<version>2.1.6</version>
<scope>test</scope>
</dependency>
<!--3.netty-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<!--<version>${netty.version}</version>-->
<version>4.1.6.Final</version>
</dependency>
</dependencies>
<build>
</build>
</project>
2 Code example
2.1 Test
package com.kikop;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.TooLongFrameException;
import static org.junit.Assert.*;
import org.junit.Test;
/**
* @author kikop
* @version 1.0
* @project Name: mynettydemo
* @file Name: LengthFieldBasedFrameDecoderTest
* @desc Test for the length-based variable-length decoder
* @date 2021/8/20
* @time 9:30
* @by IDE: IntelliJ IDEA
*/
public class LengthFieldBasedFrameDecoderTest {
@Test
public void testDiscardTooLongFrame1()
{
// 1. Build the ByteBuf
// Big-endian by default; refCnt=1, initial capacity 256, readerIndex=0, writerIndex=0
// writeInt(32) stores the high-order byte first: byte[0]=0, ..., byte[3]=32
ByteBuf buf = Unpooled.buffer();
/*
* | 4 byte|
* +-------+
* | 32 | length = 4 bytes
* +-------+
*/
buf.writeInt(32);
/*
* | 4 bytes| 32 bytes |
* +--------+---+---+-----------+----+
* | 32 | 0 | 1 | ... | 31 | length = 36 bytes
* +--------+---+---+-----------+----+
*/
for (int i = 0; i < 32; i++) {
buf.writeByte(i);
}
/*
* | 4 bytes| 32 bytes | 4 bytes|
* +--------+---+---+-----------+----+--------+
* | 32 | 0 | 1 | ......... | 31 | 1 | length = 40 bytes
* +--------+---+---+-----------+----+--------+
*/
buf.writeInt(1);
/*
* | 4 bytes| 32 bytes | 4 bytes|1 byte |
* +--------+---+---+-----------+----+--------+-------+
* | 32 | 0 | 1 | ......... | 31 | 1 | a | length = 41 bytes
* +--------+---+---+-----------+----+--------+-------+
*/
buf.writeByte('a');
// lengthFieldEndOffset:4
EmbeddedChannel channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder(
16, 0, 4));
try {
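// The decoder receives all 41 bytes at once. The first frame is 4 + 32 = 36 bytes,
// which exceeds maxFrameLength (16), so those 36 bytes are discarded.
// 5 bytes remain (one int and one byte).
// 2. Write the bytes into the channel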
channel.writeInbound(buf);
fail();
} catch (TooLongFrameException e) {
// expected
e.printStackTrace();
}
assertTrue(channel.finish());
// 3. Read the decoded data from the channel
ByteBuf b = channel.readInbound();
assertEquals(5, b.readableBytes());
assertEquals(1, b.readInt());
assertEquals('a', b.readByte());
b.release();
assertNull(channel.readInbound());
channel.finish();
}
}
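2.2 Netty variable-length decoder source analysis: decode
Decoding relies on a hook method (template method pattern): ByteToMessageDecoder::decode
2.2.1 LengthFieldBasedFrameDecoder class hierarchy
[Figure: image-20210829154720769.png]
// Inbound handler: LengthFieldBasedFrameDecoder is a ChannelInboundHandler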
public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
public abstract class ChannelHandlerAdapter implements ChannelHandler {
public interface ChannelInboundHandler extends ChannelHandler {
public interface ChannelHandler {
2.2.2 How AttributeMap achieves thread safety
/**
* Holds {@link Attribute}s which can be accessed via {@link AttributeKey}.
*
* Implementations must be Thread-safe.
*/
public interface AttributeMap {
/**
* Get the {@link Attribute} for the given {@link AttributeKey}. This method will never return null, but may return
* an {@link Attribute} which does not have a value set yet.
*/
<T> Attribute<T> attr(AttributeKey<T> key);
/**
* Returns {@code} true if and only if the given {@link Attribute} exists in this {@link AttributeMap}.
*/
<T> boolean hasAttr(AttributeKey<T> key);
}
2.2.2.1 DefaultAttributeMap
@SuppressWarnings("unchecked")
@Override
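public <T> Attribute<T> attr(AttributeKey<T> key) {
// ... (excerpt: lookup of the bucket head omitted in these notes)
synchronized (head) {
DefaultAttribute<?> curr = head;
// ... (excerpt: rest of attr() omitted in these notes)
}
}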
public <T> boolean hasAttr(AttributeKey<T> key) {
if (key == null) {
throw new NullPointerException("key");
}
AtomicReferenceArray<DefaultAttribute<?>> attributes = this.attributes;
if (attributes == null) {
// no attribute exists
return false;
}
int i = index(key);
DefaultAttribute<?> head = attributes.get(i);
if (head == null) {
// No attribute exists which point to the bucket in which the head should be located
return false;
}
// We need to synchronize on the head.
synchronized (head) {
// Start with head.next as the head itself does not store an attribute.
DefaultAttribute<?> curr = head.next;
while (curr != null) {
if (curr.key == key && !curr.removed) {
return true;
}
curr = curr.next;
}
return false;
}
}
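The locking idea in hasAttr (look up a bucket head without a lock, then synchronize only on that head) can be sketched with JDK classes alone. This is a simplified illustration and not Netty's DefaultAttributeMap: the class name, the String keys, and the fixed bucket count are assumptions.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

final class BucketLockSketch {
    private static final class Node {
        final String key;
        Object value;
        Node next;
        Node(String key) { this.key = key; }
    }

    private static final int BUCKETS = 4; // power of two so the mask below works
    private final AtomicReferenceArray<Node> buckets = new AtomicReferenceArray<>(BUCKETS);

    private static int index(String key) {
        return key.hashCode() & (BUCKETS - 1);
    }

    /** Stores a value; only the bucket that owns the key is locked. */
    void put(String key, Object value) {
        int i = index(key);
        Node head = buckets.get(i);
        if (head == null) {
            // The head is a placeholder that stores no entry; CAS installs it exactly once.
            Node newHead = new Node(null);
            head = buckets.compareAndSet(i, null, newHead) ? newHead : buckets.get(i);
        }
        synchronized (head) {
            Node curr = head;
            while (curr.next != null) {
                if (curr.next.key.equals(key)) {
                    curr.next.value = value;
                    return;
                }
                curr = curr.next;
            }
            Node n = new Node(key);
            n.value = value;
            curr.next = n;
        }
    }

    /** Returns true if the key is present; starts at head.next because the head holds no entry. */
    boolean has(String key) {
        Node head = buckets.get(index(key));
        if (head == null) {
            return false;
        }
        synchronized (head) {
            for (Node curr = head.next; curr != null; curr = curr.next) {
                if (curr.key.equals(key)) {
                    return true;
                }
            }
            return false;
        }
    }

    public static void main(String[] args) {
        BucketLockSketch map = new BucketLockSketch();
        map.put("k", 1);
        System.out.println(map.has("k")); // true
        System.out.println(map.has("x")); // false
    }
}
```

Threads that touch different buckets never contend for the same monitor, which is the point of locking on the per-bucket head instead of on the whole map.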
2.2.3 DefaultChannelPipeline
/**
* The default {@link ChannelPipeline} implementation. It is usually created
* by a {@link Channel} implementation when the {@link Channel} is created.
* When the channel is created, the handler linked list has already been built
*/
public class DefaultChannelPipeline implements ChannelPipeline {
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
2.2.3.1 HeadContext
// HeadContext.class extends AbstractChannelHandlerContext
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
@Override
public ChannelHandler handler() {
return this;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.disconnect(promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.close(promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.deregister(promise);
}
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
// Remove all handlers sequentially if channel is closed and unregistered.
if (!channel.isOpen()) {
destroy();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
}
2.2.3.2 Trigger order
2.2.3.2.1 DefaultChannelPipeline.class
// 1.DefaultChannelPipeline.class
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
2.2.3.2.2 AbstractChannelHandlerContext.class
// 2.AbstractChannelHandlerContext.class
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
// Get the inbound handler (DefaultChannelPipeline$HeadContext#0)
// and invoke its channelRead event with the data msg
// DefaultChannelPipeline$HeadContext#0::channelRead
//-->AbstractChannelHandlerContext::fireChannelRead
//---->LengthFieldBasedFrameDecoder#0::channelRead
//------>back to this invokeChannelRead method again
//-------->and finally here, forming one big loop
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
2.2.3.2.3 AbstractChannelHandlerContext.class(HeadContext)
// 3.DefaultChannelPipeline.class (inner class HeadContext)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// ctx:DefaultChannelPipeline$HeadContext#0
// final class HeadContext extends AbstractChannelHandlerContext
ctx.fireChannelRead(msg);
}
2.2.3.2.4 AbstractChannelHandlerContext.class(HeadContext)
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
// findContextInbound: find the next inbound handler context
invokeChannelRead(findContextInbound(), msg);
return this;
}
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
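The traversal in findContextInbound can be tried out on a tiny hand-built linked list. This is a sketch with hypothetical names (PipelineSketch, Ctx) and only the two fields the loop needs; it is not Netty's AbstractChannelHandlerContext.

```java
final class PipelineSketch {
    static final class Ctx {
        final String name;
        final boolean inbound;
        Ctx next;

        Ctx(String name, boolean inbound) {
            this.name = name;
            this.inbound = inbound;
        }

        /** Same loop shape as findContextInbound: step forward until an inbound context is found. */
        Ctx findContextInbound() {
            Ctx ctx = this;
            do {
                ctx = ctx.next;
            } while (!ctx.inbound);
            return ctx;
        }
    }

    /** Builds head -> encoder (outbound only) -> decoder -> tail and returns the head. */
    static Ctx build() {
        Ctx head = new Ctx("head", false);
        Ctx encoder = new Ctx("encoder", false);
        Ctx decoder = new Ctx("decoder", true);
        Ctx tail = new Ctx("tail", true);
        head.next = encoder;
        encoder.next = decoder;
        decoder.next = tail;
        return head;
    }

    public static void main(String[] args) {
        Ctx first = build().findContextInbound();
        System.out.println(first.name);                      // decoder
        System.out.println(first.findContextInbound().name); // tail
    }
}
```

The loop has no null check, so it relies on the last context in the list being inbound, which is why the sketch ends with an inbound tail.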
2.2.3.2.5 ByteToMessageDecoder
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
// Keep decoding in a loop until there is nothing left to read
// The first pass handles 4+32=36 bytes
// The second pass reads the remaining 5 bytes
int outSize = out.size();
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
// Check if this handler was removed before continuing with decoding.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See:
// - https://github.com/netty/netty/issues/4635
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
// Note: decode here is the hook overridden by LengthFieldBasedFrameDecoder
decode(ctx, in, out);
// ... (rest of callDecode omitted in these notes)
/**
* Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
* {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
* {@link ByteBuf}.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param in the {@link ByteBuf} from which to read data
* @param out the {@link List} to which decoded messages should be added
* @throws Exception is thrown if an error occurs
*/
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
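The relationship between callDecode (the fixed loop) and decode (the hook) can be sketched without Netty. The version below is a simplified illustration built on java.nio.ByteBuffer; the class names and the stop condition ("nothing produced and nothing consumed") are assumptions modelled on the excerpt above, not a copy of ByteToMessageDecoder.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

abstract class DecodeLoopSketch {
    /** Hook: consume bytes from in and add zero or more decoded messages to out. */
    protected abstract void decode(ByteBuffer in, List<Object> out);

    /** Template method: call the hook until the input is drained or no progress is made. */
    final List<Object> callDecode(ByteBuffer in) {
        List<Object> out = new ArrayList<>();
        while (in.hasRemaining()) {
            int oldSize = out.size();
            int oldRemaining = in.remaining();
            decode(in, out);
            if (out.size() == oldSize && in.remaining() == oldRemaining) {
                break; // nothing produced and nothing consumed: wait for more data
            }
        }
        return out;
    }

    /** Example subclass: each frame is a 4-byte length followed by that many body bytes. */
    static final class LengthPrefixed extends DecodeLoopSketch {
        @Override
        protected void decode(ByteBuffer in, List<Object> out) {
            if (in.remaining() < 4) {
                return; // length field incomplete
            }
            int length = in.getInt(in.position()); // absolute read, position unchanged
            if (length < 0 || in.remaining() < 4 + length) {
                return; // invalid length or half packet: leave the bytes in place
            }
            in.getInt(); // consume the length field
            byte[] body = new byte[length];
            in.get(body);
            out.add(body);
        }
    }

    public static void main(String[] args) {
        ByteBuffer in = ByteBuffer.allocate(16);
        in.putInt(2).put((byte) 1).put((byte) 2); // complete frame
        in.putInt(3).put((byte) 9);               // half packet: 1 of 3 body bytes
        in.flip();
        List<Object> frames = new LengthPrefixed().callDecode(in);
        System.out.println(frames.size());  // 1
        System.out.println(in.remaining()); // 5
    }
}
```

The base class owns the loop and the subclass only decides what one frame looks like, which is the division of labour between ByteToMessageDecoder and LengthFieldBasedFrameDecoder.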
2.2.4 LengthFieldBasedFrameDecoder.decode analysis
// First, recall the ByteBuf being read
/*
* | 4 bytes| 32 bytes | 4 bytes|1 byte |
* +--------+---+---+-----------+----+--------+-------+
* | 32 | 0 | 1 | ......... | 31 | 1 | a | length = 41 bytes
* +--------+---+---+-----------+----+--------+-------+
*/
[Figure: image-20210829155608890.png]
DefaultChannelPipeline$HeadContext#0:inbound=false,outbound=true
LengthFieldBasedFrameDecoder#0:inbound=true,outbound=false
DefaultChannelPipeline$TailContext#0:inbound=true,outbound=false
/**
* Create a frame out of the {@link ByteBuf} and return it.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param in the {@link ByteBuf} from which to read data
* @return frame the {@link ByteBuf} which represent the frame or {@code null} if no frame could
* be created.
*/
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (discardingTooLongFrame) {
long bytesToDiscard = this.bytesToDiscard;
int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
in.skipBytes(localBytesToDiscard);
bytesToDiscard -= localBytesToDiscard;
this.bytesToDiscard = bytesToDiscard;
failIfNecessary(false);
}
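// 1. in.readableBytes() is 41 here; it must be at least lengthFieldEndOffset (4), otherwise the length field itself is incomplete
if (in.readableBytes() < lengthFieldEndOffset) {
return null; // not enough data yet: wait for more bytes
}
// 2. Absolute offset of the length field:
// first pass: actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset = 0 + 0 = 0
// second pass: actualLengthFieldOffset = 36 + 0 = 36
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
// 3. Read the value of the length field (headLen) from in:
// first pass: lengthFieldLength = 4, frameLength = headLen = 32
// second pass: lengthFieldLength = 4, frameLength = headLen = 1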
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
if (frameLength < 0) {
in.skipBytes(lengthFieldEndOffset);
throw new CorruptedFrameException(
"negative pre-adjustment length field: " + frameLength);
}
// 4. Compute the full frame length (headLen + content) from the message layout
// first pass: frameLength + lengthAdjustment + lengthFieldEndOffset = 32 + 0 + 4 = 36
// second pass: frameLength + lengthAdjustment + lengthFieldEndOffset = 1 + 0 + 4 = 5
frameLength += lengthAdjustment + lengthFieldEndOffset;
if (frameLength < lengthFieldEndOffset) {
in.skipBytes(lengthFieldEndOffset);
throw new CorruptedFrameException(
"Adjusted frame length (" + frameLength + ") is less " +
"than lengthFieldEndOffset: " + lengthFieldEndOffset);
}
if (frameLength > maxFrameLength) {
// 4.1. Bytes still to discard beyond what is buffered: 36 - 41 = -5
long discard = frameLength - in.readableBytes();
tooLongFrameLength = frameLength;
if (discard < 0) {
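// buffer contains more bytes than the frameLength so we can discard all now
// 4.2. Discard the leading bytes directly by skipping the whole frame (frameLength bytes),
// then continue with the next round of decode
// After the first pass, the indices of in change as follows:
// readerIndex: 0 --> 36
// writerIndex: 41 --> 41 (buffered bytes unchanged)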
in.skipBytes((int) frameLength);
} else {
// Enter the discard mode and discard everything received so far.
discardingTooLongFrame = true;
bytesToDiscard = discard;
// discard everything buffered so far
in.skipBytes(in.readableBytes());
}
failIfNecessary(true);
// failIfNecessary may throw TooLongFrameException; otherwise return null and let the ByteToMessageDecoder loop continue
return null;
}
// 5. A complete frame
// never overflows because it's less than maxFrameLength
int frameLengthInt = (int) frameLength;
// first pass: in.readableBytes() is 41, but the too-long branch above exits before this check
// second pass: in.readableBytes() = 5
if (in.readableBytes() < frameLengthInt) {
return null;
}
if (initialBytesToStrip > frameLengthInt) {
in.skipBytes(frameLengthInt);
throw new CorruptedFrameException(
"Adjusted frame length (" + frameLength + ") is less " +
"than initialBytesToStrip: " + initialBytesToStrip);
}
in.skipBytes(initialBytesToStrip);
// extract frame
// 6. Build the final ByteBuf: frame
// in.readerIndex(): 36
int readerIndex = in.readerIndex();
int actualFrameLength = frameLengthInt - initialBytesToStrip; // 5 - 0 (stripping is only needed in some scenarios)
// in: io.netty.buffer.UnpooledUnsafeHeapByteBuf
// frame: io.netty.buffer.UnpooledSlicedByteBuf
// extractFrame calls buffer.retainedSlice(index, length): it creates a new ByteBuf that shares memory and the reference count with in (refCnt becomes 2)
ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
// advance in.readerIndex past the extracted frame
in.readerIndex(readerIndex + actualFrameLength);
return frame;
}
// Read the value of the length field (headLen) from buf
protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
buf = buf.order(order);
long frameLength;
switch (length) {
case 1:
frameLength = buf.getUnsignedByte(offset);
break;
case 2:
frameLength = buf.getUnsignedShort(offset);
break;
case 3:
frameLength = buf.getUnsignedMedium(offset);
break;
case 4:
frameLength = buf.getUnsignedInt(offset);
break;
case 8:
frameLength = buf.getLong(offset);
break;
default:
throw new DecoderException(
"unsupported lengthFieldLength: " + lengthFieldLength + " (expected: 1, 2, 3, 4, or 8)");
}
return frameLength;
}
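The numbers in the walkthrough above (36 bytes skipped, 5 bytes left) can be reproduced with a small JDK-only simulation of the too-long branch. This is a sketch under stated assumptions: a 4-byte length field at offset 0, no adjustment, and no tracking of bytesToDiscard across reads, so it only mirrors the "whole frame already buffered" case. All names are hypothetical.

```java
import java.nio.ByteBuffer;

final class TooLongFrameSketch {
    /**
     * Skips one oversized frame if the buffer starts with one.
     * Returns the number of bytes skipped, or 0 if the frame fits within maxFrameLength
     * or the length field has not fully arrived.
     */
    static int skipIfTooLong(ByteBuffer in, int maxFrameLength) {
        if (in.remaining() < 4) {
            return 0;
        }
        // Unsigned 32-bit length value plus the 4-byte length field itself.
        long frameLength = (in.getInt(in.position()) & 0xFFFFFFFFL) + 4;
        if (frameLength <= maxFrameLength) {
            return 0;
        }
        int skip = (int) Math.min(frameLength, in.remaining());
        in.position(in.position() + skip);
        return skip;
    }

    /** Rebuilds the 41-byte input from the test: [32][0..31][1]['a']. */
    static ByteBuffer testInput() {
        ByteBuffer buf = ByteBuffer.allocate(41);
        buf.putInt(32);
        for (int i = 0; i < 32; i++) {
            buf.put((byte) i);
        }
        buf.putInt(1);
        buf.put((byte) 'a');
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer in = testInput();
        System.out.println(skipIfTooLong(in, 16)); // 36
        System.out.println(in.remaining());        // 5
        System.out.println(skipIfTooLong(in, 16)); // 0: the second frame is 1 + 4 = 5 bytes
    }
}
```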
3 Summary
3.1 Project usage
3.1.1 Variable-length parameters for a custom message
@MessageDecodeAnnotattion(
maxFrameLength = 1024,
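lengthFieldOffset = 5, // skip the first 5 bytes to reach the length field
lengthFieldLength = 1, // the length field (message data length) occupies 1 byte
lengthAdjustment = 0, // fine adjustment of the full frame length
initialBytesToStrip = 0, // keep all the data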
failFast = true, readerIdleTimeSeconds = 30, writerIdleTimeSeconds = 0, allIdleTimeSeconds = 0)
3.1.2 Custom message structure
@OffsetAnnoattion(position = 0, datType = EnumDataType.ByteArray, DataLength = 2)
private byte[] head; // byte array of length 2; header marker 0xAAAA
@OffsetAnnoattion(position = 2, datType = EnumDataType.Byte)
private byte addr; // address; 1 means the device is the receiver
@OffsetAnnoattion(position = 3, datType = EnumDataType.Byte)
private byte sys; // system number: 0 = system 1 / channel 1 / RF 1, 1 = system 2 / channel 2 / RF 2
@OffsetAnnoattion(position = 4, datType = EnumDataType.Byte)
private byte type; // message type
@OffsetAnnoattion(position = 5, datType = EnumDataType.Byte)
private byte data_length; // message data length: the total length of the message ID and message data that follow. If this value is 0x10 and lengthAdjustment = 0, Netty must read 16 more bytes before it treats the packet as complete
@OffsetAnnoattion(position = 6, datType = EnumDataType.Short)
private short messageid; // message ID [a sequence number the host increments for each message sent; may always be filled with zeros]
@OffsetAnnoattion(position = 8, datType = EnumDataType.ByteArray, DataLength = -1)
private byte[] content_data; // message data [254]
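As a worked example of how the annotation parameters apply to this layout, the sketch below computes the full frame length for a packet whose data_length is 0x10. It uses java.nio.ByteBuffer only; the class name, the sample field values, and the helper names are assumptions made for illustration.

```java
import java.nio.ByteBuffer;

final class CustomFrameSketch {
    static final int LENGTH_FIELD_OFFSET = 5;
    static final int LENGTH_FIELD_LENGTH = 1;
    static final int LENGTH_ADJUSTMENT = 0;

    /** Returns the full frame length, or -1 if the length field has not arrived yet. */
    static int frameLength(ByteBuffer in) {
        int lengthFieldEndOffset = LENGTH_FIELD_OFFSET + LENGTH_FIELD_LENGTH;
        if (in.remaining() < lengthFieldEndOffset) {
            return -1;
        }
        int dataLength = in.get(in.position() + LENGTH_FIELD_OFFSET) & 0xFF; // unsigned byte
        return dataLength + LENGTH_ADJUSTMENT + lengthFieldEndOffset;
    }

    /** Builds a sample packet: 6 header bytes followed by 0x10 = 16 bytes (messageid + content). */
    static ByteBuffer sample() {
        ByteBuffer buf = ByteBuffer.allocate(22);
        buf.put((byte) 0xAA).put((byte) 0xAA); // head
        buf.put((byte) 1);                     // addr
        buf.put((byte) 0);                     // sys
        buf.put((byte) 2);                     // type (arbitrary value for illustration)
        buf.put((byte) 0x10);                  // data_length: 2-byte messageid + 14-byte content
        buf.putShort((short) 0);               // messageid
        buf.put(new byte[14]);                 // content_data
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        System.out.println(frameLength(sample())); // 22 = 16 + 0 + 6
    }
}
```

Since the length field here is a single unsigned byte, the part after the header can be at most 255 bytes long.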
3.1.3 Getting the bytes of a named property
Key points: .capacity(), .position(XXX), get(dest, 0, length)
/**
* Gets the byte array of the named property
* @param propertyName property name, e.g. content_data (array type)
* @return the bytes
*/
protected byte[] GetBytes(String propertyName) {
// 1. Read the OffsetAnnoattion parameters of the field
PropertyInfo info = ParseOffsetAnnoattion(propertyName);
OffsetAnnoattion annoation = info.getOffset();
int dataLength = annoation.DataLength();
// 2. If no length is specified, use everything after the known leading bytes
int position = annoation.position();
if (dataLength < 0) {
// m_buffer is a java.nio.ByteBuffer holding the complete message packet
dataLength = this.m_buffer.capacity() - position;
}
// 3. Allocate an array of the required length
byte[] dest = new byte[dataLength];
// 4. Only byte-array properties are supported
if (annoation.datType() != EnumDataType.ByteArray) {
return null;
}
// 5. Fill the dest byte array
((ByteBuffer) m_buffer.position(position)).get(dest, 0, dataLength);
// 6. Return dest
return dest;
}
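The ByteBuffer calls that GetBytes depends on can be exercised in isolation. The sketch below is a standalone variant with hypothetical names; unlike the method above it works on a duplicate() so the caller's position is untouched, and it uses limit() instead of capacity() for the "to the end" case. For a buffer that wraps exactly one complete packet the two values are equal.

```java
import java.nio.ByteBuffer;

final class ByteBufferSliceSketch {
    /** Copies length bytes starting at position; a negative length means "to the end of the buffer". */
    static byte[] bytesAt(ByteBuffer buffer, int position, int length) {
        ByteBuffer view = buffer.duplicate(); // independent position and limit, shared content
        if (length < 0) {
            length = view.limit() - position;
        }
        byte[] dest = new byte[length];
        view.position(position);
        view.get(dest, 0, length);
        return dest;
    }

    public static void main(String[] args) {
        // head(2) addr sys type data_length messageid(2) content(1)
        ByteBuffer packet = ByteBuffer.wrap(
                new byte[] {(byte) 0xAA, (byte) 0xAA, 1, 0, 2, 3, 0, 0, 7});
        System.out.println(bytesAt(packet, 8, -1).length);  // 1
        System.out.println(bytesAt(packet, 0, 2)[0] & 0xFF); // 170
        System.out.println(packet.position());               // 0, the original buffer is untouched
    }
}
```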