netty使用Google Protobuf协议优化通道的序列化
2022-10-17 本文已影响0人
天草二十六_简村人
一、Google Protobuf 协议
解决序列化和检索效率的问题。
序列化的方法有:
- 使用java原生的序列化
- xml格式
- Protocol buffers
Protocol Buffers 是解决这一问题的灵活、高效、自动化的方案。使用 Protocol Buffers 时,您只需编写一个 .proto 文件来描述希望存储的数据结构;Protocol Buffers 编译器会据此生成一个类,该类以高效的二进制格式自动完成数据的编码和解析,并为消息中的各个字段提供 getter 和 setter。
1.1、定义文件xxxSocketMessage.proto(首字母小写)
syntax = "proto3";
package com.xxx.channel.core.common.bean.dto;
option java_package = "com.xxx.channel.core.common.bean.dto";
option java_outer_classname = "XxxSocketMessage";
// Type of the message source (used by Message.type).
// NOTE(review): in proto3 an unset enum field reads as the zero value, so an
// unset `type` is reported as SERVER, not as DEFAULT (= 1) -- confirm this is intended.
enum MessageType {
// Server side
SERVER = 0;
// Default
DEFAULT = 1;
}
/**
* Message data packet.
*/
message Message {
// Type of the message source
MessageType type = 1;
// Custom tag, used to categorize the message content
int32 tag = 2;
// Message action
int32 action = 3;
// Message content
string data = 4;
}
1.2、java类调用Protobuf的类Message
// In real use the client serializes the object into a byte array with protobuf and sends it
// to the server over the wire; the server side, which may be written in another language
// (e.g. Python), parses the bytes back into an object.
/**
 * Round-trip demo: builds a {@code XxxSocketMessage.Message}, serializes it to bytes,
 * parses the bytes back and prints the fields of the parsed copy.
 */
public class ProtobuffTest {
    public static void main(String[] args) throws Exception {
        // Placeholder payload: the article does not show where onlineListJson comes from.
        String onlineListJson = "[]";

        // XxxSocketMessage corresponds to java_outer_classname in the .proto file.
        // NOTE(review): the original passes MessageType.SERVER_VALUE to setTag(); it may have
        // been meant as setType(MessageType.SERVER) -- kept as written, confirm the intent.
        XxxSocketMessage.Message message = XxxSocketMessage.Message.newBuilder()
                .setTag(XxxSocketMessage.MessageType.SERVER_VALUE)
                .setAction(ResponseCode.Notification.ONLINE_LIST)
                .setData(onlineListJson)
                .build();

        // Serialize: object -> byte array.
        byte[] message2ByteArray = message.toByteArray();

        // Deserialize: byte array -> object. The original referenced an undefined
        // variable (student2ByteArray) here instead of the array produced above.
        XxxSocketMessage.Message message2 = XxxSocketMessage.Message.parseFrom(message2ByteArray);

        System.out.println(message2.getType());
        System.out.println(message2.getTag());
        System.out.println(message2.getData());
        System.out.println(message2.getAction());
    }
}
1.3、pom.xml
引入相关jar包
<!-- Version properties referenced by the dependency declarations below. -->
<properties>
<netty.version>4.1.29.Final</netty.version>
<protobuf.version>3.7.1</protobuf.version>
<grpc.version>1.12.0</grpc.version>
</properties>
<!-- Netty (all-in-one artifact). -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<!-- Protobuf Java runtime. -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<!-- gRPC (all-in-one artifact). -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>${grpc.version}</version>
</dependency>
maven插件
用于编译.proto文件
<!--
  Compiles the .proto files: goal "compile" generates the Java message classes and
  goal "compile-custom" runs the grpc-java protoc plugin.
  The protoc version follows ${protobuf.version} so that the compiler and the
  protobuf-java runtime stay on the same release (it was hard-coded to 3.1.0).
  NOTE: ${os.detected.classifier} is not a built-in Maven property; it is normally
  provided by the kr.motd.maven:os-maven-plugin build extension, which must be registered.
-->
<plugin>
    <groupId>org.xolstice.maven.plugins</groupId>
    <artifactId>protobuf-maven-plugin</artifactId>
    <version>0.5.0</version>
    <configuration>
        <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
        <pluginId>grpc-java</pluginId>
        <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
    </configuration>
    <executions>
        <execution>
            <goals>
                <goal>compile</goal>
                <goal>compile-custom</goal>
            </goals>
        </execution>
    </executions>
