vert.x & netty

Netty系列(4)Protobuf 及其在 Netty 中的应用

2020-03-25  本文已影响0人  suxin1932

1.protobuf

RPC(一般通过SOCKET进行数据传输)
1.定义一个接口说明文件:描述了对象(结构体),对象成员, 接口方法等信息
2.通过RPC框架所提供的编译器, 将接口说明文件编译成具体的语言文件,如java文件, python文件
3.在客户端和服务端分别引入RPC编译器所生成的文件, 即可像调用本地方法一样, 调用远程方法

1.1 Windows 上安装 protobuf 编译器

https://github.com/protocolbuffers/protobuf/releases

protobuf编译器-windows版-下载后解压.png protobuf编译器-windows版-环境变量配置.png cmd验证.png

3.示例用法

Netty-protobuf 多协议解决方案1代码结构.png

3.0 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven module for the Netty + protobuf examples; inherits from com.zy:java-tools. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>java-tools</artifactId>
        <groupId>com.zy</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <properties>
        <org.slf4j.version>1.7.25</org.slf4j.version>
        <log4j.version>1.2.17</log4j.version>
        <lombok.version>1.16.20</lombok.version>
        <protobuf.version>3.9.1</protobuf.version>
    </properties>

    <artifactId>tools-netty</artifactId>
    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.48.Final</version>
        </dependency>
        <!-- Lombok is an annotation processor needed only at compile time. The version is pinned
             through the lombok.version property declared above so that this module does not
             depend on the parent POM managing it. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${org.slf4j.version}</version>
        </dependency>
        <!-- slf4j-log4j12 already brings in the log4j dependency -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${org.slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <!-- protobuf runtime; both artifacts share protobuf.version -->
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>${protobuf.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java-util</artifactId>
            <version>${protobuf.version}</version>
        </dependency>
    </dependencies>

</project>

3.1 定义 proto 文件, 生成代码

syntax = "proto3";

// Generated Java code is placed in this package, inside the outer class DataInfo.
option java_package = "com.zy.netty.netty03";
option java_outer_classname = "DataInfo";

// Shows how several different message kinds can be sent over one protobuf + Netty channel.
// Solution 1 for multi-protocol transport with protobuf: wrap every payload in one envelope.
message Message {

    // Tag naming the kind of payload; one value per member of the oneof below.
    enum DataType {
        SchoolType = 0;
        TeacherType = 1;
        StuType = 2;
    }

    // Payload kind selected by the sender.
    DataType data_type = 1;
    // At most one of these payload fields is set on any given Message.
    oneof dataBody {
        School school = 2;
        Teacher teacher = 3;
        Stu stu = 4;
    }
}

// Student payload.
message Stu {
    string name = 1;
    int32 age = 2;
    string gender = 3;
}

// Teacher payload; same field layout as Stu.
message Teacher {
    string name = 1;
    int32 age = 2;
    string gender = 3;
}

// School payload.
message School {
    string name = 1;
    float square = 2;
}

执行 protoc --java_out=src/main/java src/main/resources/proto/Data.proto 命令, 生成代码 com.zy.netty.netty03.DataInfo

3.2 server

package com.zy.netty.netty03;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Server03 {
    public static void main(String[] args) {
        ServerBootstrap server = new ServerBootstrap();
        // server 这里的 boss, worker 都没有设置 nThreads, 走默认值: io.netty.channel.MultithreadEventLoopGroup.DEFAULT_EVENT_LOOP_THREADS
        // 1.当 worker 不存在, server.group(boss, boss) 是 Reactor 的单线程模型
        // 2.当 worker 存在, boss 的 nThreads == 1 时,  server.group(boss, worker) 是 Reactor 的多线程模型
        // 3.当 worker 存在, boss 的 nThreads > 1 时,  server.group(boss, worker) 是 Reactor 的主从线程模型
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(new DefaultThreadFactory("bossGroupExecutor", true));
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(new DefaultThreadFactory("bossGroupExecutor", true));

        try {
            server.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline()
                                    // ProtobufVarint32FrameDecoder 用于解决粘包拆包问题
                                    .addLast(new ProtobufVarint32FrameDecoder())
                                    // ProtobufDecoder 仅仅支持解码, 不支持半包处理
                                    // 这个解码器定义了要解码的数据类型: 这里限制了只能解码Student.Stu类型, 思考解决方案, 如何通用?
                                    .addLast(new ProtobufDecoder(DataInfo.Message.getDefaultInstance()))
                                    .addLast(new ProtobufVarint32LengthFieldPrepender())
                                    .addLast(new ProtobufEncoder())
                                    .addLast(new ServerHandler03());
                        }
                    });

            ChannelFuture channelFuture = server.bind("127.0.0.1", 8099).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server error.", e);
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    /**
     * 这里讲述下 @Sharable 的作用:
     * 1.一般情况下, Server 每与 Client 建立一个连接, 都会建立一个 Channel, 一个 ChannelPipeline
     * 在每一个 ChannelPipeline 中都会新建 ChannelHandler, 如: ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
     *
     * 2.如果要想定义一个全局的 ChannelHandler (单例) 供所有的 Channel 使用, 比如统计全局的 metrics 信息, 则可以:
     * ServerHandler03 sharableHandler = new ServerHandler03(); // 这个定义在 server.group 之前
     * server.group(boss, worker).
     * ... // 这里 加入
     * ch.pipeline().addLast(sharableHandler);
     *
     * 3.需要说明的是: 第2步中的定义的 sharableHandler 必须被 @Sharable 修饰, 否则, 当有第二个 Channel 建立时, 将会报错, 详见:
     * io.netty.channel.DefaultChannelPipeline#checkMultiplicity(io.netty.channel.ChannelHandler)
     *
     * 4.当然, 如果只是每建立一个 Channel, 就 new 一个 ChannelHandler, 则 @Sharable 修饰与否 都不影响
     */
    @ChannelHandler.Sharable
    private static class ServerHandler03 extends SimpleChannelInboundHandler<DataInfo.Message> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, DataInfo.Message msg) throws Exception {
            // protobuf 多协议传输的解决方案一
            if (msg.getDataType() == DataInfo.Message.DataType.SchoolType) {
                System.out.println("server received msg: " + msg.getSchool());
            } else if (msg.getDataType() == DataInfo.Message.DataType.TeacherType) {
                System.out.println("server received msg: " + msg.getTeacher());
            }  else if (msg.getDataType() == DataInfo.Message.DataType.StuType) {
                System.out.println("server received msg: " + msg.getStu());
            }
        }
    }
}

