netty

netty事件和异常的传播

2022-11-28  本文已影响0人  virtual灬zzZ

关于ChannelRead事件的传播

在自定义handler的时候,通常要重写channelRead函数,如果想要将该事件向后传播(注意,传播顺序与handler添加顺序相同),需要调用fireChannelRead函数,ChannelRead事件便在这里中断

通常在重写的channelRead函数里,有两种传播ChannelRead事件的方式

public void channelRead(ChannelHandlerContext ctx, Object msg) {   
 //第一种   
 ctx.fireChannelRead(msg);    
//第二种   
 ctx.channel().pipeline().fireChannelRead(msg);
}

这两种方式的主要区别在于接下来传播的起始位置,非常重要

  • 使用第一种方式,事件会从该节点开始继续向后传播
  • 使用第二种方式,事件会从head节点开始传播

下面分析源码来做说明

第一种传播方式

跟进到AbstractChannelHandlerContext#fireChannelRead方法

public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

findContextInbound方法就是在寻找下一个节点,看看这个方法的代码

private AbstractChannelHandlerContext findContextInbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    EventExecutor currentExecutor = executor();
    do {
        ctx = ctx.next;
    } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
    return ctx;
}

继续看看skipContext方法

private static boolean skipContext(
    AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
    return (ctx.executionMask & (onlyMask | mask)) == 0 ||
        (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
}

可以看到,这里判断的关键就在executionMask这个成员变量,而这个成员变量就在AbstractChannelHandlerContext里被赋值

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline,
                              EventExecutor executor,
                              String name, 
                              Class<? extends ChannelHandler> handlerClass) {
    //省略其他代码
    this.executionMask = mask(handlerClass);
}

mask方法最终调用到mask0方法

private static int mask0(Class<? extends ChannelHandler> handlerType) {
    int mask = MASK_EXCEPTION_CAUGHT;
    try {
        if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
            mask |= MASK_ALL_INBOUND;
 
            if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_REGISTERED;
            }
            if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
                mask &= ~MASK_CHANNEL_READ;
            }
            //省略部分代码
        }
 
        if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
            mask |= MASK_ALL_OUTBOUND;
 
            if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
                            SocketAddress.class, ChannelPromise.class)) {
                mask &= ~MASK_BIND;
            }
            //省略部分代码
        }
 
        //省略部分代码
    } catch (Exception e) {
        //省略部分代码
    }
 
    return mask;
}

isSkippable函数只有在找不到函数或者函数被@Skip注解时才返回false

可以看到,实际上executionMask就是用来记录handler的类型信息和方法注解信息

skipContext方法实际上就是在寻找下一个没有用@Skip注解了ChannelRead方法的inbound节点

private static boolean isSkippable(
            final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {
        return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
            @Override
            public Boolean run() throws Exception {
                Method m;
                try {
                    m = handlerType.getMethod(methodName, paramTypes);
                } catch (NoSuchMethodException e) {
                    if (logger.isDebugEnabled()) {
                        logger.debug(
                            "Class {} missing method {}, assume we can not skip execution", handlerType, methodName, e);
                    }
                    return false;
                }
                return m != null && m.isAnnotationPresent(Skip.class);
            }
        });
    }

继续看fireChannelRead函数里调用到的invokeChannelRead函数

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        //不在当前eventloop,放到异步任务队列里
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
 
private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            //省略
        }
    } else {
        fireChannelRead(msg);
    }
}

可以看到,这里就是在调用下一个节点的channelRead方法

第二种传播方式

跟进到DefaultChannelPipeline#fireChannelRead方法

public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

继续跟进invokeChannelRead方法

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

可以看到,这里就是传入head节点,从head节点开始向后传播channelRead事件

资源释放相关问题

若自定义的handler继承自ChannelInboundHandlerAdapter,并且在ChannelRead函数里没有将事件向后传播,那么需要自行调用函数处理资源释放,如下

ReferenceCountUtil.release(msg);

Write事件传播

write事件的传播顺序与handler的添加顺序相反(即最后添加的outboundHandler最先处理write事件)

类似的,在用户代码里传播write事件也有两种方式

第一种方式,从当前节点,往前寻找outbound,继续传播
ctx.write(msg);

 第二种方式,从tail节点开始,往前寻找outbound传播
ctx.channel().write(msg);
如果没有中断,最终write事件会传播到head节点,然后head节点会调用unsafe的write方法

异常的传播

异常的产生

首先,异常是在ChannelReadChannelRegister等这些函数中抛出的,然后在形如invokeChannelXXX(例如invokeChannelRead)中捕获,例如

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            //捕获异常
            invokeExceptionCaught(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

看看invokeExceptionCaught方法

private void invokeExceptionCaught(final Throwable cause) {
    if (invokeHandler()) {
        try {
            handler().exceptionCaught(this, cause);
        } catch (Throwable error) {
            //省略
        }
    } else {
        fireExceptionCaught(cause);
    }
}

可以看到,这里调用exceptionCaught方法处理异常

传播异常

异常的传播方向与handler的添加方向一致,并且不区分是inboundHandler还是outboundHandler(即异常可以从inboundHandler传播到outboundHandler,反之亦可)

默认情况下,如果不重写exceptionCaught方法,那么会把该异常继续向后传播,最终会传播到tail节点,tail节点会打印一条日志表明该异常未被处理

如果重写了exceptionCaught方法,并且想将该异常继续向后传播,那么需要调用fireExceptionCaught方法

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
    throws Exception {
    //其他处理代码
    ctx.fireExceptionCaught(cause);
}

关联:Netty4中Handler的执行顺序以及ctx.close() 与 ctx.channel().close()的区别

上一篇下一篇

猜你喜欢

热点阅读