SpringBoot精选IT@程序员猿媛

Netty整合Protobuf编解码,并解决半包问题

2019-10-23  本文已影响0人  程就人生

Google的工具在平时的开发中也用过,但是听到Protobuf编解码框架时,还是忍不住去用一用,想知道它为什么在业界这么流行,而且是非常流行。

在使用Google Protobuf编解码框架时,还需要搭建环境,这一点有点麻烦,想着后面可能带来的便利,这一步忍了;本文有两部分,第一部分Protobuf编解码java文件的生成;第二部分Protobuf编解码示例。

第一部分,Protobuf编解码java文件的生成;

首先,下载生成代码的压缩包,打开https://github.com/protocolbuffers/protobuf/releases页面,本次使用的压缩包是protoc-3.10.0-win32;

下载截图

第二步,对下载后的压缩包解压,根据需要编写proto文件,文件的后缀为.proto;

syntax = "proto3";
option java_outer_classname="ProtoMsgProto";
message ProtoMsg{
    int32 subReqId = 1;
    string desc = 2;    
}

语法简要说明:

  1. syntax代表版本号,默认proto2,这里使用的是3,所以必须是proto3;
  2. package为生成的java加包名,防止message类冲突;
  3. option java_outer_classname是要输出的java文件的类名,不可与message后的名字重复;
  4. option java_package为每个message类加包名;
  5. option java_multiple_files默认false,表示生成java时的打包方式;
    false表示所有的消息都作为内部类,打包到一个外部类中;
    true代表一个消息打包成一个java类;

把option java_multiple_files=true;加入到proto文件里,再次使用命令生成,可以看到生成的java文件有三个,也就是一个message对应三个java类文件。


java_multiple_files为true生成的文件

第三步,使用管理员的身份打开cmd窗口,进入解压的bin文件夹下,输入命令进行java文件的生成,

C:\protoc-3.10.0-win32\bin>protoc.exe ProtoMsg.proto --java_out=./
protoc生成的java文件

在使用protoc.exe生成java文件的时候,我们使用的是3.10.0版本的工具,所以在项目里的pom中引入文件的时候,版本也需要与生成文件的相匹配。

使用protoc.exe生成java文件的方法有多种,这里使用的是最简单便捷的方法,也不需要配置什么环境变量,能省一步是一步。还有一种方法,就是在maven里增加插件与对应的配置来生成java文件,个人感觉比较麻烦,还不如使用cmd命令行来的快。

接下来,进行服务端客户端代码的开发,由于protobuf仅仅支持解码编码,并没有处理粘包/半包的问题,所以还需要借助其他的工具类进行粘包/半包的处理。

第二部分,Protobuf编解码示例;

首先,pom文件的引入;

<!-- netty架包依赖 -->
        <dependency>
          <groupId>io.netty</groupId>
          <artifactId>netty-all</artifactId>
        </dependency>
<!-- protobuf架包 -->
        <dependency>    
            <groupId>com.google.protobuf</groupId>  
            <artifactId>protobuf-java</artifactId>  
            <version>3.10.0</version>
        </dependency>

第二,服务端的编码,ProtoMsgProto.java文件是通过protoc.exe生成的文件,这里就忽略不贴代码了;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
 * netty整合protobuf,服务端模拟
 * @author 程就人生
 * @date 2019年10月22日
 */
public class ProtobufServer {

