fescar源码分析-AbstractRpcRemotingCl
2019-02-12 本文已影响43人
do_young
在fescar源码分析-AbstractRpcRemotingClient中主要从代码结构上介绍了AbstractRpcRemotingClient,但从功能上来说AbstractRpcRemotingClient类主要实现了RPC客户端的实例创建。
- 1.通过构造函数初始化RPC客户端的相关配置。
- 2.通过init初始化创建RPC客户端的必要属性。
- 3.start配置RPC客户端并启动服务。
- 4.shutdown关闭RPC客户端服务。
AbstractRpcRemotingClient
是使用Netty的Bootstrap
类引导创建一个RPC客户端,利用无连接协议和在调用 bind() 或 connect() 之后。下图展示了如何工作
- 当 bind() 调用时,Bootstrap 将创建一个新的管道, 当 connect() 调用在 Channel 来建立连接
- Bootstrap 将创建一个新的管道, 当 connect() 调用时
- 新的 Channel
要使用Netty创建PRC的客户端,需要实现以下逻辑:
Bootstrap bootstrap = new Bootstrap(); //1
bootstrap.group(new NioEventLoopGroup()) //2
.channel(NioSocketChannel.class) //3
.option(ChannelOption.TCP_NODELAY, true)//4
.handler(new SimpleChannelInboundHandler<ByteBuf>() { //5
@Override
protected void channeRead0(
ChannelHandlerContext channelHandlerContext,
ByteBuf byteBuf) throws Exception {
System.out.println("Received data");
byteBuf.clear();
}
});
ChannelFuture future = bootstrap.connect(
new InetSocketAddress("www.manning.com", 80)); //6
- 1.创建一个新的 Bootstrap 来创建和连接到新的客户端管道
- 2.指定 EventLoopGroup
- 3.指定 Channel 实现来使用
- 4.设置 客户端属性
- 5.设置Handler初始化Channel 的Pipline,设置事件中对数据的处理逻辑。
- 6.连接到远端主机
下面看一下AbstractRpcRemotingClient
的具体实现:
创建Bootstrap
在AbstractRpcRemotingClient
类中定义了一个Bootstrap
成员属性,并在子类创建实例对象的时候及创建实例。
private final Bootstrap bootstrap = new Bootstrap();
指定 EventLoopGroup
在构造函数中,根据配置文件,初始化EventLoopGroup。
其中wrokerGroup
的Selector
线程数大小及名称可以通过配置文数的参数配置。
this.eventLoopGroupWorker = new NioEventLoopGroup(
selectorThreadSizeThreadSize,
new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),
selectorThreadSizeThreadSize));
在start()方法中将EventLoopGroup实例设置在Bootstrap
属性中
this.bootstrap.group(this.eventLoopGroupWorker)
指定Channel
在start()方法中将根据配置文件及默认参数设置客户端的通道类。
.channel(nettyClientConfig.getClientChannelClazz())//
设置客户端属性
.option(ChannelOption.TCP_NODELAY, true)//
.option(ChannelOption.SO_KEEPALIVE, true)//
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
......
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
.option(EpollChannelOption.TCP_QUICKACK, true);
设置Handler
首先根据配置参数判断是否使用通道池,如果使用则创建一个通道池,并初始化对应的Hadnler
if (nettyClientConfig.isUseConnPool()) {
clientChannelPool = new AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool>() {//1
@Override
protected FixedChannelPool newPool(InetSocketAddress key) {//2
FixedChannelPool fixedClientChannelPool = new FixedChannelPool(bootstrap.remoteAddress(key),//3
new DefaultChannelPoolHandler() {
@Override
public void channelCreated(Channel ch) throws Exception {//4
super.channelCreated(ch);
final ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(defaultEventExecutorGroup,//5
new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
nettyClientConfig.getChannelMaxWriteIdleSeconds(),
nettyClientConfig.getChannelMaxAllIdleSeconds()));
pipeline.addLast(defaultEventExecutorGroup, new RpcClientHandler());
}
}, ChannelHealthChecker.ACTIVE, AcquireTimeoutAction.FAIL,//6
nettyClientConfig.getMaxAcquireConnMills(), nettyClientConfig.getPerHostMaxConn(),
nettyClientConfig.getPendingConnSize(), false);
return fixedClientChannelPool;
}
};
}
- 1.继承AbstractChannelPoolMap,创建一个匿名类。
- 2.重载newPool方法,创建一个固定大小的通道池。
- 3.根据端口,即通道池处理器(ChannelPoolHandler)实例创建通道池。
- 4.创建通道池处理器实例,重载channelCreated方法,实现创建通道实例的逻辑。
- 5.在新创建的通道实例中设置Handler(重点是RpcClientHandler),并用初始化的线程池来执行pipline中的业务逻辑。
-
6.设置其它参数
大致情况如下图所示:
image.png
否则按下面逻辑初始化Handler,
else {
bootstrap.handler(new ChannelInitializer<SocketChannel>() {//1
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();//2
pipeline.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
nettyClientConfig.getChannelMaxWriteIdleSeconds(),
nettyClientConfig.getChannelMaxAllIdleSeconds()))
.addLast(new MessageCodecHandler());//添加消息加解码器
if (null != channelHandlers) {
addChannelPipelineLast(ch, channelHandlers);
}
}
});
}
- 1.创建ChannelInitializer实例
- 2.在pipline中添加Handler,主要是MessageCodecHandler
- 添加子类配置的其它Handler