netty源码分析(五) - pipeline
2020-10-12 本文已影响0人
进击的蚂蚁zzzliu
概述
DefaultChannelPipeline.pngpipeline结构
- ChannelInboundInvoker:主要是触发Pipeline上的下一个Inbound通道处理器ChannelInboundHandler的相关方法
- ChannelOutboundInvoker:主要是触发Pipeline上的下一个Outbound通道处理器ChannelOutboundHandler的相关方法
- DefaultChannelPipeline:pipeline默认实现,主要有两方面的功能,一是:操作由AbstractChannelHandlerContext作为结点的双向链表;二是:实现Inbound/Outbound接口方法作为pipeline处理的入口方法;
- HeadContext:pipeline内部类,pipeline默认提供的头节点
- TailContext:pipeline内部类,pipeline默认提供的尾节点
Context.pngcontext/handler结构
- AbstractChannelHandlerContext:context基础类,由成员变量volatile AbstractChannelHandlerContext next; volatile AbstractChannelHandlerContext prev;构成双向链表结构;
- ChannelHandler:context持有handler,具体业务逻辑委托给handler进行处理
- 默认HeadContext/TailContext都实现了handler接口(即也是一个handler),因此其内部不需要handler的成员变量;用户自定义的handler最终都会被包装成DefaultChannelHandlerContext,会持有private final ChannelHandler handler;
下面开始详细讲解pipeline,主要分为一下三部分:
- pipeline初始化
- 添加/删除ChannelHannel
- 事件的传播
1. pipeline初始化
之前在分析NioServerSocketChannel和NioSocketChannel创建时都看到过pipeline初始化的代码pipeline = newChannelPipeline();
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) {
//name命名:DefaultChannelPipeline$TailContext或HeadContext#0
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.executionMask = mask(handlerClass)
//true
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
- HeadContext:主要处理两部分,inbound相关(例如:read/write/flush等)会委托给unsafe进行操作;outbound会传递给下一个节点处理;需要注意的是channelActive/channelReadComplete会执行readIfIsAutoRead
- TailContext:只实现了ChannelInboundHandler,基本都是空实现,主要做一些收尾工作,例如ReferenceCountUtil.release(cause)释放内存
2. 添加/删除ChannelHandler
DefaultChannelPipeline # addLast
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//1. 重复性校验
checkMultiplicity(handler);
//2. 创建节点并添加至链表, filterName: name为空会重新生成(handler类名 + $x + #x), name不为空会检查name是否重复(从Head节点遍历比对name是否重复)
newCtx = newContext(group, filterName(name, handler), handler);
//3. 把新创建的newCtx添加到双向链表中
addLast0(newCtx);
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
//4. 回调添加完成事件
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
//4. 回调添加完成事件
callHandlerAdded0(newCtx);
return this;
}
- checkMultiplicity重复性校验
- 创建节点并添加至链表, filterName: name为空会重新生成(handler类名 + $x + #x), name不为空会检查name是否重复(从Head节点遍历比对name是否重复)
- 把新创建的newCtx添加到双向链表中
- 回调添加完成事件
DefaultChannelPipeline # checkMultiplicity
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
//如果没有isSharable注解,并且已经添加过,则报错
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
- 如果没有isSharable注解,并且已经添加过,则报错
AbstractChannelHandlerContext # callHandlerAdded 回调添加完成事件
final void callHandlerAdded() throws Exception {
//cas设置状态为addComplete
if (setAddComplete()) {
//执行用户handler(ChannelInitializer)的handlerAdded方法
handler().handlerAdded(this);
}
}
- cas设置状态为addComplete后执行用户handler(ChannelInitializer)的handlerAdded方法
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
if (initChannel(ctx)) {
removeState(ctx);
}
}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) {
try {
//执行用户ChannelInitializer中重写的initChannel方法
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
//执行后remove保证只会在新连接(channel)初始化时执行一次
pipeline.remove(this);
}
}
return true;
}
return false;
}
- 执行用户ChannelInitializer中重写的initChannel方法(通常是添加child handler)
- 执行后remove掉ChannelInitializer保证只会在新连接(channel)初始化时执行一次
删除ChannelHandler逻辑类似,这里就不做分析
3. 事件的传播
ChannelInboundHandlerAdapter.pnghandler和adapter结构
- ChannelHandler:3个回调方法handlerAdded/handlerRemoved/exceptionCaught
- ChannelHandlerAdapter: ChannelHandler接口的默认实现
- ChannelInboundHandlerAdapter:提供ChannelInboundHandler的默认实现,调用context中对应方法,用户自定义inboundHandler一般继承改类
- ChannelOutboundHandlerAdapter:提供ChannelOutboundHandler的默认实现,调用context中对应方法,用户自定义outboundHandler一般继承改类
- SimpleChannelInboundHandler:主要重写了ChannelInboundHandlerAdapte#channelRead方法,提供自动释放byteBuf功能
pipeline (2).png事件和异常传播方向
- ChannelDuplexHandler:双向事件
- Inbound事件顺序为:Head > 1 > 2 > 5 > 6 > Tail,跟add顺序一致,且跳过Outbound事件
- Outbound事件顺序为:Tail > 6 > 5 > 4 > 3 > Head,跟add顺序相反,且跳过Inbound事件
- 异常事件顺序为:Head > 1 > 2 > 3 > 4 > 5 > 6 > Tail,跟add顺序一致,且经过所有事件
以如下EchoServer为例
read事件:
EchoServer:
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO), serverHandler);
}
});
@Sharable
public class EchoServerHandler extends ChannelDuplexHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("----EchoServerHandler.channelReads-------");
ctx.pipeline().write(msg);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
System.out.println("----EchoServerHandler.write-------");
ctx.write(msg, promise);
}
}
channelRead调用栈.pngchannelRead调用栈如下:
- read事件首先在NioEventLoop的run方法中被从selector轮询到;
- 交给processSelectedKeys处理;
- 在AbstractNioByteChannel中调用pipeline.read;
- 在pipeline中执行inbound事件处理流程:HeadContext > LoggingHandler > EchoServerHandler
- 注意:由于EchoServerHandler # channelRead中没有继续往下传播,所以调用栈中没有显示TailContext
write调用栈.pngwrite事件:
- EchoServerHandler # channelRead中调用执行ctx.pipeline().write(msg);作为write入口
- DefaultChannelPipeline中调用tail.write
- 在pipeline中执行outbound事件处理流程:TailContext > EchoServerHandler > LoggingHandler > HeadContext
- 最后在HeadContext中通过NioSocketChannelUnsafe # write出去
-
注意:ctx.fireChannelRead等事件是从当前结点开始传播;ctx.channel().pipeline().fireChannelRead()等事件是从Head或Tail节点开始传播
至此pipeline流程分析完毕