    public void bind(int port){
        //开启线程组
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try{
            ServerBootstrap serverBootstrap =  new ServerBootstrap();
            serverBootstrap.group(parentGroup, childGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 100)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ChannelInitializer<SocketChannel>(){

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    //解码时,解决粘包/半包的问题
                    ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                    //对接收到的信息,使用protobuf进行解码
                    ch.pipeline().addLast(new ProtobufDecoder(ProtoMsgProto.ProtoMsg.getDefaultInstance()));
                    //编码时,解决粘包/半包的问题
                    ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                    //对于要发送的信息,使用protobuf进行编码
                    ch.pipeline().addLast(new ProtobufEncoder());
                    ch.pipeline().addLast(new ProtobufServerHandler());
                }
                
            });
            //绑定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //等待服务器端监听端口关闭
            channelFuture.channel().closeFuture().sync();   
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            //优雅退出,并释放线程
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] argo){
        //启动服务端
        ProtobufServer protobufServer = new ProtobufServer();
        protobufServer.bind(8080);
    }
}

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ProtobufServerHandler extends ChannelInboundHandlerAdapter{

    private static Logger log = LoggerFactory.getLogger(ProtobufServerHandler.class);
    
    /**
     * 接收到消息时的处理
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        
        ProtoMsgProto.ProtoMsg req = (ProtoMsgProto.ProtoMsg) msg;
        if(req.getDesc().contains("程就人生")){
            log.info("接收到的消息:" + req.getDesc());
            ctx.writeAndFlush(resp(req.getSubReqId()));
        }
    }
    
    /**
     * 构造返回的消息
     * @param subReqID
     * @return
     *
     */
    private ProtoMsgProto.ProtoMsg resp(int subReqID){
        ProtoMsgProto.ProtoMsg.Builder builder = ProtoMsgProto.ProtoMsg.newBuilder();
        builder.setSubReqId(subReqID);
        builder.setDesc("服务端已经顺利接收到客户端发送的消息,消息id:" + subReqID);     
        return builder.build();
    }
    
    
    /**
     * 异常时关闭
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        //发生异常,关闭连接
        ctx.close();
    }
}

第三步,客户端的编码;

package com.example.netty.client3;

import com.example.netty.server3.ProtoMsgProto;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
/**
 * Netty整合protobuf客户端模拟
 * @author 程就人生
 * @date 2019年10月22日
 */
public class ProtobufClient {

    public void connect(int port, String host){
        EventLoopGroup group = new NioEventLoopGroup();
        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>(){

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    //解码时,解决粘包/半包的问题
                    ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                    //对接收到的信息,使用protobuf进行解码
                    ch.pipeline().addLast(new ProtobufDecoder(ProtoMsgProto.ProtoMsg.getDefaultInstance()));
                    //编码时,解决粘包/半包的问题
                    ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                    //对于要发送的信息,使用protobuf进行编码
                    ch.pipeline().addLast(new ProtobufEncoder());
                    ch.pipeline().addLast(new ProtobufClientHandler());                 
                }
                
            });
            //发起异步连接操作
            ChannelFuture channelFuture = bootstrap.connect(host,port).sync();
            //等待客户端链路关闭
            channelFuture.channel().closeFuture().sync();
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            //优雅退出,释放线程组
            group.shutdownGracefully();
        }
    }
    
    public static void main(String[] argo){
        //启动客户端,连接服务端
        ProtobufClient protobufClient = new ProtobufClient();
        protobufClient.connect(8080, "127.0.0.1");
    }
}

package com.example.netty.client3;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.example.netty.server3.ProtoMsgProto;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ProtobufClientHandler extends ChannelInboundHandlerAdapter{

    private static Logger log = LoggerFactory.getLogger(ProtobufClientHandler.class);
    
    /**
     * 
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for(int i=0;i<100;i++){
            ctx.writeAndFlush(req(i));
        }
    }
    
    /**
     * 构造返回的消息
     * @param subReqID
     * @return
     *
     */
    private ProtoMsgProto.ProtoMsg req(int subReqID){
        ProtoMsgProto.ProtoMsg.Builder builder = ProtoMsgProto.ProtoMsg.newBuilder();
        builder.setSubReqId(subReqID);
        builder.setDesc("程就人生"+subReqID);       
        return builder.build();
    }
     
    /**
     * 接收到消息时的处理
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        
        ProtoMsgProto.ProtoMsg req = (ProtoMsgProto.ProtoMsg) msg;
        log.info("接收到的消息:" + req.getDesc());        
    }
    
    /**
     * 异常时关闭
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.close();
    }
}

最后,测试;先启动服务端,再启动客户端,查看控制台的输出;

服务端测试结果
客户端测试结果

protobuf之所以这么流行,原因有很多:第一,它是跨语言的,除了java还支持python、C++等语言;第二,编码后是以二进制的形式进行传输的,传输速度快。但是,它只负责编解码,粘包/半包的问题,还需通过其他的工具类进行解决。

除了使用上文的ProtobufVarint32FrameDecoder类进行半包的处理,还可以使用LengthFieldBasedFrameDecoder工具类来解决;LengthFieldBasedFrameDecoder工具类的使用,在Netty整合MessagePack、LengthFieldBasedFrameDecoder解决粘包/拆包问题中有说明;除此之外,还可以继承ByteToMessageDecoder类,自己处理半包消息的问题。

Netty整合Protobuf框架进行编解码的简单操作,就这样告一段落了,更多的还需要继续看源码,加油吧。

上一篇下一篇

猜你喜欢

热点阅读