(六)ChannelPipeline与ChannelHandle
本来是想直接看Channel相关的事件是如何传播的,但是发现在看如何传播之前,还是有必要梳理一下ChannelPipeline与ChannelHandlerContext的结构。方便后续理解。之前在梳理Netty是如何将自定义的ChannelHandler添加到ChannelPipeline中的时候有粗略讲到基本的结构。
基本概念
每个Channel都有自己的ChannelPipeline,用于管理ChannelHandlerContext。
而ChannelPipeline中有一个双向的ChannelHandlerContext链表,用于管理ChannelHandlerContext。
而ChannelHandlerContext保存了自身与其他ChannelHandlerContext的关联关系,以及保存了ChannelHandler。
ChannelHandlerContext说白了就是用来保存ChannelHandler的。
而ChannelPipeline说白了就是控制channelHandler的执行顺序的。(通过控制ChannelHandlerContext链的顺序)
ChannelInboundInvoker
感觉看了蛮久的源码,总算get到为什么里面的方法都是fire了。
某种意义上,InBound入站的操作都是都是被动的。
比如fireChannelRegistered这个方法,是Channel完成注册(注册到selector中,与对应的eventLoop绑定),随后要做的操作,可以通过fire(发射事件),去处理一些后续的操作。
所以可以理解成,都是一些操作完成之后,再去做的事情(有点像回调)
public interface ChannelInboundInvoker {
ChannelInboundInvoker fireChannelRegistered();
ChannelInboundInvoker fireChannelUnregistered();
ChannelInboundInvoker fireChannelActive();
ChannelInboundInvoker fireChannelInactive();
ChannelInboundInvoker fireExceptionCaught(Throwable cause);
ChannelInboundInvoker fireUserEventTriggered(Object event);
ChannelInboundInvoker fireChannelRead(Object msg);
ChannelInboundInvoker fireChannelReadComplete();
ChannelInboundInvoker fireChannelWritabilityChanged();
}
ChannelOutboundInvoker
OutBound出站的操作。都是主动发起的,比如连接服务端。
public interface ChannelOutboundInvoker {
ChannelFuture bind(SocketAddress localAddress);
ChannelFuture connect(SocketAddress remoteAddress);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
ChannelFuture disconnect();
ChannelFuture close();
ChannelFuture deregister();
ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
ChannelFuture disconnect(ChannelPromise promise);
ChannelFuture close(ChannelPromise promise);
ChannelFuture deregister(ChannelPromise promise);
ChannelOutboundInvoker read();
ChannelFuture write(Object msg);
ChannelFuture write(Object msg, ChannelPromise promise);
ChannelOutboundInvoker flush();
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
ChannelFuture writeAndFlush(Object msg);
ChannelPromise newPromise();
ChannelProgressivePromise newProgressivePromise();
ChannelFuture newSucceededFuture();
ChannelFuture newFailedFuture(Throwable cause);
ChannelPromise voidPromise();
}
ChannelPipeline
从这个接口来看,ChannelPipeline具备了以下功能
1.管理ChannelHandlerContext的链。
2.入站操作,去完成某些事件的后续操作。
3.出站操作,主动去做些什么。
4.覆写了接口,入站的方法,返回值都改为ChannelPipeline(ChannelPipeline为ChannelInboundInvoker的子类)
public interface ChannelPipeline
extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {
ChannelPipeline addFirst(String name, ChannelHandler handler);
ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addLast(String name, ChannelHandler handler);
ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addFirst(ChannelHandler... handlers);
ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);
ChannelPipeline addLast(ChannelHandler... handlers);
ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
ChannelPipeline remove(ChannelHandler handler);
ChannelHandler remove(String name);
<T extends ChannelHandler> T remove(Class<T> handlerType);
ChannelHandler removeFirst();
ChannelHandler removeLast();
ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);
<T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName,
ChannelHandler newHandler);
ChannelHandler first();
ChannelHandlerContext firstContext();
ChannelHandler last();
ChannelHandlerContext lastContext();
ChannelHandler get(String name);
<T extends ChannelHandler> T get(Class<T> handlerType);
ChannelHandlerContext context(ChannelHandler handler);
ChannelHandlerContext context(String name);
ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType);
Channel channel();
List<String> names();
Map<String, ChannelHandler> toMap();
@Override
ChannelPipeline fireChannelRegistered();
@Override
ChannelPipeline fireChannelUnregistered();
@Override
ChannelPipeline fireChannelActive();
@Override
ChannelPipeline fireChannelInactive();
@Override
ChannelPipeline fireExceptionCaught(Throwable cause);
@Override
ChannelPipeline fireUserEventTriggered(Object event);
@Override
ChannelPipeline fireChannelRead(Object msg);
@Override
ChannelPipeline fireChannelReadComplete();
@Override
ChannelPipeline fireChannelWritabilityChanged();
@Override
ChannelPipeline flush();
}
DefaultChannelPipeline的基本结构
因为在Channel实例化的时候,默认使用的是DefaultChannelPipeline,所以这里只需要关注这个类即可.
结构如下。再说DefaultChannelPipeline的具体实现的时候,先要看看ChannelHandlerContext的基本结构。
DefaultChannelPipeline
ChannelHandlerContext
ChannelHandlerContext也是ChannelInboundInvoker, ChannelOutboundInvoker的子类。
从接口的结构上看,具备以下功能
1.ChannelHanderContext,也跟channel绑定了
2.也有自己对应的EventLoop(就是专门处理它的线程)
3.此外具备了出站,入站的功能。就是可以被动去做些什么,也可以主动去做一些事情。
4.此外也可以返回这个ChannelHandlerContext所在的ChannelPipeline
5 覆写了入站的方法,返回类型改为ChannelHandlerContext(ChannelHandlerContext为ChannelInboundInvoker的子类)
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
Channel channel();
EventExecutor executor();
String name();
ChannelHandler handler();
boolean isRemoved();
@Override
ChannelHandlerContext fireChannelRegistered();
@Override
ChannelHandlerContext fireChannelUnregistered();
@Override
ChannelHandlerContext fireChannelActive();
@Override
ChannelHandlerContext fireChannelInactive();
@Override
ChannelHandlerContext fireExceptionCaught(Throwable cause);
@Override
ChannelHandlerContext fireUserEventTriggered(Object evt);
@Override
ChannelHandlerContext fireChannelRead(Object msg);
@Override
ChannelHandlerContext fireChannelReadComplete();
@Override
ChannelHandlerContext fireChannelWritabilityChanged();
@Override
ChannelHandlerContext read();
@Override
ChannelHandlerContext flush();
ChannelPipeline pipeline();
ByteBufAllocator alloc();
/**
* @deprecated Use {@link Channel#attr(AttributeKey)}
*/
@Deprecated
@Override
<T> Attribute<T> attr(AttributeKey<T> key);
/**
* @deprecated Use {@link Channel#hasAttr(AttributeKey)}
*/
@Deprecated
@Override
<T> boolean hasAttr(AttributeKey<T> key);
}
AbstractChannelHandlerContext
AbstractChannelHandlerContext是ChannelHandlerContext的实现类,其实大多数方法都是这个类实现的。所以这个类很重要。
这个类主要做的事情有以下
1.通过成员变量,可以看得出,维系了前后ctx的关系
2.另外也有自己的状态,正在添加,添加完成,删除完成,初始化
3.根据这个ctx对应的ChannelHandler去设置对应的出,入站属性。
4.也有绑定对应的EventLoop
5.其中也实现了大多数的操作,如以下的例子,fireChannelRegistered,其实说白了就是通过入参的ctx绑定的eventLoop去执行invokeChannelRegistered()方法。
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
维护了前后的关系,因为ChannelHandlerContext是处于一个链表中
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
ctx对应的几个状态,正在添加,添加完成,删除完成,初始化
private static final int ADD_PENDING = 1;
private static final int ADD_COMPLETE = 2;
private static final int REMOVE_COMPLETE = 3;
private static final int INIT = 0;
是出站,还是入站,这个得看具体ChannelHandler的是入站,还是出站来决定
private final boolean inbound;
private final boolean outbound;
对应的ChannelPipeline
private final DefaultChannelPipeline pipeline;
负责处理方法的执行器,用于执行方法。
final EventExecutor executor;
一开始的状态默认为初始化
private volatile int handlerState = INIT;
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
@Override
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(findContextInbound());
return this;
}
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
DefaultChannelHandlerContext的基本实现以及结构。
其实从代码来看,也就是多加了俩个方法,用于判断要给Inbound,outbound去设置值罢了。
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
private final ChannelHandler handler;
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
@Override
public ChannelHandler handler() {
return handler;
}
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
}
结构图如下
DefaultChannelHandlerContext
DefaultChannelPipeline
前面说完了DefaultChannelHandlerContext,最后来看看DefaultChannelPipeline的基本结构。
public class DefaultChannelPipeline implements ChannelPipeline {
ctx链的头和尾
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
DefaultChannelPipeline所对应的channel
private final Channel channel;
构造方法
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
TailContext 是inbound=true的ChannelHandlerContext
其实很多方法都没有实现。
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
tail是inbound为true的ctx
super(pipeline, null, TAIL_NAME, true, false);
添加后,顺便将状态改为添加完成
setAddComplete();
}
@Override
public ChannelHandler handler() {
本身TailContext就是ChannelHandler的子类
return this;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { }
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ReferenceCountUtil.release(evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
onUnhandledInboundException(cause);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
onUnhandledInboundMessage(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
}
HeadContext 是outbound=true的ChannelHandlerContext。
但是有点特殊,实现了ChannelOutboundHandler以及ChannelInboundHandler 的接口。
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
说明HeadContext是outbound=true的ctx
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
@Override
public ChannelHandler handler() {
return this;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.disconnect(promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.close(promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.deregister(promise);
}
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
if (!channel.isOpen()) {
destroy();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
}
}
ChannelHandler
从ChannelHandler的接口结构看。
ChannlHandler可以给自己设置ctx
public interface ChannelHandler {
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
@Deprecated
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
@Inherited
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface Sharable {
// no value
}
}