ProtoBuf 与 Netty 结合,高性能服务端

2020-11-14  本文已影响0人  ShootHzj

写在前面

Netty是高性能的网络框架,同样Proto Buf是高性能的编解码框架。两者的搭配可以说是恰到好处。

高性能的网络框架,也会碰到如何编解码组织消息,如何处理tcp拆包粘包的问题?

高性能的编解码框架,如果没有网络框架搭配,那他的高性能优势也荡然无存。

选择Netty+ProtoBuf的优秀开源项目也很多,比如Pulsar,Bookkeeper都使用了Netty+ProtoBuf。

准备Proto文件

首先,书写一个简单地.proto协议文件

syntax = "proto2";

package demo.netty;

option java_package = "com.github.shoothzj.demo.netty.protobuf.module";
option java_outer_classname = "ProtoBufModule";

message Request {
  required string first_name = 1;
  required string last_name = 2;
}

message Response {
  required int32 status = 1;
}

然后生成它的java类

protoc -I=proto --java_out=java proto/DemoNettyProtocol.proto

书写服务端Netty的handler类,简单地接收到请求发送响应

package com.github.shoothzj.demo.netty.protobuf;

import com.github.shoothzj.demo.netty.protobuf.module.ProtoBufModule;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

/**
 * @author hezhangjian
 */
@Slf4j
public class ServerHandler extends SimpleChannelInboundHandler<ProtoBufModule.Request> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ProtoBufModule.Request request) throws Exception {
        log.info("request [{}]", request);
        final ProtoBufModule.Response response = ProtoBufModule.Response.newBuilder().setStatus(200).build();

        channelHandlerContext.channel().writeAndFlush(response);
    }
}

拉起Netty服务端

package com.github.shoothzj.demo.netty.protobuf;

import com.github.shoothzj.demo.netty.protobuf.module.ProtoBufModule;
import com.github.shoothzj.javatool.util.LogUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import lombok.extern.slf4j.Slf4j;

/**
 * @author hezhangjian
 */
@Slf4j
public class ProtoServer {

    private static final int MAX_FRAME_SIZE = Integer.MAX_VALUE;

    public static void main(String[] args) throws Exception {
        LogUtil.configureLog();
        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100);
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    final ChannelPipeline pipeline = socketChannel.pipeline();
                    //解码器,通过Google Protocol Buffers序列化框架动态的切割接收到的ByteBuf
                    pipeline.addLast(new ProtobufVarint32FrameDecoder());
                    //服务器端接收的是客户端RequestUser对象,所以这边将接收对象进行解码生产实列
                    pipeline.addLast(new ProtobufDecoder(ProtoBufModule.Request.getDefaultInstance()));
                    //Google Protocol Buffers编码器
                    pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
                    //Google Protocol Buffers编码器
                    pipeline.addLast(new ProtobufEncoder());
                    pipeline.addLast(new ServerHandler());
                }
            });

            // Start the server.
            ChannelFuture f = bootstrap.bind(9997).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}

客户端handler,当连接建立时发送request,收到响应后打印出来

package com.github.shoothzj.demo.netty.protobuf;

import com.github.shoothzj.demo.netty.protobuf.module.ProtoBufModule;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

/**
 * @author hezhangjian
 */
@Slf4j
public class ClientHandler extends SimpleChannelInboundHandler<ProtoBufModule.Response> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ProtoBufModule.Response response) throws Exception {
        log.info("response is [{}]", response);
    }

    /**
     * 当channel激活的时候,客户端立刻发出请求
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final ProtoBufModule.Request request = ProtoBufModule.Request.newBuilder().setFirstName("Akka").setLastName("Scala").build();
        ctx.channel().writeAndFlush(request);
    }
}

客户端主函数:

package com.github.shoothzj.demo.netty.protobuf;

import com.github.shoothzj.demo.netty.protobuf.module.ProtoBufModule;
import com.github.shoothzj.javatool.util.LogUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import lombok.extern.slf4j.Slf4j;

/**
 * @author hezhangjian
 */
@Slf4j
public class ProtoClient {

    public static void main(String[] args) throws Exception {
        LogUtil.configureLog();
        // configure the client
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            //解码器,通过Google Protocol Buffers序列化框架动态的切割接收到的ByteBuf
                            pipeline.addLast(new ProtobufVarint32FrameDecoder());
                            //将接收到的二进制文件解码成具体的实例,这边接收到的是服务端的ResponseBank对象实列
                            pipeline.addLast(new ProtobufDecoder(ProtoBufModule.Response.getDefaultInstance()));
                            //Google Protocol Buffers编码器
                            pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
                            //Google Protocol Buffers编码器
                            pipeline.addLast(new ProtobufEncoder());

                            pipeline.addLast(new ClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9997).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

}

上一篇 下一篇

猜你喜欢

热点阅读