Nettynetty学习

Netty笔记之七:Google Protobuf与Netty结

2017-07-04  本文已影响488人  二月_春风

背景

学过java的都使用过RMI框架(remote method invocation),远程方法调用,比如A,B二个服务器,A调用B服务器上的方法就像调用本地方法一样,但是本质上是跨机器的调用了,A机器将调用的方法名,参数通过字节码的形式传输到B这台机器上,B这台机器将这些字节码转换成对B机器上具体方法的调用,并将相应的返回值序列化成二进制数据传输到A服务器上。

RPC(Remote Procedure Call)其实和rmi及其类似,RPC与RMI框架对比的优势就是好多RPC框架都是跨语言的。

RMI只针对java,A,B服务都使用java编写。几乎所有的RPC框架都存在代码生成,自动代码屏蔽了底层序列化通信等各种细节的处理,使得用户(开发者)可以像调用本地方法一样调用远程的方法。一般这种自动生成的代码在客户端我们称为stub,服务端我们称为skeleton。

序列化与反序列化技术,也称为编码与解码技术,比如我们本篇博客讨论的Google Protobuf,和marshalling等技术。

从广义上来讲,webservice也可以称为RPC框架,但是相比于其他的RPC框架来说,webservice的性能稍微差点,因为决定一个rpc性能的优秀与否在于其底层对象编解码性能。RPC一般都是基于socket协议传输的,而webservice基于http传输的,socket协议的性能也要高于http协议传输数据。所以,一般在公司内部各个微服务之间的服务调用都使用RPC框架多一点,因为在性能上的考虑,而我们总所周知的dubbo虽然也算是RPC框架,但其实并不支持多语言。

什么是protocol buffers?

Protocol buffers是谷歌的语言中立,平台中立的,可扩展机制的序列化数据结构框架-可以看作是xml,但是体积更小,传输速率更快,使用更加简单。一旦你定义了你的数据格式,你可以使用生成源代码去轻松地从各种数据流读和写你的结构化数据并且使用不同的语言。protobuf有2.0版本和3.0版本,3.0版本十grpc框架的基础

Protocol buffers目前支持Java, Python, Objective-C, 和C++生成代码。新的proto3语言版本,你可以使用Go, JavaNano, Ruby, 和 C#。

为什么使用Protocol buffers

使用一个简单的可以从一个文件中去读写人员联系信息"地址簿"程序。每个在地址簿的人有姓名,id,邮箱地址和一个联系人电话号码属性。

你如何序列化和检索这样的结构化数据? 有几种方法来解决这个问题:
使用java原生的序列化。这是一种默认的方式因为是内嵌于java语言的,但是有一大堆众所周知的问题(参考Effective Java这本书),并且你不能将数据分享于C++和Python应用(也就是不能跨语言)。

还可以将数据项编码为单个字符串的ad-hoc方式 - 例如将4个ints编码为“12:3:-23:67”。 这是一个简单而灵活的方法,尽管它需要编写一次性编码和解析代码,并且解析具有很小的运行时成本。 这最适合编码非常简单的数据。

将数据序列化为XML。 这种方法可能非常有吸引力,因为XML是(可能的)人类可读的,并且有很多语言的绑定库。 如果您想与其他应用程序/项目共享数据,这可能是一个很好的选择。 然而,XML浪费性能,编码/解码可能会对应用程序造成巨大的性能损失。 另外,检索XML DOM树比在一般类中简单的字段检索要复杂得多。

Protocol buffers是灵活,高效,自动化的解决方案来解决这个问题。 使用Protocol buffers,您可以编写一个.proto描述您希望存储的数据结构。 Protocol buffers编译器创建一个实现自动编码和解析协议缓冲区数据的类,并使用高效的二进制格式。 生成的类为组成Protocol buffers的字段提供getter和setter。

使用Protobuf编写一个编码解码的最简单程序

定义一个Student.proto文件

syntax ="proto2";

package com.zhihao.miao.protobuf;

//optimize_for 加快解析的速度
option optimize_for = SPEED;
option java_package = "com.zhihao.miao.protobuf";
option java_outer_classname="DataInfo";

message Student{
    required string name = 1;
    optional int32 age = 2;
    optional string address = 3;
}

