Netty 心跳检查的使用和原理

2023-02-14  本文已影响0人  Always_July

心跳检查的必要性

网络应用程序普遍会碰到一个问题:连接假死。
假死的现象:网络断开连接后,应用进程没有捕获到。从TCP层面来说,只有收到四次分手数据包或者一个RST数据包,才表示连接的状态已断开。

假死造成了每个连接都会耗费CPU和内存资源,造成资源的浪费。

如何解决

客户端每隔一段时间发送心跳包给服务端,服务端收到心跳包回复给客户端。

如何使用

// 如下代码是30秒未发送过数据,则发送心跳包。60秒未收到数据,则断开连接。

 // An example that sends a ping message when there is no outbound traffic
 // for 30 seconds.  The connection is closed when there is no inbound traffic
 // for 60 seconds.

 public class MyChannelInitializer extends ChannelInitializer<Channel> {
      @Override
     public void initChannel(Channel channel) {
         channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(60, 30, 0));
         channel.pipeline().addLast("myHandler", new MyHandler());
     }
 }

 // Handler should handle the IdleStateEvent triggered by IdleStateHandler.
 public class MyHandler extends ChannelDuplexHandler {
      @Override
     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
         if (evt instanceof IdleStateEvent) {
             IdleStateEvent e = (IdleStateEvent) evt;
             if (e.state() == IdleState.READER_IDLE) {
                 ctx.close();
             } else if (e.state() == IdleState.WRITER_IDLE) {
                 ctx.writeAndFlush(new PingMessage());
             }
         }
     }
 }

 ServerBootstrap bootstrap = ...;
 ...
 bootstrap.childHandler(new MyChannelInitializer());
 ...

注意:判断心跳的Handler一定要加入的pipline的最前面,加入插入到后面,如果这个这个连接读到了数据,但是在inbound的传播过程中出错了或者数据处理完毕就不往后传了,那么最终IdleStateHandler就不会读到数据,会导致误判。

源码解析

netty版本 4.1.32.Final

     <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.32.Final</version>
        </dependency>

构造方法



  public IdleStateHandler(
            int readerIdleTimeSeconds,
            int writerIdleTimeSeconds,
            int allIdleTimeSeconds) {

        this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
             TimeUnit.SECONDS);
    }

io.netty.handler.timeout.IdleStateHandler#initialize 初始化方法

  private void initialize(ChannelHandlerContext ctx) {
        // Avoid the case where destroy() is called before scheduling timeouts.
        // See: https://github.com/netty/netty/issues/143
        switch (state) {
        case 1:
        case 2:
            return;
        }

        state = 1;
        initOutputChanged(ctx);

        lastReadTime = lastWriteTime = ticksInNanos();
        if (readerIdleTimeNanos > 0) {
            readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                    readerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (writerIdleTimeNanos > 0) {
            writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                    writerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (allIdleTimeNanos > 0) {
            allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                    allIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
    }
    ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
        return ctx.executor().schedule(task, delay, unit);
    }

我们直接看ReaderIdleTimeoutTask,这个Runnable 被定时执行。

ReaderIdleTimeoutTask

 private final class ReaderIdleTimeoutTask extends AbstractIdleTask {

        ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        protected void run(ChannelHandlerContext ctx) {
            long nextDelay = readerIdleTimeNanos;
            // 1
            if (!reading) {
                nextDelay -= ticksInNanos() - lastReadTime;
            }

            if (nextDelay <= 0) {
                // Reader is idle - set a new timeout and notify the callback.
                readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

                boolean first = firstReaderIdleEvent;
                firstReaderIdleEvent = false;

                try {
                    IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                    channelIdle(ctx, event);
                } catch (Throwable t) {
                    ctx.fireExceptionCaught(t);
                }
            } else {
                // Read occurred before the timeout - set a new timeout with shorter delay.
                readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
            }
        }
    }

// 1 处代码解析

WriterIdleTimeoutTask,AllIdleTimeoutTask和ReaderIdleTimeoutTask逻辑类似,就不贴代码了。

参考资料

IdleStateHandler (Netty API Reference (4.1))

上一篇 下一篇

猜你喜欢

热点阅读