</plugin>
二、定义消息处理类Handler
ClientHandler继承SimpleChannelInboundHandler并重写channelActive方法, 在该方法中我们处理登录逻辑,最后调用方法writeAndFlush()将其发送到server端。
这里,只给出客户端的代码示例,服务端类似。
public class ClientHandler extends SimpleChannelInboundHandler<ProtobufData.Task>{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
responseLoginInfo(ctx.channel());
super.channelActive(ctx);
}
private void responseLoginInfo(Channel channel) {
UserInfo userInfo = getUserInfo();
ProtobufData.LoginPack loginPack = ProtobufData.LoginPack.newBuilder()
.setAppKey(userInfo.getAppKey())
.setUserType(userInfo.getUserType())
.setRoomId(userInfo.getRoomId())
.setUserId(userInfo.getUserId()).build();
ProtobufData.Task task = TaskPackage.login(loginPack);
channel.writeAndFlush(task);
}
}
三、客户端连接服务端的代码示例
import com.google.common.base.Joiner;
import com.google.protobuf.AbstractMessage;
import com.xxx.channel.core.common.constant.ACKType;
import com.xxx.channel.core.common.constant.CMD;
import com.xxx.channel.core.common.protobuf.ProtobufData;
import com.xxx.channel.core.common.protobuf.TaskPackage;
import com.xxx.channel.core.common.utils.UUIDUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import java.util.Set;
/**
 * Netty client for the channel service. A scheduled task re-establishes the
 * connection whenever the current channel is missing or inactive.
 */
public class ChannelClient {

    private static final Logger logger = LoggerFactory.getLogger(ChannelClient.class);

    /**
     * Channel service IP, injected from {@code channelService.ip}.
     */
    @Value("${channelService.ip}")
    private String ip;

    /**
     * Channel service port, injected from {@code channelService.port}.
     */
    @Value("${channelService.port}")
    private int port;

    /**
     * Current connection. Written by the scheduled reconnect task and read through
     * {@link #getChannel()}, hence volatile.
     */
    private volatile Channel channel;

    /**
     * Single event loop group reused by every connection attempt. The original code
     * created a new group on each attempt and never shut it down, leaking its
     * resources on every reconnect.
     */
    private final EventLoopGroup group = new NioEventLoopGroup();

    /**
     * Bootstrap configured once and reused for every connection attempt.
     */
    private final Bootstrap bootstrap = createBootstrap();

    /**
     * Builds the client bootstrap and its channel pipeline.
     *
     * @return the configured bootstrap bound to {@link #group}
     */
    private Bootstrap createBootstrap() {
        Bootstrap b = new Bootstrap();
        b.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline()
                                // The next four handlers are the protobuf codec: varint32 framing + (de)serialization.
                                .addLast(new ProtobufVarint32FrameDecoder())
                                .addLast(new ProtobufDecoder(ProtobufData.Task.getDefaultInstance()))
                                .addLast(new ProtobufVarint32LengthFieldPrepender())
                                .addLast(new ProtobufEncoder())
                                // Writer-idle timeout of 3 seconds (reader and all-idle disabled).
                                .addLast(new IdleStateHandler(0, 3, 0))
                                // Message handler, see the ClientHandler definition.
                                .addLast(new ClientHandler());
                    }
                });
        return b;
    }

    /**
     * Tries to connect to the configured address and stores the resulting channel.
     * Failures are logged and leave {@link #channel} unchanged.
     */
    private void init() {
        try {
            ChannelFuture future = bootstrap.connect(ip, port).sync();
            channel = future.channel();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can observe it.
            Thread.currentThread().interrupt();
            logger.error("connect channel-service-server error", e);
        } catch (Exception e) {
            // sync() rethrows the connect failure (e.g. connection refused); keep the
            // scheduled task alive so the next run can retry.
            logger.error("connect channel-service-server error", e);
        }
    }

    /**
     * Reconnects when the channel is not connected. First run after 3 seconds,
     * then at a fixed rate of 10 seconds.
     */
    @Scheduled(initialDelay = 3000, fixedRate = 10000)
    public void reconnect() {
        if (isConnected() || group.isShuttingDown()) {
            return;
        }
        init();
        // Read the field once; it is null if the very first attempt failed.
        Channel current = channel;
        if (current != null && current.isActive()) {
            logger.info("channel_service_client start connect to:{}", current.remoteAddress());
        }
    }

    /**
     * Returns the current connection.
     *
     * @return the current channel, or {@code null} if no connection has been established yet
     */
    public Channel getChannel() {
        return channel;
    }

    /**
     * Closes the current channel (if any) and shuts down the event loop group.
     * Nothing in this class calls it; it needs to be registered as a destroy
     * callback or invoked explicitly when the client is no longer needed.
     */
    public void shutdown() {
        Channel current = channel;
        if (current != null) {
            current.close();
        }
        group.shutdownGracefully();
    }

    /**
     * Tells whether the connection is usable.
     *
     * @return {@code true} if a channel exists and is active
     */
    private boolean isConnected() {
        Channel current = channel;
        return current != null && current.isActive();
    }
}
四、客户端的对接
4.1、处理消息
在ClientHandler类中重写方法channelRead0()来接收服务端下发的报文,并把消息报文和响应报文分别交给接口IBusinessHandler的实现类处理。
/** Business callback that receives message packs and response packs. */
private IBusinessHandler businessHandler;

