fescar源码分析-AbstractRpcRemotingCl

2019-02-12  本文已影响43人  do_young

fescar源码分析-AbstractRpcRemotingClient中主要从代码结构上介绍了AbstractRpcRemotingClient,但从功能上来说AbstractRpcRemotingClient类主要实现了RPC客户端的实例创建。

AbstractRpcRemotingClient是使用Netty的Bootstrap类引导创建一个RPC客户端,利用无连接协议和在调用 bind() 或 connect() 之后。下图展示了如何工作

Figure%209
  1. 当 bind() 调用时,Bootstrap 将创建一个新的管道, 当 connect() 调用在 Channel 来建立连接
  2. Bootstrap 将创建一个新的管道, 当 connect() 调用时
  3. 新的 Channel

要使用Netty创建PRC的客户端,需要实现以下逻辑:

Bootstrap bootstrap = new Bootstrap(); //1
bootstrap.group(new NioEventLoopGroup()) //2
    .channel(NioSocketChannel.class) //3
    .option(ChannelOption.TCP_NODELAY, true)//4
    .handler(new SimpleChannelInboundHandler<ByteBuf>() { //5
        @Override
        protected void channeRead0(
            ChannelHandlerContext channelHandlerContext,
            ByteBuf byteBuf) throws Exception {
                System.out.println("Received data");
                byteBuf.clear();
            }
        });
ChannelFuture future = bootstrap.connect(
    new InetSocketAddress("www.manning.com", 80)); //6

下面看一下AbstractRpcRemotingClient的具体实现:

创建Bootstrap

AbstractRpcRemotingClient类中定义了一个Bootstrap成员属性,并在子类创建实例对象的时候及创建实例。

private final Bootstrap bootstrap = new Bootstrap();

指定 EventLoopGroup

在构造函数中,根据配置文件,初始化EventLoopGroup。
其中wrokerGroupSelector线程数大小及名称可以通过配置文数的参数配置。

this.eventLoopGroupWorker = new NioEventLoopGroup(
    selectorThreadSizeThreadSize, 
    new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()), 
        selectorThreadSizeThreadSize));

在start()方法中将EventLoopGroup实例设置在Bootstrap属性中

this.bootstrap.group(this.eventLoopGroupWorker)

指定Channel

在start()方法中将根据配置文件及默认参数设置客户端的通道类。

.channel(nettyClientConfig.getClientChannelClazz())//

设置客户端属性

.option(ChannelOption.TCP_NODELAY, true)//
.option(ChannelOption.SO_KEEPALIVE, true)//
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());

......
  bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
         .option(EpollChannelOption.TCP_QUICKACK, true);

设置Handler

首先根据配置参数判断是否使用通道池,如果使用则创建一个通道池,并初始化对应的Hadnler

        if (nettyClientConfig.isUseConnPool()) {
            clientChannelPool = new AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool>() {//1
                @Override
                protected FixedChannelPool newPool(InetSocketAddress key) {//2
                    FixedChannelPool fixedClientChannelPool = new FixedChannelPool(bootstrap.remoteAddress(key),//3
                            new DefaultChannelPoolHandler() {
                                @Override
                                public void channelCreated(Channel ch) throws Exception {//4
                                    super.channelCreated(ch);
                                    final ChannelPipeline pipeline = ch.pipeline();
                                    pipeline.addLast(defaultEventExecutorGroup,//5
                                            new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
                                                    nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                                                    nettyClientConfig.getChannelMaxAllIdleSeconds()));
                                    pipeline.addLast(defaultEventExecutorGroup, new RpcClientHandler());
                                }
                            }, ChannelHealthChecker.ACTIVE, AcquireTimeoutAction.FAIL,//6
                            nettyClientConfig.getMaxAcquireConnMills(), nettyClientConfig.getPerHostMaxConn(),
                            nettyClientConfig.getPendingConnSize(), false);
                    return fixedClientChannelPool;

                }
            };
        }

否则按下面逻辑初始化Handler,

 else {
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {//1
                @Override
                public void initChannel(SocketChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();//2
                    pipeline.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
                            nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                            nettyClientConfig.getChannelMaxAllIdleSeconds()))
                    .addLast(new MessageCodecHandler());//添加消息加解码器
                    if (null != channelHandlers) {
                        addChannelPipelineLast(ch, channelHandlers);
                    }
                }
            });
        }
上一篇下一篇

猜你喜欢

热点阅读