第十七节 netty源码分析之pipeline的来源
pipeline的来源
- 继续跟踪源码EchoClient中得Bootstrap类。
根据前面得分析总结如下过程:
Bootstrap中channel(NioSocketChannel.class)
-》NioSocketChannel的初始化时创建-》NioSocketChannel this(DEFAULT_SELECTOR_PROVIDER)
-》NioSocketChannel中this(newSocket(provider)) newSocket方法中的参数provider会根据操作系统选择不同的SelectorProvider(SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();)。 创建socketchannel方法provider.openSocketChannel(),SelectorProvider的用法可参考nio中的
-》继续调用构造器this(null, socket);其中的socket为newSocket方法创建的
-》接下来调用父类的构造器和初始化cofnig
查看NioSocketChannel的构造代码:
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
-》 转到父类的构造器中
/**
* Create a new instance
*
* @param parent the parent {@link Channel} by which this instance was created. May be {@code null}
* @param ch the underlying {@link SelectableChannel} on which it operates
*/
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
--》 继续调用父类构造器会发现又一次掉用父类的构造器 super(parent);
--》 下面找到我们费尽心机的pipeline的创建如下:
/**
* Creates a new instance.
*
* @param parent
* the parent of this channel. {@code null} if there's no parent.
*/
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
--》 上面的newChannelPipeline方法其实调用DefaultChannelPipeline的构造器我们跟踪一下piple的创建
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
* 从上面的构造方法中可以推算Pipeline包含为一个双向链表,且会有一个头和尾,类图如下
总结:每创建要给channel中会创建一个pipeline. pipeline为一个双向链表,会有一个头和尾
头尾,从两者继承关系来看基本一致。区别在于实现接口不同,head实现ChannelOutboundHandler, ChannelInboundHandler而tail实现ChannelInboundHandler
图片.png
图片.png
再看下他们的父类 AbstractChannelHandlerContext 的构造器, 分别以参数 inbound , outbound .来区分head和tail
结合他们实现的接口,header 是一个 outboundHandler, 而 tail 是一个inboundHandler。
2 piple是如何将handler起到作用的呢?
1、 ChannelInitializer 的添加
初始化 Bootstrap, 会添加我们自定义的 ChannelHandler, 就以我们熟悉的 EchoClient 来举例
Bootstrap b = new Bootstrap();
b.group(group)
//初始化工厂ReflectiveChannelFactory为后续链接connect方法创建NioSocketChannel对象
.channel(NioSocketChannel.class)
//将选项添加到AbstractBootstrap属性options. 实现类中Bootstrap的init(Channel channel)方法设置channel的类型
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
});
// Start the client.
//包含channel(注意channel可抽象为socket链接来理解)实例化Bootstrap.connect -> Bootstrap.doConnect -> AbstractBootstrap.initAndRegister(最终channelFactory.newChannel();这里的factory就是前面设置的ReflectiveChannelFactory)
// NioSocketChannel最终newSocket 来打开一个新的 Java NIO SocketChannel, 最后调用父类AbstractChannel(Channel parent)
ChannelFuture f = b.connect(HOST, PORT).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
在调用 handler 时, 传入了 ChannelInitializer 对象, 它提供了一个 initChannel 方法供我们初始化 ChannelHandler. 那么这个初始化过程是怎样的呢
源码追踪:首先 Bootstrap中调用handler方法 .handler(new ChannelInitializer<SocketChannel>(),将ChannelInitializer父类AbstractBootstrap中方法
/**
* the {@link ChannelHandler} to use for serving the requests.
*/
public B handler(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
return self();
}
初始化后的使用过程: 其实再ChannelFuture f = b.connect(HOST, PORT).sync();中的connect方法中,一路看下去会在父类AbstractBootstrap找到initAndRegister()初始化这楼里
是一个很重要的方法。暂时我们只关注 init(channel);这里是我们要关注的地方因为这里是channel和pipeline交互的关键,而且他的具体实现在Bootstrap中
@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
// config使用bootstrap.handler(),就是最初ChannelInitializer,可参考上面分析
p.addLast(config.handler());
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
//设置channel类型
setChannelOptions(channel, options, logger);
}
最终我们要关注 p.addLast(config.handler());这里将我们出事的handler添加到了pipleline的末端(pipleline的结构借本介绍在前面已介绍)
为了添加一个 handler 到 pipeline 中, 必须把此 handler 包装成 ChannelHandlerContext. 因此在上面的代码中我们可以看到新实例化了一个 newCtx 对象, 并将 handler 作为参数传递到构造方法中
最后追踪到DefaultChannelPipeline中方法
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//检查是否重名
checkMultiplicity(handler);
//创建DefaultChannelHandlerContext对象
newCtx = newContext(group, filterName(name, handler), handler);
// 将生成的newCtx插入handlercontex链表中,addLast0方法会将newCtx插入tail之前
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
- 我们来关注下newContext方法是如何构造
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
//isInbound isOutbound 方法返回true false
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
ChannelInitializer的继承关系如下图:
图片.png下一节再具体分析pipeline和handler的具体实现细节: