运维

SpringBoot+Netty构建高并发稳健的部标JT808网

2019-11-22  本文已影响0人  哆啦A梦叮当

应很多朋友的要求,今天分享一下如何使用SpringBoot和Netty构建高并发稳健的JT808网关,并且是兼容JT808-2011和JT808-2019的网关,此网关已经有多个客户在商用。

JT808网关作为部标终端连接的服务端,承载了终端登录、心跳、位置、拍照等基础业务以及信令交互,是整个系统最核心的模块,一旦崩溃,则所有部标终端都会离线,所有信令交互包括1078和主动安全的信令交互也会大受影响。所以,JT808网关的并发性稳定性健壮性成为整个系统最重要的考量之一。

很多朋友用Mina或者Netty编写网关程序时遇到过很多问题:

本文使用JDK8+的环境开发,使用SpringBoot2.x以及Netty4.x,如有不懂JDK8的新语法,请查阅资料。
此网关的特性:
1.支持JT808-2011、JT808-2019、JT1078报警、主动安全报警
2.使用MQ和Redis解耦,多模块数据共享订阅,不与任何数据库关联
3.多环境开发
4.跨平台,部署简单
5.支持ProtoBuf和JSON序列化
6.本公司首创的利用策略模式的底层封装库,模板可用于任何协议的开发,简化了网络编程的复杂度,只专注于业务开发,无任何网络编程经验的人员都可接手,节省开发成本。

1.通用TcpServer创建
public class TcpServer {
    public TcpServer(int threadPoolSize, int port) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                    }
                });

        serverBootstrap.bind(port).addListener(future -> {
            if (future.isSuccess()) {
            //启动成功
            } else {
            //启动失败
            }
        });
    }
}
2.接着设计最重要的Channel Pipeline中的链式处理器

先贴上我们pipeline的处理器都有哪些:

ch.pipeline().addLast(new IdleStateHandler(Jt808Constant.READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS));
ch.pipeline().addLast(new Jt808FrameDecoder());
ch.pipeline().addLast(Jt808ProtocolDecoder.INSTANCE, new Jt808ProtocolEncoder());
ch.pipeline().addLast(Jt808LoginHandler.INSTANCE);
ch.pipeline().addLast(executorGroup, Jt808BusinessHandler.INSTANCE);

以下是Jt808Message的代码,我们要把每条消息所有字段都看成一个整体,没必要把消息头消息体分离出去新建其他类,最后还派生出一堆子类,只会把自己和别人绕晕。

public class Jt808Message extends BaseMessage {
    /**
     * 消息ID
     */
    private int msgId;

    /**
     * 终端手机号
     */
    private String phoneNumber;

    /**
     * 终端手机号数组
     */
    private byte[] phoneNumberArr;

    /**
     * 协议版本号
     */
    private int protocolVersion;

    /**
     * 消息流水号
     */
    private int msgFlowId;

    /**
     * 是否分包
     */
    private boolean multiPacket;

    /**
     * 版本标识
     */
    private int versionFlag;

    /**
     * 加密方式,0:不加密,1:RSA加密
     */
    private int encryptType;

    /**
     * 消息总包数
     */
    private int packetTotalCount;

    /**
     * 包序号
     */
    private int packetOrder;

    /**
     * 协议类型(JT808_2011、JT808_2013、JT905、JT808_2019)
     */
    private ProtocolEnum protocolType;
}

协议解码器代码:

@Slf4j
@Sharable
public class Jt808ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {

    public static final Jt808ProtocolDecoder INSTANCE = new Jt808ProtocolDecoder();

