mpush消息编解码器代码解读

2018-07-10  本文已影响0人  王剑_a9e1

mpush是一个开源的消息推送系统,本文介绍消息编解码器的实现
mpush文档地址:http://mpush.mydoc.io/?t=134818
mpush实现了一个可扩展的私有消息协议。该协议由定长header和body组成,定长header为13个字节。

名称 类型 长度 说明
length int 4 表示body的长度
cmd byte 1 表示消息协议类型
checkcode short 2 是根据body生成的一个校验码
flags byte 1 表示当前包启用的特性,比如是否启用加密,是否启用压缩
sessionId int 4 消息会话标识用于消息响应
lrc byte 1 纵向冗余校验,用于校验header

编码器

编码器继承自MessageToByteEncoder,MessageToByteEncoder是一种 ChannelOutboundHandler的具体实现。其负责将入站数据从一种协议消息格式成字节流即Outbound ByteBuf。 在netty中使用解码器很简单,就是将入站数据转换格式后传递到 ChannelPipeline 中的下一个ChannelInboundHandler 进行处理,这样的处理是很灵活的,我们可以将解码器放在 ChannelPipeline 中。

public static void encodePacket(Packet packet, ByteBuf out) {
    if (packet.cmd == Command.HEARTBEAT.cmd) {
        out.writeByte(Packet.HB_PACKET_BYTE);
    } else {
        //ByteBuf存放字节流,Packet自定义协议的消息对象。
        out.writeInt(packet.getBodyLength());
        out.writeByte(packet.cmd);
        out.writeShort(packet.cc);
        out.writeByte(packet.flags);
        out.writeInt(packet.sessionId);
        out.writeByte(packet.lrc);
        if (packet.getBodyLength() > 0) {
            out.writeBytes(packet.body);
        }
    }
    packet.body = null;
}

解码器


解码器继承自ByteToMessageDecoder,是一种 ChannelInboundHandler 的具体实现。负责将入站数据从一种字节流转成协议消息格式即packet对象。下图解码器的流程图。



根据协议第一个字节是心跳标志字节,所以先去缓冲区读取一个字节。然后判断缓冲区的可读字节是否大于一个packet头部长度,如果小于说明当前缓冲区内还没有一个完整的packet头部,因此继续等待直到缓冲区有足够的缓冲字节。如果大于则标记当前缓冲区索引,判断缓冲区可读字节字节是否大于packet包体和packet包头之和,如果大于说明缓冲区可读字节足够,根据协议创建packet对象。如果小于说明当前缓冲区可读字节不够,继续等待。

@Override
public final class PacketDecoder extends ByteToMessageDecoder {

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        decodeHeartbeat(in, out);
        decodeFrames(in, out);
    }

    private void decodeHeartbeat(ByteBuf in, List<Object> out) {
        while (in.isReadable()) {
            if (in.readByte() == Packet.HB_PACKET_BYTE) {
                out.add(Packet.HB_PACKET);
            } else {
                in.readerIndex(in.readerIndex() - 1);
                break;
            }
        }
    }

    private void decodeFrames(ByteBuf in, List<Object> out) {
        if (in.readableBytes() >= Packet.HEADER_LEN) {
            //1.记录当前读取位置位置.如果读取到非完整的frame,要恢复到该位置,便于下次读取
            in.markReaderIndex();

            Packet packet = decodeFrame(in);
            if (packet != null) {
                out.add(packet);
            } else {
                //2.读取到不完整的frame,恢复到最近一次正常读取的位置,便于下次读取
                in.resetReaderIndex();
            }
        }
    }

    private Packet decodeFrame(ByteBuf in) {
        int readableBytes = in.readableBytes();
        int bodyLength = in.readInt();
        if (readableBytes < (bodyLength + Packet.HEADER_LEN)) {
            return null;
        }
        if (bodyLength > maxPacketSize) {
            throw new TooLongFrameException("packet body length over limit:" + bodyLength);
        }
        return decodePacket(new Packet(in.readByte()), in, bodyLength);
    }
}
上一篇下一篇

猜你喜欢

热点阅读