Vert.x + Protobuf二进制协议解析
这一期介绍如何解析二进制私有协议。
先说几句题外话,就是绝大多数情况下,可能根本用不着使用私有二进制协议,除非你的业务对性能极其敏感,否则HTTP足矣。
协议
我们的协议非常简单,先是一个4字节的整数表示数据长度,然后紧接着就是protobuf序列化后的字节数组。proto定义如下:
syntax = "proto2";
package cn.fh.vertx.demo.proto;
option java_multiple_files = true;
message Message {
required int32 type = 1;
required string content = 2;
}
在实际业务场景中,消息格式的定义是有讲究的,这里不深究,重点在于vert.x解析。
RecordParser
Vert.x中只有一个二进制协议解析辅助类,即RecordParser
,可以很好的解决粘包/拆包问题。它有两种工作模式,一是delimited mode
, 即通过固定分隔符分隔数据包,这种用的其实比较少;二是fixed size mode
,即固定数据长度模式。诈一看可能很多人会有疑虑,多数协议都是可变长度的啊,这两种模式看起来都不能满足要求。其实模式二支持在处理的过程中随时动态的改变size
值,这样就可以间接完成对变长协议的解析。
首先,RecordParser
有两类Builder
方法,RecordParser.newDelimited()
和RecordParser.newFixed(4)
,分别对应上面的模式一和模式二。构造完成以后,跟其它vert.x的方法一样,我们需要定义一个Handler, RecordParser
每分割出一段字节数组都会调用一次Handler,业务逻辑处理就在此Handler中进行。对于上面"长度 + 数据"的协议,我们可以首先构造一个以4字节为单位的RecordParser
,在收到长度数据后,再动态将解析器修改成指定长度状态,从而完成对数据部分的分割。代码示例如下:
// 先以长度4构造对象
RecordParser parser = RecordParser.newFixed(4);
// 设置处理器
parser.setOutput(new Handler<Buffer>() {
// 表示当前数据长度
int size = -1;
@Override
public void handle(Buffer buffer) {
// -1表示当前还没有长度信息,需要从收到的数据中取出长度
if (-1 == size) {
// 取出长度
size = buffer.getInt(0);
// 动态修改长度
parser.fixedSizeMode(size);
} else {
// 如果size != -1, 说明已经接受到长度信息了,接下来的数据就是protobuf可识别的字节数组
byte[] buf = buffer.getBytes();
Message msg = null;
try {
msg = Message.parseFrom(buf);
} catch (InvalidProtocolBufferException e) {
System.out.println(e.getMessage());
socket.close();
return;
}
System.out.println(msg);
// 处理完后要将长度改回4
parser.fixedSizeMode(4);
// 重置size变量
size = -1;
}
}
});
虽然用起来比较别扭,但这的确是标准使用方法。
如果表示长度的数据不是从0开始的,比如0 ~ 3为消息类型,4 ~ 7才表示body的长度,那么可以在构造解析器时先将长度设为8, 然后取后4字节做为新的长度即可。
其实这个类所做的事正是Netty里LengthFieldBasedFrameDecoder
的功能,希望Vert.x以后能直接提供好类似的处理器,不要让用户再手动取长度了,毕竟低层的Netty都支持,你更高级的封装怎么可以没有呢。
这个RecordParser
怎么用呢?看一下完整的代码吧:
Vertx vertx = Vertx.vertx();
// 创建TCP Server
NetServer server = vertx.createNetServer();
// 设置Handler
server.connectHandler(socket -> {
// 构造parser
RecordParser parser = RecordParser.newFixed(4);
parser.setOutput(new Handler<Buffer>() {
int size = -1;
@Override
public void handle(Buffer buffer) {
if (-1 == size) {
size = buffer.getInt(0);
parser.fixedSizeMode(size);
} else {
byte[] buf = buffer.getBytes();
Message msg = null;
try {
msg = Message.parseFrom(buf);
} catch (InvalidProtocolBufferException e) {
System.out.println(e.getMessage());
socket.close();
return;
}
parser.fixedSizeMode(4);
size = -1;
}
}
});
socket.handler(parser);
});
// 监听
server.listen(8008, "localhost", res -> {
if (res.succeeded()) {
System.out.println("tcp server is listening at 8008");
} else {
System.out.println(res.cause());
}
});
写起来会有一点点奇怪,适应就好。