3.3 client

package com.zy.netty.netty03;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import lombok.extern.slf4j.Slf4j;

import java.util.Random;

@Slf4j
public class Client03 {
    public static void main(String[] args) {
        Bootstrap client = new Bootstrap();
        NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();

        try {
            client.group(nioEventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new ProtobufVarint32FrameDecoder())
                                    .addLast(new ProtobufDecoder(DataInfo.Message.getDefaultInstance()))
                                    .addLast(new ProtobufVarint32LengthFieldPrepender())
                                    .addLast(new ProtobufEncoder())
                                    .addLast(new ClientHandler03());
                        }
                    });

            ChannelFuture channelFuture = client.connect("127.0.0.1", 8099).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("client error.", e);
        } finally {
            nioEventLoopGroup.shutdownGracefully();
        }
    }

    private static class ClientHandler03 extends SimpleChannelInboundHandler<DataInfo.Message> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, DataInfo.Message msg) throws Exception {
            System.out.println("client receive msg: " + msg);
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // protobuf 多协议传输的解决方案一
            DataInfo.Message message;
            int i = new Random().nextInt(3);
            switch (i) {
                case 0:
                    message = DataInfo.Message.newBuilder().setDataType(DataInfo.Message.DataType.SchoolType)
                            .setSchool(DataInfo.School.newBuilder().setName("nanjingjinlingzhognxue").setSquare(800).build())
                            .build();
                    break;
                case 1:
                    message = DataInfo.Message.newBuilder().setDataType(DataInfo.Message.DataType.TeacherType)
                            .setTeacher(DataInfo.Teacher.newBuilder().setName("tom").setAge(30).build())
                            .build();
                    break;
                default:
                    message = DataInfo.Message.newBuilder().setDataType(DataInfo.Message.DataType.StuType)
                            .setStu(DataInfo.Stu.newBuilder().setName("jerry").setAge(10).build())
                            .build();
            }
            ctx.writeAndFlush(message);
        }
    }
}

参考资料
https://developers.google.com/protocol-buffers/docs/proto3
https://github.com/protocolbuffers/protobuf
https://www.jianshu.com/p/506667a6651f
https://blog.csdn.net/u011518120/article/details/54604615

上一篇 下一篇

猜你喜欢

热点阅读