    private Jt808ProtocolDecoder() {
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        //消息长度
        int msgLen = msg.readableBytes();
        //包头
        msg.readByte();
        //消息ID
        int msgId = msg.readUnsignedShort();
        //消息体属性
        short msgBodyAttr = msg.readShort();
        //消息体长度
        int msgBodyLen = msgBodyAttr & 0b00000011_11111111;
        //是否分包
        boolean multiPacket = (msgBodyAttr & 0b00100000_00000000) > 0;
        //版本标识(版本标识0为2011年的版本,1为2019年的版本)
        int versionFlag = (msgBodyAttr & 0b01000000_00000000) >> 14;
        //去除消息体的基础长度
        int baseLen = Jt808Constant.MSG_BASE_LENGTH;
        ProtocolEnum protocolType = ProtocolEnum.JT808_2011;
        if (versionFlag == 1) {
            baseLen = Jt808Constant.JT2019_MSG_BASE_LENGTH;
            protocolType = ProtocolEnum.JT808_2019;
        }

        //根据消息体长度和是否分包得出后面的包长
        int ensureLen = multiPacket ? baseLen + msgBodyLen + 4 : baseLen + msgBodyLen;
        if (msgLen < ensureLen) {
            log.info("包长不对,数据长度:{},正确长度:{},数据:{}", msgLen, ensureLen, ByteBufUtil.hexDump(msg));
            return;
        }

        //数据加密方式
        int encryptType = (msgBodyAttr & 0b00011100_00000000) >> 10;
        //协议版本号
        int protocolVersion = 0;
        //终端手机号数组,JT808-2019为10个字节
        byte[] phoneNumberArr;
        if (protocolType == ProtocolEnum.JT808_2019) {
            protocolVersion = msg.readByte();
            phoneNumberArr = new byte[10];
        } else {
            phoneNumberArr = new byte[6];
        }
        msg.readBytes(phoneNumberArr);
        //终端手机号(去除前面的0)
        String phoneNumber = StringUtils.stripStart(ByteBufUtil.hexDump(phoneNumberArr), "0");
        //消息流水号
        int msgFlowId = msg.readUnsignedShort();
        //消息总包数
        int packetTotalCount = 0;
        //包序号
        int packetOrder = 0;
        //分包
        if (multiPacket) {
            packetTotalCount = msg.readShort();
            packetOrder = msg.readShort();
        }
        //消息体
        byte[] msgBodyArr = new byte[msgBodyLen];
        msg.readBytes(msgBodyArr);
        //校验码
        int checkCode = msg.readUnsignedByte();
        //包尾
        msg.readByte();

        //计算和验证校验码
        ByteBuf checksumBuf = msg.slice(1, msgLen - 3);
        int checksumResult = CommonUtil.xor(checksumBuf);
        if (checksumResult != checkCode) {
            log.error("校验码验证失败,计算结果:{},校验码:{},消息ID:{},手机号:{},数据:{}", checksumResult, checkCode, NumberUtil.formatMessageId(msgId), phoneNumber, ByteBufUtil.hexDump(msg));
            return;
        }

        //构造Jt808消息,传递到下一个handler处理
        Jt808Message jt808Msg = new Jt808Message();
        jt808Msg.setMsgId(msgId);
        jt808Msg.setEncryptType(encryptType);
        jt808Msg.setVersionFlag(versionFlag);
        jt808Msg.setProtocolType(protocolType);
        jt808Msg.setMultiPacket(multiPacket);
        jt808Msg.setProtocolVersion(protocolVersion);
        jt808Msg.setPhoneNumber(phoneNumber);
        jt808Msg.setPhoneNumberArr(phoneNumberArr);
        jt808Msg.setMsgFlowId(msgFlowId);
        jt808Msg.setPacketTotalCount(packetTotalCount);
        jt808Msg.setPacketOrder(packetOrder);
        jt808Msg.setMsgBodyArr(msgBodyArr);
        out.add(jt808Msg);
    }
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Jt808Message msg) throws Exception {
    //未接收完的分包不进入业务处理
    Jt808Message wholeMsg = handleMultiPacket(ctx, msg);
    if (wholeMsg == null) {
        return;
    }

    //获取对应的消息处理器
    int msgId = wholeMsg.getMsgId();
    BaseMessageService messageService = messageServiceProvider.getMessageService(msgId);
    ByteBuf msgBodyBuf = Unpooled.wrappedBuffer(wholeMsg.getMsgBodyArr());
    try {
        Object result = messageService.process(ctx, wholeMsg, msgBodyBuf);
        log.info("收到{}({}),终端手机号:{},消息流水号:{},内容:{}", messageService.getDesc(), NumberUtil.formatMessageId(msgId), wholeMsg.getPhoneNumber(), wholeMsg.getMsgFlowId(), wholeMsg.getMsgBodyItems());
        //发送指令应答给调用方
        if (result != null) {
            downCommandReceiver.sendUpCommand(ctx, NumberUtil.hexStr(msgId), result);
        }
    } catch (Exception e) {
        printExceptionLog(wholeMsg, messageService, e);
    } finally {
        //处理完业务逻辑统一释放资源
        ReferenceCountUtil.release(msgBodyBuf);
    }
}

首先我们先处理分包,不分包的消息直接返回给业务处理器处理。如果是分包的,收到第一包时会创建一个分包接收器,里面会自动判断有无接收完,接收完后会自动把所有分包数据整合在一起,然后返回给业务处理器处理。分包接收器代码篇幅有限暂时不贴出:

