netty-自定义协议编解码器

2020-08-18  本文已影响0人  XII01

1. 协议格式

 * * * 自定义协议 数据包格式
 * * * -----------------------------------
 * * Length 数据包长度   Data长度 + Head长度
 * * Version    协议版本号   默认0x0
 * * Command    操作指令码   十六进制
 * * MsgType    数据类型    0x0:Json,0x1:ProtoBuf,0x2:Xml,默认:0x0
 * * SecretKey  密钥key   默认0x0,明文
 * * Data   业务数据包

2. 自定义编码器

/**
 * @author wangxx
 * @data 2020/6/19
 * describe 自定义编码器
 * * * -----------------------------------
 */
public class CustomizeEncoder extends MessageToByteEncoder<IMMessage> {
    private static final String TAG = "CustomizeEncoder";
    /**
     * 头部固定长度7个字节
     */
    public static final int HEAD_LENGTH = 7;
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, IMMessage message, ByteBuf byteBuf) throws Exception {
        Log.i(TAG, channelHandlerContext.toString() + ",pk=" + message.toString());
        if (null == message) {
            throw new Exception();
        }
        String body = message.getData();
        byte[] bodyBytes = null;
        //包长度
        int headLength = message.getLength();
        if (!TextUtils.isEmpty(body)) {
            bodyBytes = body.getBytes(Charset.forName("utf-8"));
            headLength = bodyBytes.length+HEAD_LENGTH;
        }

        byteBuf.writeShort(headLength);
        //版本号
        byteBuf.writeByte(message.getVersion());
        //操作指令
        byteBuf.writeShort(message.getCommand());
        //数据类型
        byteBuf.writeByte(message.getMsgType());
        //密钥key
        byteBuf.writeByte(message.getSecretKey());
        //数据包
        if (bodyBytes != null && bodyBytes.length > 0) {
            byteBuf.writeBytes(bodyBytes);
        }

    }
}

3. 自定义解码器

public class CustomizeDecoder extends ByteToMessageDecoder {

    private static final String TAG = "CustomizeDecoder";
    /**
     * 头部固定长度7个字节
     */
    public static final int HEAD_LENGTH = 7;


    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf buffer, List<Object> out) throws Exception {
        Log.i(TAG, "decode ChannelHandlerContext=" + channelHandlerContext.toString() + " ,bytebuf=" + buffer.toString() + ",list=" + out.toString());
        Log.i(TAG, "decode ChannelHandlerContext=" + convertByteBufToString(buffer));
        if (buffer.readableBytes() < HEAD_LENGTH) {
            Log.i(TAG, "数据不足一条");
            return;
        }

        // 记录包头开始的index
        int beginReader;
        int contentLength;
        IMMessage message = new IMMessage();
        while (true) {
            // 获取包头开始的index
            beginReader = buffer.readerIndex();
            // 标记包头开始的index
            buffer.markReaderIndex();
            // 读到了协议的开始标志,结束while循环
            short headLength = buffer.readShort();
            //版本号
            byte version = buffer.readByte();
            //操作指令
            int command = buffer.readUnsignedShort();
            //数据类型
            byte msgType = buffer.readByte();
            //密钥key
            byte secretKey = buffer.readByte();
            message.setLength(headLength);
            message.setVersion(version);
            message.setCommand(command);
            message.setMsgType(msgType);
            message.setSecretKey(secretKey);
            //按照协议计算data的长度
            contentLength = headLength - HEAD_LENGTH;
            if (buffer.readableBytes() == contentLength) {
                break;
            }

            // 未读到包头,略过一个字节
            // 每次略过,一个字节,去读取,包头信息的开始标记
            buffer.resetReaderIndex();
            buffer.readByte();

            // 当略过,一个字节之后,
            // 数据包的长度,又变得不满足
            // 此时,应该结束。等待后面的数据到达
            if (buffer.readableBytes() < HEAD_LENGTH) {
                return;
            }
        }

        // 消息的长度

        // 判断请求数据包数据是否到齐
        if (buffer.readableBytes() < contentLength) {
            // 还原读指针
            buffer.readerIndex(beginReader);
            return;
        }

        // 读取data数据
        byte[] bytes = new byte[contentLength];
        buffer.readBytes(bytes);

        if (message.getCommand() == TcpAPI.MIMSocketCommandHeartBeatPong) {
            message.setTimeStamp(ByteUtil.bytesToInt(bytes));
        } else {
            message.setData(new String(bytes, StandardCharsets.UTF_8));
        }
        out.add(message);
    }


    public String convertByteBufToString(ByteBuf buf) {
        String str;
        if (buf.hasArray()) { // 处理堆缓冲区
            str = new String(buf.array(), buf.arrayOffset() + buf.readerIndex(), buf.readableBytes());
        } else { // 处理直接缓冲区以及复合缓冲区
            byte[] bytes = new byte[buf.readableBytes()];
            buf.getBytes(buf.readerIndex(), bytes);
            str = new String(bytes, 0, buf.readableBytes());
        }
        return str;
    }

}

4. 协议

/**
 * 包协议
 */
public class IMMessage {
    /**
     * 数据包长度
     */
    private int Length=0x0;
    /**
     * 版本
     */
    private byte Version = 0x0;
    /**
     * 命令
     */
    private int Command;
    /**
     *  0x0:Json,0x1:ProtoBuf,0x2:Xml,默认:0x0
     */
    private byte MsgType = 0x0;
    /**
     * 密钥key | 默认0x0,明文
     */
    private byte SecretKey = 0x0;
    /**
     * 数据
     */
    private String Data;
    /**
     * 服务器时间(2分钟收到服务端给的)
     */
    private int timeStamp;

    public int getLength() {
        return Length;
    }

    public void setLength(int length) {
        Length = length;
    }

    public byte getVersion() {
        return Version;
    }

    public void setVersion(byte version) {
        Version = version;
    }

    public int  getCommand() {
        return Command;
    }

    public void setCommand(int command) {
        Command = command;
    }

    public byte getMsgType() {
        return MsgType;
    }

    public void setMsgType(byte msgType) {
        MsgType = msgType;
    }

    public byte getSecretKey() {
        return SecretKey;
    }

    public void setSecretKey(byte secretKey) {
        SecretKey = secretKey;
    }

    public String getData() {
        return Data;
    }

    public void setData(String data) {
        Data = data;
    }

    public void setData(BaseSocketBody body){
        Gson gson = new Gson();
        this.Data = gson.toJson(body);
    }

    public int getTimeStamp() {
        return timeStamp;
    }

    public void setTimeStamp(int timeStamp) {
        this.timeStamp = timeStamp;
    }

    @Override
    public String toString() {
        return "IMMessage{" +
                "Length=" + Length +
                ", Version=" + Version +
                ", Command=" + Command +
                ", MsgType=" + MsgType +
                ", SecretKey=" + SecretKey +
                ", Data='" + Data + '\'' +
                ", timeStamp=" + timeStamp +
                '}';
    }
}

上一篇下一篇

猜你喜欢

热点阅读