Netty责任链模式的数据流处理策略
引言
前段时间完成了一个项目,需要开发一个服务器,其中一个功能要求用TCP/IP协议与下位机(使用STM32作为主控芯片)以特定的帧格式进行通信,这里在实验室某位技术实力过硬的师兄带领下,使用Spring Boot+Netty相结合的方式完成一个稳定、高效率的服务器,其中还包括多线程、缓存、高并发等技术。笔者对其中的数据流处理方法进行了学习,现自己对其做一个总结。
服务器总体框架
服务器总体框架如下图所示:
![](https://img.haomeiwen.com/i14305916/19808c050ca5c1e1.png)
这里的下位机硬件设备其实并不是直接使用以太网与服务器相连,它实际使用的是CAN总线,再使用CAN转以太网模块才与服务器相连,因为下位机的通信与服务器无关,因此框架中的这个部分做了简化,可以直接认为通过以太网进行TCP通信。
数据帧格式
这里的数据帧是下位机设备整理各种信息之后,按照自定义的帧格式组合成数据帧发送给服务器,这里的帧格式如下:
![](https://img.haomeiwen.com/i14305916/d1b27ca8498eb40d.png)
这里将这样一条数据帧格式叫做一条数据主帧,它由帧头和主帧数据体组成。帧头中的内容更具项目需求可以分为帧识别位、业务1、主帧类型、业务2、备用位、主帧数据体长度6个部分,其中每个部分所占字节长度为固定值。这里的数据流处理策略就是要从Netty服务器的接收缓冲区中提取这些有效信息,组合成这个格式的帧的类实例给业务层进行另一步处理。主帧数据体由子帧构成,子帧的格式如下:
![](https://img.haomeiwen.com/i14305916/2a5227f8b3eea461.png)
一条子帧由子帧头和数据体组成,子帧头有功能位(占1字节)和子帧长度(占2字节,代表数据体长度)。根据不同的业务功能可以定义功能位。主帧类型和子帧功能位共同决定了此主帧代表的业务功能。
责任链模式
关于责任链模式的解释和用法网上资源很多并且也很详细,这里就不再做介绍了,不过笔者还是最喜欢《大话设计模式》中用加薪这个情景来进行总结,通俗易懂,感兴趣的朋友可以去找这本书研读下。
Netty中的责任链模式
在《Netty实战》第六章中讲到,ChannelHandlerContext使得ChannelHandler能够通知其所属ChannelPipeline的下一个ChannelHandler,这就将它们连成了一条链,事件沿这条链传递,这就是责任链模式最好的体现,借用《Netty实战》中的图6-3:
![](https://img.haomeiwen.com/i14305916/1b1a383f9ab3f2c3.png)
其中可以使用ChannelInitializer,它提供一个特殊ChannelInboundHandlerAdapter 子类,它定义的
protected abstract void initChannel(C ch) throws Exception;
方法是一种将多个ChannelHandler添加到一个ChannelPipeline中的简便方法。
数据流处理策略
这里将数据流解析模块分为三级:
![](https://img.haomeiwen.com/i14305916/b1c17cb60d84e4e7.png)
第一级数据流解析成帧的handler,其代码如下:
/**
* 帧识别 - 入站处理器 分割出并向后继处理器传递一个完整的主帧数据体及主帧信息 bean
*/
@Service
@ChannelHandler.Sharable
public class FrameRecognitionInBoundHandler extends SimpleChannelInboundHandler<ByteBuf> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
while (true) {
if (byteBuf.readableBytes() < FrameSetting.FRAME_HEAD_LENGTH) {
return;
}
if (byteBuf.readByte() != FrameSetting.MAJOR_FRAME_HEAD_1
|| byteBuf.readByte() != FrameSetting.MAJOR_FRAME_HEAD_2) {
logger.warn("数据接收异常「帧头不匹配」");
return;
}
int groupId = byteBuf.readByte() & 0xFF;//提取业务1,占1字节
int msgId = byteBuf.readByte() & 0xFF;//提取主帧类型,占1字节
int deviceId = byteBuf.readByte() & 0xFF;//提取业务2,占1字节
int backupMsg = byteBuf.readByte() & 0xFF;//提取备用位,占1字节
int dataLength = byteBuf.readShort() & 0xFFFF;//提取数据体长度,占2字节
FrameMajorHeader headMsg = new FrameMajorHeader(msgId, groupId, deviceId, dataLength, backupMsg);
ByteBuf subBuf = ctx.alloc().buffer(dataLength);
byteBuf.readBytes(subBuf, dataLength);
ctx.fireChannelRead(new FrameMajor(headMsg, subBuf));
}
}
}
其中FrameMajorHeader是主帧的帧头类,FrameMajor是完整的主帧类。ctx.fireChannelRead(new FrameMajor(headMsg, subBuf));
将此主帧类的实例传递给第二级。
第二级处理handler,代码:
/**
* 从第一级中获得了一个主帧实例,第二级获取子帧,处理并交给第三级
*/
@Service
@ChannelHandler.Sharable
public class ParsedMessageInBoundHandler extends SimpleChannelInboundHandler<FrameMajor> {
/** 第三级处理数据流模块 */
private final TcpPresenter server;
@Autowired
public ParsedMessageInBoundHandler(TcpPresenter server) {
this.server = server;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, FrameMajor msg) throws Exception {
FrameMajorHeader head = msg.getHead();
ByteBuf byteBuf = msg.getData();
while (byteBuf.readableBytes() >= FrameSetting.SUB_FRAME_HEAD_LENGTH) {
int subMsgId = byteBuf.readByte() & 0xFF;
byte[] data = new byte[byteBuf.readShort()];
byteBuf.readBytes(data);
server.decodeAndHuntMessage(head, subMsgId, data, ctx.channel());
}
}
}
第三级处理代码:
/**
* 「内部使用」将 TCP 帧解码并转换为消息对象,而后传递至 Server 该方法可保证解析出的消息对象的设备号正确
*
* @param head TCP 帧头
* @param subMsgId TCP 子帧功能位
* @param data TCP 子帧数据体
* @param channel 该消息帧的容器 Channel
*/
public void decodeAndHuntMessage(FrameMajorHeader head, int subMsgId, byte[] data, Channel channel) {
BaseMsg msg = msgProcessor.decode(head, subMsgId, data);
if (msg == null) {
logger.warn("帧解析出错");
return;
}
if (msg.getGroupId() > DeviceSetting.MAX_GROUP_ID && msg.getDeviceId() > DeviceSetting.MAX_DEVICE_ID) {
logger.warn("设备号出错「GroupId:" + msg.getGroupId() + "; DeviceId:" + msg.getDeviceId() + "」");
return;
}
switch (msg.getJointMsgFlag()) {
case JointMsgType.replyWorkStatus:
case JointMsgType.replyChargeStatus:
server.huntDeviceStatusMsg((MsgReplyDeviceStatus) msg);
break;
case JointMsgType.replyHeartBeat:
// 收到心跳包,设备组已激活
int groupId = msg.getGroupId();
//设定设备时间校对计划任务
ScheduledFuture future = channel.eventLoop().scheduleAtFixedRate(
() -> sendMessageToTcp(MsgCodecTimestamp.create(groupId)), 5, DeviceSetting.TIMESYNC_INTERVAL, TimeUnit.SECONDS);
tcpRepository.accessChannelSuccessful(msg, channel, future);
break;
default:
if (msg instanceof MsgReplyNormal) {
tcpRepository.touchNormalReplyMsg((MsgReplyNormal) msg);
server.touchNormalReplyMsg((MsgReplyNormal) msg);
} else {
server.huntMessage(msg);
}
}
}
到这里数据流的处理基本上就完成了,再往上的功能牵涉到缓存、多线程等技术,这些暂时还不会,以后会了再写吧。
后记
其实这是笔者第一次写这种文章,写完之后发现其实这并没有什么技术含量,感觉像是在写论文,而且自己在写代码的时候知道该这样写,但是写文章想表达出来的时候却发现不知道用什么词汇和方式来表达自己的想法,所以很多地方干脆就不写文字性的东西了。不过本人深刻知道学习过程中写文章总结的必要性,今天算是走出了第一步吧,以后一定努力,多花时间多钻研。Thanks!!!