netty使用Google Protobuf协议优化通道的序列化

2022-10-17  本文已影响0人  天草二十六_简村人

一、Google Protobuf 协议

解决序列化和检索效率的问题。

序列化的方法有:

Protocol buffers是灵活,高效,自动化的解决方案来解决这个问题。 使用Protocol buffers,您可以编写一个.proto描述您希望存储的数据结构。 Protocol buffers编译器创建一个实现自动编码和解析协议缓冲区数据的类,并使用高效的二进制格式。 生成的类为组成Protocol buffers的字段提供getter和setter。

1.1、定义文件xxxSocketMessage.proto(首字母小写)

syntax = "proto3";

package com.xxx.channel.core.common.bean.dto;

option java_package = "com.xxx.channel.core.common.bean.dto";
option java_outer_classname = "XxxSocketMessage";


enum MessageType {
    // 服务端
    SERVER = 0;
    // 默认
    DEFAULT = 1;
}

/**
 * 消息数据包
 */
message Message {
    // 消息来源Type
    MessageType type = 1;
    // 自定义Tag,用于划分消息内容
    int32 tag = 2;
    // 消息Action
    int32 action = 3;
    // 消息内容
    string data = 4;
}

1.2、java类调用Protobuf的类Message

//实际使用protobuf序列化框架客户端将对象转译成字节数组,然后通过协议传输到服务器端,服务器端可以是其他的语言框架(比如说python)将
//字节对象反编译成java对象
public class ProtobuffTest {
    public static void main(String[] args) throws Exception{
// 对应java_outer_classname的名称
       XxxSocketMessage.Message message =  XxxSocketMessage.Message.newBuilder()
                .setTag(XxxSocketMessage.MessageType.SERVER_VALUE)
                .setAction(ResponseCode.Notification.ONLINE_LIST)
                .setData(onlineListJson).build();

        //将对象转译成字节数组,序列化
        byte[] message2ByteArray = message.toByteArray();

        //将字节数组转译成对象,反序列化
        XxxSocketMessage.Message message2 = XxxSocketMessage.Message.parseFrom(student2ByteArray);

        System.out.println(message2.getType());
        System.out.println(message2.getTag());
        System.out.println(message2.getData());
        System.out.println(message2.getAction());
    }
}

1.3、pom.xml

引入相关jar包

<properties>
        <netty.version>4.1.29.Final</netty.version>
        <protobuf.version>3.7.1</protobuf.version>
        <grpc.version>1.12.0</grpc.version>
    </properties>

<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>${netty.version}</version>
        </dependency>
<dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>${protobuf.version}</version>
        </dependency>
<dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-all</artifactId>
            <version>${grpc.version}</version>
        </dependency>

maven插件

用于编译.proto文件

<plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.5.0</version>
                <configuration>
                    <protocArtifact>
                        com.google.protobuf:protoc:3.1.0:exe:${os.detected.classifier}
                    </protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
                    </pluginArtifact>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

二、定义消息处理类Handler

ClientHandler继承SimpleChannelInboundHandler并重写channelActive方法, 在该方法中我们处理登录逻辑,最后调用方法writeAndFlush()将其发送到server端。
这里,只给出客户端的代码示例,服务端类似。

public class ClientHandler extends SimpleChannelInboundHandler<ProtobufData.Task>{

@Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        responseLoginInfo(ctx.channel());
        super.channelActive(ctx);
    }

private void responseLoginInfo(Channel channel) {
        UserInfo userInfo = getUserInfo();
        ProtobufData.LoginPack loginPack = ProtobufData.LoginPack.newBuilder()
                .setAppKey(userInfo.getAppKey())
                .setUserType(userInfo.getUserType())
                .setRoomId(userInfo.getRoomId())
                .setUserId(userInfo.getUserId()).build();
        ProtobufData.Task task = TaskPackage.login(loginPack);
        channel.writeAndFlush(task);
    }

}

三、客户端连接服务端的代码示例


import com.google.common.base.Joiner;
import com.google.protobuf.AbstractMessage;
import com.xxx.channel.core.common.constant.ACKType;
import com.xxx.channel.core.common.constant.CMD;
import com.xxx.channel.core.common.protobuf.ProtobufData;
import com.xxx.channel.core.common.protobuf.TaskPackage;
import com.xxx.channel.core.common.utils.UUIDUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;

import java.util.Set;


public class ChannelClient {
    /**
     * 通道ip
     */
    @Value("${channelService.ip}")
    private String ip;
    