private Jt808Message handleMultiPacket(ChannelHandlerContext ctx, Jt808Message msg) {
    //不分包
    if (!msg.isMultiPacket()) {
        return msg;
    }

    //总包数
    int packetTotalCount = msg.getPacketTotalCount();
    //当前包序号
    int packetOrder = msg.getPacketOrder();
    //第一包,创建分包接收器
    if (packetTotalCount > 1 && packetOrder == 1) {
        multiPacketService.createMultiPacketReceiver(ctx, msg);
        Jt808PacketUtil.reply8001(ctx, msg);
        log.info("收到{},终端手机号:{},消息流水号:{},分包总包数:{},第{}包,内容:{}", NumberUtil.formatMessageId(msg.getMsgId()), msg.getPhoneNumber(), msg.getMsgFlowId(), packetTotalCount, packetOrder, ByteBufUtil.hexDump(msg.getMsgBodyArr()));
        return null;
    }
    //后续包
    if (packetTotalCount > 1 && packetOrder > 1) {
        Jt808Message wholeMsg = multiPacketService.addSubPacket(msg);
        Jt808PacketUtil.reply8001(ctx, msg);
        log.info("收到{},终端手机号:{},消息流水号:{},分包总包数:{},第{}包,内容:{}", NumberUtil.formatMessageId(msg.getMsgId()), msg.getPhoneNumber(), msg.getMsgFlowId(), packetTotalCount, packetOrder, ByteBufUtil.hexDump(msg.getMsgBodyArr()));
        return wholeMsg;
    }
    //单个数据包
    return msg;
}

再往下看业务处理,我们设计了一个通用的泛型消息服务类BaseMessageService<T>,T表示各种协议的消息实体类,可以处理任何私有协议(JT809和主动安全程序也使用了这种处理方式),有些私有协议的消息ID是字符串类型的,这个服务类也做了兼容,只需要实现里面的process方法即可。这个方法传递了socket上下文可以获取该socket绑定的终端信息,消息实体类T以及消息体内容的ByteBuf。每种消息类型的处理都集中在这个方法中,按照协议从ByteBuf解析消息体内容即可。
以下是BaseService的代码:

public abstract class BaseMessageService<T extends BaseMessage> {

    /**
     * 消息ID
     */
    private int messageId;

    /**
     * 字符串消息ID
     */
    private String strMessageId;

    /**
     * 消息处理器描述
     */
    private String desc;

    /**
     * 获取终端信息
     *
     * @param ctx socket上下文
     * @return 终端信息
     */
    public TerminalProto getTerminalInfo(ChannelHandlerContext ctx) {
        return SessionUtil.getTerminalInfo(ctx);
    }

    /**
     * 检查消息体长度
     *
     * @param msg        消息
     * @param msgBodyLen 消息体长度
     * @throws ApplicationException 应用异常
     */
    public void checkMessageBodyLen(T msg, int msgBodyLen) throws ApplicationException {
        byte[] msgBody = msg.getMsgBodyArr();
        if (msgBody.length < msgBodyLen) {
            throw new ApplicationException("消息体长度不对,不能小于" + msgBodyLen);
        }
    }

    /**
     * 处理消息
     *
     * @param ctx        socket上下文
     * @param msg        消息
     * @param msgBodyBuf 消息体
     * @return 返回结果
     * @throws Exception 异常
     */
    public abstract Object process(ChannelHandlerContext ctx, T msg, ByteBuf msgBodyBuf) throws Exception;
}

这里贴出0x0200位置汇报的服务类:

public class Message0200Service extends BaseMessageService<Jt808Message> {

    @Autowired
    private RabbitMessageSender messageSender;

    @Override
    public Object process(ChannelHandlerContext ctx, Jt808Message msg, ByteBuf msgBodyBuf) throws Exception {
        //检查消息体长度
        checkMessageBodyLen(msg, 28);
        //通用应答
        Jt808PacketUtil.reply8001(ctx, msg);
        //解析位置信息和附加信息
        LocationProto location = LocationParser.parse(msg, msgBodyBuf);
        //发送到MQ
        messageSender.sendLocation(getTerminalInfo(ctx), location);

        msg.putMessageBodyItem("位置", location);
        return null;
    }
}

几行代码完成了消息应答、位置解析、位置发送到MQ。
以下是拍照的0x0801多媒体数据上传处理:

