第十五节 netty源码分析之客户端源码分析02
handler调用
我们已经知道ChannelInitializer的添加过程其实,是会创建一个DefaultChannelHandlerContext然后添加到ChannelHandlerContext双线链表tail的前面
而我们自定义的handler就在ChannelInitializer的initChannel方法中,那么initChannel何时调用将我们的handler添加到链表中( p.addLast),下面我们开始分析
自定义 ChannelHandler 的添加过程, 发生在 AbstractUnsafe.register0 中, 在这个方法中调用了 pipeline.fireChannelRegistered()
- 首先继续分析之前AbstractBootstrap的initAndRegister() 方法中的方法 init(channel); init的实现再子类bootstrip中
@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
// config使用bootstrap.handler(),就是最初ChannelInitializer,可参考上面分析
p.addLast(config.handler());
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
//设置channel类型
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
}
}
继续查看addLast方法
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//检查是否重复添加
checkMultiplicity(handler);
//创建DefaultChannelHandlerContext对象包装handler,注意构造参数中
newCtx = newContext(group, filterName(name, handler), handler);
//将生成的newCtx插入handlercontex链表中
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
执行完后pipeline中已包含handler ChannelInitializer
接着根据initAndRegister方法中的ChannelFuture regFuture = config().group().register(channel);//注册
根据上一节的分析:register—>SingleThreadEventLoop#register->AbstractChannel #register->AbstractChannel #register0->AbstractChannel #register0#pipeline.fireChannelRegistered();
fireChannelRegistered方法源码
@Override
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
附上代码
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 由上面的方法此处调用head.invokeChannelRegistered() 方法
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
查看invokeChannelRegistered方法
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
上面的方法handler()其实是fireChannelRegistered方法入参head,在head.fireChannelRegistered 其实是调用的 AbstractChannelHandlerContext.fireChannelRegistered
在head中channelRegistered方法中
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered();
}
channelRegistered
@Override
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(findContextInbound());
return this;
}
findContextInbound方法:
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
从 head 开始遍历 Pipeline 的双向链表, 然后找到第一个属性 inbound 为 true 的 ChannelHandlerContext 实例. 接着掉ChannelInitializer 的channelRegistered方法
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
// the handler.
if (initChannel(ctx)) {
// we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
// miss an event.
ctx.pipeline().fireChannelRegistered();
// We are done with init the Channel, removing all the state for the Channel now.
removeState(ctx);
} else {
// Called initChannel(...) before which is the expected behavior, so just forward the event.
ctx.fireChannelRegistered();
}
}
和之前分析一致,initChannel 它就是我们在初始化 Bootstrap 时, 调用 handler 方法传入的匿名内部类所实现的方法:
xxx.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoClientHandler());
}
});
因此当调用了这个方法后, 我们自定义的 ChannelHandler 就插入到 Pipeline 了, 接着removeState(ctx);移除ChannelInitializer这个handler。自定义handler就添加到了pipeline中