    /**
     * 通道端口
     */
    @Value("${channelService.port}")
    private int port;
    /**
     * 连接通道
     */
    private Channel channel;
    
private static final Logger logger = LoggerFactory.getLogger(ChannelClient.class);

    /**
     * init
     */
    private void init() {
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline()
                                // 下面四行,和Proto协议有关,主要是编解码。
                                .addLast(new ProtobufVarint32FrameDecoder())
                                .addLast(new ProtobufDecoder(ProtobufData.Task.getDefaultInstance()))
                                .addLast(new ProtobufVarint32LengthFieldPrepender())
                                .addLast(new ProtobufEncoder())

                                .addLast(new IdleStateHandler(0, 3, 0))
                                
                                // 消息处理类,见上一步的定义
                                .addLast(new ClientHandler());
                    }
                });
        ChannelFuture future;
        try {
            future = bootstrap.connect(ip, port).sync();
            channel = future.channel();
        } catch (InterruptedException e) {
            logger.error("connect channel-service-server error", e);
        }
    }


    /**
     * 每隔3秒尝试重连
     */
    @Scheduled(initialDelay = 3000, fixedRate = 10000)
    public void reconnect() {
        if (!isConnected()) {
            init();
            logger.info("channel_service_client start connect to:{}", channel.remoteAddress());
        }
    }

    /**
     * 获取指定连接
     *
     * @return
     */
    public Channel getChannel() {
        return channel;
    }

    /**
     * 连接是否正常
     *
     * @return boolean
     */
    private boolean isConnected() {
        return channel != null && channel.isActive();
    }

}

四、客户端的对接

4.1、处理消息

实现接口IBusinessHandler的接收和响应报文,在ClientHandler类中重写方法channelRead0()。

private IBusinessHandler businessHandler;

@Override
    protected void channelRead0(ChannelHandlerContext ctx, ProtobufData.Task task) {
        //业务处理
        switch (task.getPackType()) {
// 接收消息
            case MESSAGE:
                businessHandler.handleMessage(task.getMessagePack());
                boolean enableACKResponse = task.getMessagePack().getAckType() > 0;
                responseACK(task.getMessagePack().getMessageId(), ctx.channel(), enableACKResponse);
                break;
            case ACK:
                break;
            case HEARTBEAT:
                break;
// 响应消息
            case RESPONSE:
                ProtobufData.ResponsePack responsePack = task.getResponsePack();
                businessHandler.handleResponse(responsePack);
                responseACK(responsePack.getMessageId(), ctx.channel(), false);
                break;
            default:
                break;
        }

    }

PackType的枚举类见下:

enum PackType {
    //登录
    Login = 0;
    //数据传输
    MESSAGE = 1;
    //退出
    EXIT = 2;
    //ack
    ACK = 3;
    //heartBeat
    HEARTBEAT = 4;
    //Response pack 响应体
    RESPONSE = 5;
}

4.2、IBusinessHandler的实现

import com.google.protobuf.InvalidProtocolBufferException;
import com.xxx.channel.core.common.bean.dto.XxxSocketMessage;
import com.xxx.channel.core.common.constant.ResponseCode;
import com.xxx.channel.core.common.netty.handle.IBusinessHandler;
import com.xxx.channel.core.common.protobuf.ProtobufData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class BusinessHandler implements IBusinessHandler {
    private final Logger log = LoggerFactory.getLogger(BusinessHandler.class);

    @Override
    public void handleResponse(ProtobufData.ResponsePack responsePack) {
        short code = (short) responsePack.getCode();
        switch (code) {
            //房主上线
            case ResponseCode.Notification.OWNER_ONLINE:
                log.info("online response pack:{}", responsePack);
                break;
            //房主下线
            case ResponseCode.Notification.OWNER_OFFLINE:
                log.info("offline response pack:{}", responsePack);
                break;
            default:
                break;
        }
    }

    @Override
    public void handleMessage(ProtobufData.MessagePack messagePack) {
        try {
            XxxSocketMessage.Message message = XxxSocketMessage.Message.parseFrom(messagePack.getDataBytes());
           
// 调用ChannelClient中的方法,实现消息的上行和下行。
 tdService.handlerMessages(message.getData(), messagePack.getFrom());
        } catch (InvalidProtocolBufferException e) {
            log.error("数据解析异常 e : {}", e);
        } catch (Exception e1) {
            log.error("业务异常 e : {}", e1);
        }
    }

}

上一篇下一篇

猜你喜欢

热点阅读