Netty自定义TCP协议通讯实例
Netty自定义TCP协议通讯实例
网络编程的基本模型就是客户机到服务器模型,简单来说就是进程与进程之间的通讯
两个进程之间必须要有一个提供固定的位置,让另一个进程知道这个位置并建立联系,这样两个进程就可以相互通讯
其中提供固定位置的叫服务端,连接固定位置的叫客户端
实际上因为是双向通讯,服务端客户端并没有太大的界限
今天写一个实例,使用netty4来实现一个自定义的tcp报文解析
先说说自定义tcp协议的意义
文本协议:
对协议头的 size 没特别要求对解析速度没特别要求需要协议轻松
支持变长 header 且保持较友好的扩展性(如像 HTTP 一样可以随意增加 header)
可升级协议且不增加协议本身复杂度
要求容易调试
二进制协议:
要求协议头 size 要足够小
要求解析速度要足够快
协议头相对较固定,变动的可能性较低,但仍然需要留一些扩展位
比如即时通讯就很适合用二进制协议,因为即时通讯需要频繁收发消息,对于传输和解析的速度都有比较高的要求
另外一个就是二进制协议没有协议文档几乎不可读
进入代码阶段
本文代码链接:后端代码
一、服务端
1.新建工程
这次不需要boot了,新建一个空的maven项目引入依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.53.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
<!-- log -->
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-codec/commons-codec -->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.15</version>
</dependency>
其中logback是给lombok的@Slf4j的实现,commons-codec用于获取md5
2.实现一个服务端
netty的基本使用就是建立一个worker和一个boss EventLoopGroup,然后配置编码解码器以及自定义handler,事件监听等,使得开发者的核心可以转移到处理数据。
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true).handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
// 自定义解码器
new DecodeHandler(),
new StringEncoder(CharsetUtil.UTF_8),
// 自定义的文件处理handler
new TcpServerHandler());
}
});
try {
int port = 8888;
String host = "127.0.0.1";
ChannelFuture future = bootstrap.bind(host, port).sync();
log.info("启动服务器 端口[{}]", port);
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("%s", e);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
除了常规配置之外,我们要做的就是实现一个自定义的解码器DecodeHandler以及消息处理器TcpServerHandler
3.协议制定
开始解码前我们需要有一个协议,才能知道该怎么解
业务的报文处理
解析
定义协议 PS:x表示占几个字节
[[版本x2][报文总长度x2][消息类型x1][文件标识x16][分片序号x2][文件总大小x8][分片文件长度x2][分片文件数据]]
消息类型 0x01 上传文件 0x02 合并文件
示例报文(16进制):
0055041d015b886f17f511ed7908128e68520b8aa300000000000001159cf50400504b03041400000000003b0a3948000000000000000000000000180000007068616e746f6d6a732d322e312e312d77696e646f77732f504b0304140000000000670a39480000000000000000000000001c0000007068616e746f6d6a732d322e312e312d77696e646f77732f62696e2f504b0304140000000800bd0a394821ec01d2cbbf140100a01b01290000007068616e746f6d6a732d322e312e312d77696e646f77732f62696e2f7068616e746f6d6a732e657865ec9a8b3fd3ff1ec777373336b5106313b94d482a22e6325b85264285ad92cba2167329b731b799cbeae474fd955427bf2ebf748e44c45c6a2315e9942256a9c8a52531b9ec7cbf9ddbe35c1e8fd31f70be0f1eefafd79efb787edf9feff7cdc383ef4e31040e814010c0a74a0581d442fe7a5021fffba04221102dd25d2d48b5fa23e35aa8cf23e3c0e898043237fe6054fcee38f2dedd070e1ce491f7ec23c7271e20c71c207b6d0d20c71d8cd867a3a98931fddb1aebdb1ef4245caee4fffd73edd4267e3250c79107f987809a3193cc27ffedb5f81f5f6fe2b32e8195c15f0bd4df7646f27997c19af2e375ad7e063fe30a58937fbcff81b30f3ff5c7eb7bf9d13fd6f9951f7e05cc197cc48fd737f1e3ae8395f7b7f513800a726e4005f3a41f755bccde68d0e1df7bc0a441203e503844e07739e8ef991c02836a40b11088d8000a210341d504944bc3c021920928840954b0bb7820874120a81feff867857081d7f17f3d8543a85900f8e3e39ff5af85fa8b0216f20e02896561e062601d70dd58dc7feed1fc6e0c1c0bf9df47281d0367407efed8ce0cb103ca8f6bfbabd78f9be85f0e3ed00a805b0d9cfee8c58feb2f00eabf715480b3894f88df0b7e41feebb54008402d02d87fe52490ff1f3f7bacb1715a0dee93b1a63a1a1588db19d4fccaf6eb7d730c7c7f2ed0573b185c0702d963fc7a56150db579201e865a5ec7ee68ed24950f0e86c75771542b6c77b7d5c1be3db2860a069b7c1bcf52a56437eb7b29111780fdf157f52da82ec0be86f0d7b4318bbd4cd11117cd39c8ac61c8a275ace0ee8b86f136b4d00e518c92e2f7fda95aa5520d6c80af7511598bd0b9ed3c5dc1ac8a4712b4ab14b86634ae4cd27560d74c8e2451db890383310317ba8862eceb17b13138b65eac4fbf50f3c28e59b27334119f00756bb177350fb6db88c0464477b8cfb02ec2be94eb71ec8d535c89a38d9ff7ae264a911832cc6c8dc4199a3a6ac5ba8cbabed63472bb458c615f12c99a4371af7e106e0da3d80de2c2a6590e656be19b22e4b68eac59f1de52172a8731ec62ea9d93217b2f4678315c901c8ca7565dda3e071fe65d4ae34420c7d69d1ae2835a495d1d6253469adb175a8655e3e0b8ec8059b1674434c7923fd39b3c4644536259dc52a2ba71faf5b95e5e560a47
推荐使用NetAssist调试
4.解码器
解码器的意义在于先一步处理得到报文
TCP是一个“流”协议,所谓流,就是没有界限的一长串二进制数据。TCP作为传输层协议并不不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行数据包的划分,所以在业务上认为是一个完整的包,可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。
所以我们需要在解码器对每一包进行校验,小于校验和的包需要粘包,大于校验和的包需要拆包,这样才能正确解析
至于怎么解决这个可以专门分析,网上也有很多方案,这里不谈
除了粘包拆包问题,我们还可能有对原始报文进行解密的操作,为了保证下游的handler拿到的bytebuf是与协议文档一样的格式,我们需要先解密,并把业务部分报文传达下去,甚至可以在定义了一定格式的报文后,直接在decoder中把二进制数据变成对象或者等类型,下游再解析。
public class DecodeHandler extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
// 取到tag与报文长度后再处理,各2字节
if (byteBuf.readableBytes() < 4) {
return;
}
// 记录当前ByteBuf的读指针位置,以便下面取报文长度字节
// pos是一个完整报文的开始位置,报文整体会在ByteBuf中移动,类似内存管理,所以基于字节的判断报文长度等等,都是基于pos,否则可以在byteBuf.readBytes()之后加,byteBuf.discardReadBytes();整理ByteBuf,使pos回到0开始位置
int pos = byteBuf.readerIndex();
//不同协议头建议使用策略模式处理
short protocol = byteBuf.readShort();//前面两个字节是协议版本 0x55
int msgLen = byteBuf.readShort();//第三第四个字节表示报文长度
// 收到的报文长度不足一个完整的报文,继续接收
if (byteBuf.readableBytes() < msgLen) {
return;
}
// 提出完整报文(readBytes读到msg中),放到list给下一个Handler处理
if (msgLen > 0) {
out.add(byteBuf.readBytes(msgLen));
}
}
}
5.自定义handler
从上面的DecodeHandler可以看到,前4个字节用来做协议版本和报文长度,已经read了,丢到out中给到下游的是这四个字节以外的数据
// 前面4个字节已经在decode阶段跳过了
byte msgType = byteBuf.readByte();// 读取一个字节为消息类型
switch (msgType) {
case 0x01:
upload(byteBuf);
break;
case 0x02:
merge(byteBuf);
break;
}
所以在handler中可以直接读取一个字节判断当前的报文类型再分发
然后根据已有的协议进行解析
// 业务的报文处理
// 解析
// 定义协议 PS:x表示占几个字节
// [[版本x2][报文总长度x2][消息类型x1][文件标识x16][分片序号x2][文件总大小x8][分片文件长度x2][分片文件数据]]
// 消息类型 0x01 上传文件 0x02 合并文件
byte[] filenameBytes = new byte[16];
buf.readBytes(filenameBytes);// 读取16字节是文件名
String filename = ByteUtils.bytesToHex(filenameBytes);// 我这里是md5不会有中文没有乱码的问题
short fileseq = buf.readShort();// 读取2字节是分片序号
long filesize = buf.readLong();// 读取8字节是文件总大小
short chunksize = buf.readShort();// 读取2字节是分片文件长度
byte[] dataBytes = new byte[chunksize];
buf.readBytes(dataBytes);
已经有大佬写好框架,只需要做解析的同学,做这个不算太难,稍微有些枯燥,需要认真细心,不要解错
二、客户端
相对服务端而言,客户端需要搜集本次通讯的报文数据,无论是protobuf也好byte数据也好,都比服务端要复杂一些
由于我们的netty实现了一个tcp服务端,所以可以客户端只要使用tcp协议就可以通讯,我们使用java提供的socketapi即可
核心代码
/**
* TODO 本来发送是不需要返回值的,但是这里为了使用latch来计算发了多少,通过返回true来表示发送成功一个
*
* @param msg
* @return
*/
synchronized public boolean sendMsgBySocket(ByteBuffer msg) {
try {
// Socket socket = getSocketClient();
//这段代码仅仅调试用,尽量一个不要开多个socket跟服务端通信,应该使用同一个链路,多次发送消息,服务端解决粘包和拆包的问题
// 要连接的服务端IP地址和端口-----------------------------------
String host = "127.0.0.1";
int port = 8888;
// 与服务端建立连接
this.socket = new Socket(host, port);
//---------------------------------------------------------
// 建立连接后获得输出流
OutputStream outputStream = socket.getOutputStream();
byte[] msgArray = msg.array();
// 报文头start-----------------------------------------------
// 55表示协议版本
short protocol = 0x55;
ByteBuffer header = ByteBuffer.allocate(4);
header.putShort(protocol);// 版本x2
// 写入本次报文总长度 固定是37 + 分片文件长度
short length = (short) (msgArray.length);
header.putShort(length);// 报文总长度x2
// 报文头end-----------------------------------------------
outputStream.write(header.array());
outputStream.write(msgArray);
outputStream.flush();
InputStream inputStream = socket.getInputStream();
byte[] bytes = new byte[1024];
int read = 0;
while (true) {
// 死循环直到服务器返回 这里可以添加超时机制,防止服务器出问题后无限循环
read = inputStream.read(bytes);
if (read > 0) {
String result = new String(bytes);
log.info("receive : {}", result);
if ("ok".equals(result)) {
// 假定回复ok就是成功了
return true;
}
break;
}
}
} catch (Exception e) {
log.error("[{}]", e);
}
return false;
}
这里的含义是我们先组装好要发送的报文ByteBuffer,然后调用这个发起socket通讯
其他分享的代码
这次做的是通过tcp分片上传大文件
思想还是和上篇一样,客户端分片,通过md5获得唯一标识以免重复,每个报文包括分片序号,唯一标识,总大小分片大小还有分片数据
因为我们要规定每个分片的最大大小才能计算分片数量
所以这里可以利用FileChannel,分段读取,无需切片文件,也避免把整个文件读取到内存
FileChannel fileChannel = FileChannel.open(Paths.get(file.getAbsolutePath()),
EnumSet.of(StandardOpenOption.READ));
// 每个分片最大1024,所以只有最后一片不一定是1024,但是也能算出来
int lastSize = (int) (fileChannel.size() - ((chunk - 1) * 1024));// 总长度减去前片的和
for (int chunkNum = 0; chunkNum < chunk; chunkNum++) {
int size = chunkNum == chunk ? lastSize : chunkSize;
ByteBuffer buffer = ByteBuffer.allocate(size);
fileChannel.read(buffer);
我规定每个分片最大1024字节也就是1kb,所以每次最多读1k,最后一片可能小于等于1k,每循环一次就把报文放到线程池中等待发送
利用CountDownLatch设定一个屏障,要求发送的总数达到才关闭线程池,在服务端返回ok后latch.countDown()达到计数的效果
// 设定一个计数器
CountDownLatch latch = new CountDownLatch(chunk);
fixedThreadPool.execute(() -> {
// 将组装好的报文,在线程池中创建线程运行
client.sendMsgBySocket(msgBuffer);
// 应该要判断是否成功的。这里不判断认为都成功
latch.countDown();
});
// 所有线程没发送完全之前 一直等待
latch.await();
分片大小1k确实有点小,不过这是为了看测试效果,实际上要根据通讯网络来定义分片大小,带宽好的大一些