public class Message0801Service extends BaseMessageService<Jt808Message> {

    @Autowired
    private RabbitMessageSender messageSender;

    @Override
    public Object process(ChannelHandlerContext ctx, Jt808Message msg, ByteBuf msgBodyBuf) throws Exception {
        //多媒体ID
        long mediaId = msgBodyBuf.readUnsignedInt();
        //多媒体类型
        int mediaType = msgBodyBuf.readByte();
        //多媒体格式编码
        int mediaFormatCode = msgBodyBuf.readByte();
        //事件项编码
        int eventItemCode = msgBodyBuf.readByte();
        //通道ID
        int channelId = msgBodyBuf.readByte();

        //老协议不带位置数据(28 bytes),图片数据以0xFFD8开头
        LocationProto location = null;
        if (mediaFormatCode != 0 || msgBodyBuf.getUnsignedShort(0) != 0xFFD8) {
            location = LocationParser.parseLocation(msgBodyBuf);
        }

        //多媒体数据
        byte[] mediaData = new byte[msgBodyBuf.readableBytes()];
        msgBodyBuf.readBytes(mediaData);

        MediaFileProto mediaFile = new MediaFileProto();
        mediaFile.setMediaId(mediaId);
        mediaFile.setMediaType(mediaType);
        mediaFile.setMediaFormatCode(mediaFormatCode);
        mediaFile.setEventItemCode(eventItemCode);
        mediaFile.setChannelId(channelId);
        mediaFile.setLocation(location);
        mediaFile.setMediaData(mediaData);
        mediaFile.setTerminalInfo(getTerminalInfo(ctx));

        //发送到MQ
        messageSender.sendMediaFile(mediaFile);
        return mediaFile;
    }
}

其他协议的处理也是采用这个方法,比如JT809的从链路连接保持请求消息处理:

public class DownLinkTestReqProcessor extends BaseMessageService<Jt809Message> {

    @Autowired
    private MessageSendService messageSendService;

    @Autowired
    private DownLinkTestRspSender downLinkTestRspSender;

    @Override
    public Object process(ChannelHandlerContext ctx, Jt809Message jt809Msg, ByteBuf msgBodyBuf) throws Exception {
        //发送JT809日志到MQ
        Jt809Status jt809Status = Jt809Manager.getStatusAttr(ctx);
        Jt809ConfigDTO jt809Config = jt809Status.getJt809Config();
        messageSendService.publishJt809Log(jt809Config, jt809Msg);

        //发送从链路连接保持应答消息
        downLinkTestRspSender.send(ctx, jt809Status);
        return null;
    }
}

至此,整个网关的工作量就全部集中在每种消息服务的开发了。内存溢出、资源未释放、异常等问题全部都得到了统一的处理,可以放心大胆的开发业务逻辑。有了通用处理模板,开发效率大幅提升,其他私有协议网关的开发也变得异常简单。如果MQ消息传输格式定义好的话,整个网关程序2-3天就能全部开发完。而且无任何网络编程经验的人员都能很快接手。


在这里插入图片描述

整个工程除了业务service,其他类只有十来个:


在这里插入图片描述
3.整合SpringBoot

TcpServer需要另外开启线程启动的,不要占用阻塞SpringBoot的主线程。
配置多环境开发:


在这里插入图片描述

生产环境的配置:application-prod.yml。

gnss:
    jt808:
      tcpPort: 6608
    middleware-ip: 127.0.0.1
    threadPool:
      size: 10
    message:
      converter: proto

spring:
  redis:
    host: ${gnss.middleware-ip}
    port: 6379
    password: gps-pro@cn

  rabbitmq:
    host: ${gnss.middleware-ip}
    port: 5672
    username: guest
    password: guest

支持2种MQ序列化方式:ProtoBuf和JSON,可以在配置文件切换。
ProtoBuf性能高,安全性高,传输量少,Web后台是JAVA开发的话可以选用这种方式,虽然也跨语言但要复杂一些。
JSON性能差,安全性差,传输量大,优点是跨语言兼容性好。如果后台是非JAVA的可以选择这种方式。
如图,发送一条相同的位置到MQ,ProtoBuf需要152字节,JSON需要675字节,传输量差了5倍。


在这里插入图片描述
在这里插入图片描述

启动后会自动加载消息处理器:


在这里插入图片描述
终端连接服务器并且发送位置,然后断开连接时,日志打印:
在这里插入图片描述
上一篇 下一篇

猜你喜欢

热点阅读