Netty整合Protobuf编解码,并解决半包问题
Google的工具在平时的开发中也用过,但是听到Protobuf编解码框架时,还是忍不住去用一用,想知道它为什么在业界这么流行,而且是非常流行。
在使用Google Protobuf编解码框架时,还需要搭建环境,这一点有点麻烦,想着后面可能带来的便利,这一步忍了;本文有两部分,第一部分Protobuf编解码java文件的生成;第二部分Protobuf编解码示例。
第一部分,Protobuf编解码java文件的生成;
首先,下载生成代码的压缩包,打开https://github.com/protocolbuffers/protobuf/releases页面,本次使用的压缩包是protoc-3.10.0-win32;
第二步,对下载后的压缩包解压,根据需要编写proto文件,文件的后缀为.proto;
syntax = "proto3";
option java_outer_classname="ProtoMsgProto";
message ProtoMsg{
int32 subReqId = 1;
string desc = 2;
}
语法简要说明:
- syntax代表版本号,默认proto2,这里使用的是3,所以必须是proto3;
- package为生成的java加包名,防止message类冲突;
- option java_outer_classname是要输出的java文件的类名,不可与message后的名字重复;
- option java_package为每个message类加包名;
- option java_multiple_files默认false,表示生成java时的打包方式;
false表示所有的消息都作为内部类,打包到一个外部类中;
true代表一个消息打包成一个java类;
把option java_multiple_files=true;加入到proto文件里,再次使用命令生成,可以看到生成的java文件有三个,也就是一个message对应三个java类文件。
java_multiple_files为true生成的文件
第三步,使用管理员的身份打开cmd窗口,进入解压的bin文件夹下,输入命令进行java文件的生成,
C:\protoc-3.10.0-win32\bin>protoc.exe ProtoMsg.proto --java_out=./
protoc生成的java文件
在使用protoc.exe生成java文件的时候,我们使用的是3.10.0版本的工具,所以在项目里的pom中引入文件的时候,版本也需要与生成文件的相匹配。
使用protoc.exe生成java文件的方法有多种,这里使用的是最简单便捷的方法,也不需要配置什么环境变量,能省一步是一步。还有一种方法,就是在maven里增加插件与对应的配置来生成java文件,个人感觉比较麻烦,还不如使用cmd命令行来的快。
接下来,进行服务端客户端代码的开发,由于protobuf仅仅支持解码编码,并没有处理粘包/半包的问题,所以还需要借助其他的工具类进行粘包/半包的处理。
第二部分,Protobuf编解码示例;
首先,pom文件的引入;
<!-- netty架包依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<!-- protobuf架包 -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.10.0</version>
</dependency>
第二,服务端的编码,ProtoMsgProto.java文件是通过protoc.exe生成的文件,这里就忽略不贴代码了;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
* netty整合protobuf,服务端模拟
* @author 程就人生
* @date 2019年10月22日
*/
public class ProtobufServer {
public void bind(int port){
//开启线程组
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//解码时,解决粘包/半包的问题
ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
//对接收到的信息,使用protobuf进行解码
ch.pipeline().addLast(new ProtobufDecoder(ProtoMsgProto.ProtoMsg.getDefaultInstance()));
//编码时,解决粘包/半包的问题
ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
//对于要发送的信息,使用protobuf进行编码
ch.pipeline().addLast(new ProtobufEncoder());
ch.pipeline().addLast(new ProtobufServerHandler());
}
});
//绑定端口,同步等待成功
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
//等待服务器端监听端口关闭
channelFuture.channel().closeFuture().sync();
}catch(Exception e){
e.printStackTrace();
}finally{
//优雅退出,并释放线程
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
public static void main(String[] argo){
//启动服务端
ProtobufServer protobufServer = new ProtobufServer();
protobufServer.bind(8080);
}
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class ProtobufServerHandler extends ChannelInboundHandlerAdapter{
private static Logger log = LoggerFactory.getLogger(ProtobufServerHandler.class);
/**
* 接收到消息时的处理
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ProtoMsgProto.ProtoMsg req = (ProtoMsgProto.ProtoMsg) msg;
if(req.getDesc().contains("程就人生")){
log.info("接收到的消息:" + req.getDesc());
ctx.writeAndFlush(resp(req.getSubReqId()));
}
}
/**
* 构造返回的消息
* @param subReqID
* @return
*
*/
private ProtoMsgProto.ProtoMsg resp(int subReqID){
ProtoMsgProto.ProtoMsg.Builder builder = ProtoMsgProto.ProtoMsg.newBuilder();
builder.setSubReqId(subReqID);
builder.setDesc("服务端已经顺利接收到客户端发送的消息,消息id:" + subReqID);
return builder.build();
}
/**
* 异常时关闭
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
//发生异常,关闭连接
ctx.close();
}
}
第三步,客户端的编码;
package com.example.netty.client3;
import com.example.netty.server3.ProtoMsgProto;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
/**
* Netty整合protobuf客户端模拟
* @author 程就人生
* @date 2019年10月22日
*/
public class ProtobufClient {
public void connect(int port, String host){
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//解码时,解决粘包/半包的问题
ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
//对接收到的信息,使用protobuf进行解码
ch.pipeline().addLast(new ProtobufDecoder(ProtoMsgProto.ProtoMsg.getDefaultInstance()));
//编码时,解决粘包/半包的问题
ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
//对于要发送的信息,使用protobuf进行编码
ch.pipeline().addLast(new ProtobufEncoder());
ch.pipeline().addLast(new ProtobufClientHandler());
}
});
//发起异步连接操作
ChannelFuture channelFuture = bootstrap.connect(host,port).sync();
//等待客户端链路关闭
channelFuture.channel().closeFuture().sync();
}catch(Exception e){
e.printStackTrace();
}finally{
//优雅退出,释放线程组
group.shutdownGracefully();
}
}
public static void main(String[] argo){
//启动客户端,连接服务端
ProtobufClient protobufClient = new ProtobufClient();
protobufClient.connect(8080, "127.0.0.1");
}
}
package com.example.netty.client3;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.example.netty.server3.ProtoMsgProto;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class ProtobufClientHandler extends ChannelInboundHandlerAdapter{
private static Logger log = LoggerFactory.getLogger(ProtobufClientHandler.class);
/**
*
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for(int i=0;i<100;i++){
ctx.writeAndFlush(req(i));
}
}
/**
* 构造返回的消息
* @param subReqID
* @return
*
*/
private ProtoMsgProto.ProtoMsg req(int subReqID){
ProtoMsgProto.ProtoMsg.Builder builder = ProtoMsgProto.ProtoMsg.newBuilder();
builder.setSubReqId(subReqID);
builder.setDesc("程就人生"+subReqID);
return builder.build();
}
/**
* 接收到消息时的处理
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ProtoMsgProto.ProtoMsg req = (ProtoMsgProto.ProtoMsg) msg;
log.info("接收到的消息:" + req.getDesc());
}
/**
* 异常时关闭
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
}
}
最后,测试;先启动服务端,再启动客户端,查看控制台的输出;
客户端测试结果
protobuf之所以这么流行,原因有很多:第一,它是跨语言的,除了java还支持python、C++等语言;第二,编码后是以二进制的形式进行传输的,传输速度快。但是,它只负责编解码,粘包/半包的问题,还需通过其他的工具类进行解决。
除了使用上文的ProtobufVarint32FrameDecoder类进行半包的处理,还可以使用LengthFieldBasedFrameDecoder工具类来解决;LengthFieldBasedFrameDecoder工具类的使用,在Netty整合MessagePack、LengthFieldBasedFrameDecoder解决粘包/拆包问题中有说明;除此之外,还可以继承ByteToMessageDecoder类,自己处理半包消息的问题。
Netty整合Protobuf框架进行编解码的简单操作,就这样告一段落了,更多的还需要继续看源码,加油吧。