在Java项目中,除非你已经明确指定了java_package,否则package 用作Java的包名。即使您提供java_package,您仍然应该定义一个package,以避免在Protocol Buffers名称空间和非Java语言中的名称冲突。

在package的定义之后,我们可以看到两个定义的java选项:java_packagejava_outer_classnamejava_package指定您生成的类应该存放的Java包名称。 如果没有明确指定它,将会使用package定义的name作为包名,但这些名称通常不是适合的Java包名称(因为它们通常不以域名开头)。 java_outer_classname选项定义应该包含此文件中所有类的类名。 如果你不明确地给出一个java_outer_classname,它将通过将文件名转换为驼峰的方式来生成。 例如,默认情况下,“my_proto.proto”将使用“MyProto”作为外部类名称。

每个元素上的“= 1”,“= 2”标记标识字段在二进制编码中使用的唯一“标签”。你可以将经常使用或者重复的字段标注成1-15,因为在进行编码的时候因为少一个字节进行编码,所以效率更高。

required:必须提供该字段的值,否则被认为没有初始化。尝试构建一个未初始化的值被会抛出RuntimeException。解析一个为初始化的消息会抛出IOException。除此之外与optional一样。
optional:可以设置或不设置该字段。 如果未设置可选字段值,则使用默认值。
repeated:字段可能重复任意次数(包括零)。 重复值的顺序将保留在protocol buffer中。 将重复的字段视为动态大小的数组。(本列子中没有字段定义成repeated类型,定义成repeated类型其实就是java中List类型的字段。

慎重使用required类型,将required类型的字段更改为optional会有一些问题,而将optional类型的字段更改为required类型,则没有问题。

编译

使用protocol buffers编译器将对应的.proto文件编译成对应的类
关于编译器的安装,下载地址

下载页面图示

修改环境变量

➜  vim .bash_profile
export PATH=/Users/naeshihiroshi/software/work/protoc-3.3.0-osx-x86_64/bin
➜  source .bash_profile
➜  which protoc
/Users/naeshihiroshi/software/work/protoc-3.3.0-osx-x86_64/bin/protoc

进入项目目录,执行编译语句如下:

➜  netty_lecture git:(master) ✗ protoc --java_out=src/main/java  src/protobuf/Student.proto   

--java_out后面第一个参数指定代码的路径,具体的包名在.proto文件中的java_package指定了,第二个指定要编译的proto文件。

自动生成的类名是DataInfo(在java_outer_classname中指定了),自动生成的类太长,这边就不列出来了。

编写序列化反序列化测试类

package com.zhihao.miao.protobuf;

//实际使用protobuf序列化框架客户端将对象转译成字节数组,然后通过协议传输到服务器端,服务器端可以是其他的语言框架(比如说python)将
//字节对象反编译成java对象
public class ProtobuffTest {
    public static void main(String[] args) throws Exception{
        DataInfo.Student student = DataInfo.Student.newBuilder().
                setName("张三").setAge(20).setAddress("北京").build();

        //将对象转译成字节数组,序列化
        byte[] student2ByteArray = student.toByteArray();

        //将字节数组转译成对象,反序列化
        DataInfo.Student student2 = DataInfo.Student.parseFrom(student2ByteArray);

        System.out.println(student2.getName());
        System.out.println(student2.getAge());
        System.out.println(student2.getAddress());
    }
}

执行测试类,控制台打印:

张三
20
北京

Google Protobuf与netty结合

protobuf做为序列化的一种方式,序列化之后通过什么样的载体在网络中传输?

使用netty使得经过protobuf序列化的对象可以通过网络通信进行客户端和服务器的信息通信。客户端使用protobuf将对象序列化成字节码,而服务器端通过protobuf将对象反序列化成原本对象。

写一个使用Protobuf作为序列化框架,netty作为传输层的最简单的demo,需求描述:

定义的.proto文件如下:

syntax ="proto2";

package com.zhihao.miao.netty.sixthexample;

option optimize_for = SPEED;
option java_package = "com.zhihao.miao.test.day06";
option java_outer_classname="DataInfo";

message RequestUser{
    optional string user_name = 1;
    optional int32 age = 2;
    optional string password = 3;
}

message ResponseBank{
    optional string bank_no = 1;
    optional double money = 2;
    optional string bank_name=3;
}

使用Protobuf编译器进行编译,生成DataInfo对象,

服务器端代码:

package com.zhihao.miao.test.day06;


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class ProtoServer {
    public static void main(String[] args) throws Exception{
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup wokerGroup = new NioEventLoopGroup();

        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,wokerGroup).channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ProtoServerInitializer());

            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            wokerGroup.shutdownGracefully();
        }
    }
}

