netty

Netty空闲检测之读空闲

2020-11-22  本文已影响0人  书唐瑞

客户端与服务端通信的时候,服务端如何感知到客户端下线.客户端可以每4秒向服务端发送一个数据,服务端每5秒进行空闲检测.如果服务端没有读取到数据,则认为客户端已下线.(实际业务中并不会这么处理,我们这里只是为了描述场景)

在Netty中为我们提供了一个拿来即用的空闲检测处理器

io.netty.handler.timeout.IdleStateHandler

它同时是一个入站和出站处理器,有channelRead()和write()方法.
本篇文章我们讲解这个类是如何进行读空闲检测

image.png

在读取数据的时候,会涉及到channelRead和channelReadComplete两个方法,它在IdleStateHandler类中的重写如下

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
            reading = true;// 这个属性很重要
            firstReaderIdleEvent = firstAllIdleEvent = true;
        }
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
            lastReadTime = ticksInNanos();// 设置最后读取的时间
            reading = false;// 这个属性很重要
        }
        ctx.fireChannelReadComplete();
    }

在读取网络数据的时候,会先调用channelRead方法,等缓冲区中的数据读取完成之后,暂时没有数据可读了,会再调用channelReadComplete方法.在调用channelRead方法的时候,会设置reading = true,表示正在读取中,读取完成之后,channelReadComplete方法会将reading = false,表示不是在读取中(即读取完成).

我们在使用IdleStateHandler类的时候,会向构造器中传入读/写/读写超时时间.

public IdleStateHandler(
        int readerIdleTimeSeconds,
        int writerIdleTimeSeconds,
        int allIdleTimeSeconds) {

    this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
         TimeUnit.SECONDS);
}

readerIdleTimeSeconds表示我们设置的读空闲超时时间,只要超时时间超过这个值,那么就会有对应的事件产生,把这个事件向下传播.

在初始化的时候,会将读/写/读写超时任务加入到定时任务队列中

private void initialize(ChannelHandlerContext ctx) {
        // Avoid the case where destroy() is called before scheduling timeouts.
        // See: https://github.com/netty/netty/issues/143
        switch (state) {
        case 1:
        case 2:
            return;
        }

        state = 1;
        initOutputChanged(ctx);

        lastReadTime = lastWriteTime = ticksInNanos();
        if (readerIdleTimeNanos > 0) {
// 将读超时任务放入队列中
            readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                    readerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (writerIdleTimeNanos > 0) {
            writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                    writerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (allIdleTimeNanos > 0) {
            allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                    allIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
    }

它的具体实现,我们通过图形的方式讲解

image.png

在channelRead和channelReadComplete之间如果触发了读空闲定时任务,那么定时任务会重新设置延时时间=5s,继续放入定时任务队列中,等待下次执行检测.​

image.png

然后又过了readerIdleTimeSeconds=5时间,这个时候,channelReadComplete已经完成了.但是最后一次读的时间距离当前时间(红色点)并没有超过设置的readerIdleTimeSeconds值,因此依然需要继续放入定时任务队列中.但是放入队列的延时时机已经不再是之前设置的readerIdleTimeSeconds值了,而是图中绿色那个长度的时间值.

image.png

又过了绿色值的时间段,触发了读空闲定时任务,如果读取到了数据,那么重新设置延时时间=5s,继续放入定时任务队列中,等待下次执行检测. 可是如果没有读取到数据,那么这个时候,就会触发读空闲事件,向下传播.

image.png

根据上面的分析,我们可以这么理解,在channelRead到channlReadComplete这期间触发的定时任务其实是没用的,也就是在reading=true的范围内,定时任务的延时时间都是readerIdleTimeSeconds值,这个范围是安全的,因为一直在读取数据.
即便是读取完成,只要触发时间在readerIdleTimeSeconds范围内也是安全的.
我们可以认为,其实定时任务是这么执行的

image.png

在channelReadComplete时提交的读空闲定时任务,然后每隔指定的readerIdleTimeSeconds时间执行一次定时任务,检测是否读取到了数据.

当然实际上定时任务的执行还是我们一开始分析的那样,可是我们可以等价于它是这样执行的.​

// 源码位置: io.netty.handler.timeout.IdleStateHandler.ReaderIdleTimeoutTask

private final class ReaderIdleTimeoutTask extends AbstractIdleTask {

        ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        protected void run(ChannelHandlerContext ctx) {

            System.out.println("ReaderIdleTimeoutTask线程名称: " + Thread.currentThread().getName());
            long nextDelay = readerIdleTimeNanos;
            if (!reading) {
                nextDelay -= ticksInNanos() - lastReadTime;
            }

            if (nextDelay <= 0) {
                readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

                boolean first = firstReaderIdleEvent;
                firstReaderIdleEvent = false;

                try {
                    // 读取超时,会向下传递一个读空闲事件
                    IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                    channelIdle(ctx, event);
                } catch (Throwable t) {
                    ctx.fireExceptionCaught(t);
                }
            } else {
                // 安全范围
                readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
            }
        }
    }

公众号 Netty历险记

上一篇下一篇

猜你喜欢

热点阅读