/**
 * Dispatches an inbound task by its pack type: MESSAGE and RESPONSE packs are handed
 * to the business handler and then acknowledged; every other type is ignored.
 *
 * @param ctx  handler context whose channel is used for the ACK
 * @param task decoded inbound task
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, ProtobufData.Task task) {
    switch (task.getPackType()) {
        case MESSAGE: {
            // Inbound message: process it, then ACK (enabled only when ackType > 0).
            ProtobufData.MessagePack incoming = task.getMessagePack();
            businessHandler.handleMessage(incoming);
            boolean ackRequested = incoming.getAckType() > 0;
            responseACK(incoming.getMessageId(), ctx.channel(), ackRequested);
            break;
        }
        case RESPONSE: {
            // Response pack: process it, then ACK with the flag set to false.
            ProtobufData.ResponsePack reply = task.getResponsePack();
            businessHandler.handleResponse(reply);
            responseACK(reply.getMessageId(), ctx.channel(), false);
            break;
        }
        case ACK:
        case HEARTBEAT:
        default:
            // Nothing to do for these pack types.
            break;
    }
}
PackType的枚举类见下:
// Pack type of a task; ClientHandler.channelRead0() switches on it.
enum PackType {
// Login
Login = 0;
// Data transfer
MESSAGE = 1;
// Exit
EXIT = 2;
// ACK
ACK = 3;
// Heartbeat
HEARTBEAT = 4;
// Response pack (response body)
RESPONSE = 5;
}
4.2、IBusinessHandler的实现
import com.google.protobuf.InvalidProtocolBufferException;
import com.xxx.channel.core.common.bean.dto.XxxSocketMessage;
import com.xxx.channel.core.common.constant.ResponseCode;
import com.xxx.channel.core.common.netty.handle.IBusinessHandler;
import com.xxx.channel.core.common.protobuf.ProtobufData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * {@link IBusinessHandler} implementation that logs owner online/offline notifications
 * and forwards the content of inbound messages to the business service.
 */
@Component
public class BusinessHandler implements IBusinessHandler {

    private static final Logger log = LoggerFactory.getLogger(BusinessHandler.class);

    /**
     * Handles a response pack by logging owner online/offline notifications;
     * all other codes are ignored.
     *
     * @param responsePack response pack received from the server
     */
    @Override
    public void handleResponse(ProtobufData.ResponsePack responsePack) {
        short code = (short) responsePack.getCode();
        switch (code) {
            // Room owner came online
            case ResponseCode.Notification.OWNER_ONLINE:
                log.info("online response pack:{}", responsePack);
                break;
            // Room owner went offline
            case ResponseCode.Notification.OWNER_OFFLINE:
                log.info("offline response pack:{}", responsePack);
                break;
            default:
                break;
        }
    }

    /**
     * Parses the pack payload as a {@code XxxSocketMessage.Message} and forwards its data
     * together with the sender to the business service. Parsing and business failures
     * are logged and not propagated.
     *
     * @param messagePack message pack received from the server
     */
    @Override
    public void handleMessage(ProtobufData.MessagePack messagePack) {
        try {
            XxxSocketMessage.Message message = XxxSocketMessage.Message.parseFrom(messagePack.getDataBytes());
            // Per the article, this calls into ChannelClient to send messages upstream/downstream.
            // NOTE(review): the declaration of tdService is not shown in the article.
            tdService.handlerMessages(message.getData(), messagePack.getFrom());
        } catch (InvalidProtocolBufferException e) {
            // The exception is passed as the trailing argument so SLF4J logs its stack trace;
            // the previous "{}" placeholder was never filled and was printed literally.
            log.error("数据解析异常", e);
        } catch (Exception e1) {
            log.error("业务异常", e1);
        }
    }
}