服务端ProtoServerInitializer(初始化连接):

package com.zhihao.miao.test.day06;


import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;


public class ProtoServerInitializer extends ChannelInitializer<SocketChannel>{

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        //解码器,通过Google Protocol Buffers序列化框架动态的切割接收到的ByteBuf
        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        //服务器端接收的是客户端RequestUser对象,所以这边将接收对象进行解码生产实列
        pipeline.addLast(new ProtobufDecoder(DataInfo.RequestUser.getDefaultInstance()));
        //Google Protocol Buffers编码器
        pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
        //Google Protocol Buffers编码器
        pipeline.addLast(new ProtobufEncoder());

        pipeline.addLast(new ProtoServerHandler());
    }
}

自定义服务端的处理器:

package com.zhihao.miao.test.day06;


import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class ProtoServerHandler extends SimpleChannelInboundHandler<DataInfo.RequestUser> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DataInfo.RequestUser msg) throws Exception {
        System.out.println(msg.getUserName());
        System.out.println(msg.getAge());
        System.out.println(msg.getPassword());

        DataInfo.ResponseBank bank = DataInfo.ResponseBank.newBuilder().setBankName("中国工商银行")
                .setBankNo("6222222200000000000").setMoney(560000.23).build();

        ctx.channel().writeAndFlush(bank);
    }
}

客户端:

package com.zhihao.miao.test.day06;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class ProtoClient {

    public static void main(String[] args) throws Exception{
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .handler(new ProtoClientInitializer());

            ChannelFuture channelFuture = bootstrap.connect("localhost",8899).sync();
            channelFuture.channel().closeFuture().sync();

        }finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

客户端初始化连接(ProtoClientInitializer),

package com.zhihao.miao.test.day06;


import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

public class ProtoClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        //解码器,通过Google Protocol Buffers序列化框架动态的切割接收到的ByteBuf
        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        //将接收到的二进制文件解码成具体的实例,这边接收到的是服务端的ResponseBank对象实列
        pipeline.addLast(new ProtobufDecoder(DataInfo.ResponseBank.getDefaultInstance()));
        //Google Protocol Buffers编码器
        pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
        //Google Protocol Buffers编码器
        pipeline.addLast(new ProtobufEncoder());

        pipeline.addLast(new ProtoClientHandler());
    }
}

自定义客户端处理器:

package com.zhihao.miao.test.day06;


import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class ProtoClientHandler extends SimpleChannelInboundHandler<DataInfo.ResponseBank> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DataInfo.ResponseBank msg) throws Exception {
        System.out.println(msg.getBankNo());
        System.out.println(msg.getBankName());
        System.out.println(msg.getMoney());
    }


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        DataInfo.RequestUser user = DataInfo.RequestUser.newBuilder()
                .setUserName("zhihao.miao").setAge(27).setPassword("123456").build();
        ctx.channel().writeAndFlush(user);
    }
}

运行服务器端和客户端,服务器控制台打印:

七月 03, 2017 11:12:03 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xa1a63b58, L:/0:0:0:0:0:0:0:0:8899] READ: [id: 0x08c534f3, L:/127.0.0.1:8899 - R:/127.0.0.1:65448]
七月 03, 2017 11:12:03 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xa1a63b58, L:/0:0:0:0:0:0:0:0:8899] READ COMPLETE
zhihao.miao
27
123456

客户端控制台打印:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
6222222200000000000
中国工商银行
560000.23

总结

本节我们使用Google Protobuf定义消息体格式,使用Netty作为网络传输层框架。其实大多数RPC框架底层实现都是使用序列化框架和NIO通信框架进行结合。下面还会学习基于Protobuf 3.0协议的Grpc框架(Google基于Protobuf 3.0协议的一个跨语言的rpc框架,更加深入的去了解rpc框架)。

参考资料

官方网站
指南
java指南

上一篇下一篇

猜你喜欢

热点阅读