手写RPC框架

手写RPC框架(5)-Netty入门了解和实践

2019-11-03  本文已影响0人  jwfy

手写RPC框架
1、手写一个RPC框架,看看100个线程同时调用效果如何
2、手写RPC框架(2)-引入zookeeper做服务治理
3、手写RPC框架(3)-引入Hessian序列化工具
4、手写RPC框架(4)-重写服务治理,开启1000个线程看看netty的执行调用情况

Netty是基于NIO的的服务框架,屏蔽了使用Java原生NIO网络模型的各种问题,对外提供灵活的Reactor模型配置,也提供了插拔式的Handler处理器,便于支持各种网络协议和特定业务等操作,也是异步事件驱动,使得性能能够更高。此前RPC中关于Netty的代码逻辑存在些问题,对Netty的一些概念也没有理解到位,所以这次就一起再学习Netty,先写一个demo有大致的了解和印象,随后通过问题介绍各个组件的功能和特点,其原因是什么。

Demo

运行效果如下图

image

服务端

public class Server {
    public void run(int port) throws InterruptedException {
        EventLoopGroup workGroup = new NioEventLoopGroup();
        EventLoopGroup bossGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap
                    .group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new TimeServerHandler());
                        }
                    });

            ChannelFuture cf = serverBootstrap.bind(port).sync();
            cf.channel().closeFuture().sync();
        } finally {
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new Server().run(10002);
    }
}
public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered ...");
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelUnregistered ...");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive ...");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelInactive ...");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("Client:[" + body + "]");

        String cur = ("Hello, My name is jwfy".equalsIgnoreCase(body) ? "OK" : "ERROR") + System.getProperty("line.separator");
        ctx.writeAndFlush(Unpooled.copiedBuffer(cur.getBytes()));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelReadComplete ...");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

客户端

public class Client {
    public void connection(String host, int port) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new TimeClientHandler());
                        }
                    });

            ChannelFuture cf = bootstrap.connect(host, port).sync();
            cf.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new Client().connection("127.0.0.1", 10002);
    }
}
public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered ...");
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelUnregistered ...");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive ...");
        for(int i = 0; i < 1; i++ ) {
            ctx.writeAndFlush(Unpooled.copiedBuffer(("Hello, My name is jwfy" + System.getProperty("line.separator")).getBytes()));
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelInactive ...");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("Server:[" + body + "]");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

EventLoopGroup

EventLoopGroup 是一种Reactor多线程模型的抽象,具体实现一般都是NioEventLoopGroup。而Reactor模型又有单线程、多线程、以及主从多线程模型,他们有什么区别呢?

单线程模型

image

1个NIO线程原则上可以负责所有IO相关的请求操作,通过acceptor接收客户端发生的TCP请求,当链接建立成功之后,通过Dispatch将对于的请求数据包装成bytebuf指派给相关的handler处理。但是这在某些场景下也不太合适。

多线程模型

image

同样是一个NIO线程接收客户端的请求调用,当链接完成后请求会分配给一个NIO线程池,具体的消息序列化反序列化、数据处理等任务可有NIO线程池中的线程完成。

同样的基本情况下是没有问题的,但是多个客户端连接依旧可能出现性能问题,故有了主从多线程模型

主从多线程模型

主从情况就是从一个NIO线程变成了一个NIO线程池,可同时由多个NIO线程处理客户端的请求连接操作,减少因为性能不足导致的问题,这也是netty推荐的使用方法

EventLoopGroup 则也是一个NIO线程池,即可用于客户端的TCP请连接求,也可用于数据的IO处理,所以在上述代码中观察发现服务端和客户端的EventLoopGroup个数不一样也是这个道理,服务端一个线程池用来接收客户端连接,另一个则用来进行读写IO操作。

粘包、拆包

众所周知,网络上的传输的都是字节流,从TCP/IP协议角度出发无法知道具体的业务数据组装情况,所以实际场景中一个请求可能被分批次传输,也有可能因为请求数据太少故打包多个请求统一传输

image

如上图,正常的情况是分别有D1和D2两个数据包发送到服务端,但是因为网络拥塞比较严重,滑动窗口自适应的缩小,使得1个缓冲区的大小无法装满整个请求体,就会出现拆包的情况;又例如请求体内容较少,无法填充完整缓冲区,那么就会等待多个请求把缓冲区填满再发送出去,就会出现粘包的情况,如下距离:

必须首先处理好拆包和粘包问题,才能保证收到正常的完整的消息,而netty则帮我们解决了大部分问题了,例如根据长度拆分(FixedLengthFrameDecoder),根据换行符拆分(LineBasedFrameDecoder),又或者分割符拆分(DelimiterBasedFrameDecoder),只是在本Demo中使用的是换行符切分的LineBasedFrameDecoder

ChannelPipeline 和 ChannelHandler

image

ChannelPipeline 是一个拥有头(Head)和尾(Tail)的双向链式容器,可自由添加不同的handler处理器以满足不同的业务需求。同时因为有从外界读取数据和发送数据两种场景,所以有inbound和outbound两种情况

image

ChannelHandler 则是具体的处理器,可通过addLast方式添加到pipeline管道链路上,如粘包说的LineBasedFrameDecoder也是一种具体的handler处理器。demo中提到的添加自定义handler代码块如下所示

ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeServerHandler());

通过这种添加方式形成了下面的链路

HEAD                                             TAIL
LineBasedFrameDecoder -> StringDecoder ->  TimeServerHandler

这整个链路是比较清晰完整的,如果把StringDecoder和LineBasedFrameDecoder的处理器顺序换一下,则会发现出现错误,如下图

image

圈住的地方换行符就是我们代码中添加的 System.getProperty("line.separator") 换行导致,因为这个就1次调用,所以会发现只进行了字符串的转换,并没有进行拆包处理,再次把请求的数据量加大些,再测试看看

image

会发现服务端接收到的数据全部错误了,没有一个正确,切记不要把handler处理器顺序搞错,如下图是netty源码中关于顺序的说明情况。

image

Handler 生命周期

handler在处理的时候是有着一定的顺序,例如服务端先接收请求的注册,等到TCP/IP三次握手完成后,相当于channel激活完成,开始接受客户端正常的请求调用,然后返回响应结果等,客户端关闭后,服务端也需要进行取消激活,关闭注册的操作,以放弃该channel的管理操作。通过对其各个步骤的生命周期的管理,可以实现自定义的各种管理和控制。fireXXX又被包装成类似于channelRegistered的名字,如下图的调用过程

**该图来自 http://www.jiangxindc.com/view/2398**

如本demo的运行结果也可以很明显的看出其执行链路。

image

结束

到此netty的学习就结束了,并没有介绍的太深入,也只是把常用的组件知识梳理了一遍,以便于我们在使用netty的时候注意到这些问题,以发挥netty的最大功效,文中很多内容都参考自《Netty权威指南》,大家如果有兴趣的话可以自行阅读学习和加强理解,下一期将会进行RPC代码中的netty部门的改造。

如代码存在的问题欢迎提出~

上一篇下一篇

猜你喜欢

热点阅读