Netty 心跳检查的使用和原理
2023-02-14 本文已影响0人
Always_July
心跳检查的必要性
网络应用程序普遍会碰到一个问题:连接假死。
假死的现象:网络断开连接后,应用进程没有捕获到。从TCP层面来说,只有收到四次分手数据包或者一个RST数据包,才表示连接的状态已断开。
假死造成了每个连接都会耗费CPU和内存资源,造成资源的浪费。
如何解决
客户端每隔一段时间发送心跳包给服务端,服务端收到心跳包回复给客户端。
如何使用
// 如下代码是30秒未发送过数据,则发送心跳包。60秒未收到数据,则断开连接。
// An example that sends a ping message when there is no outbound traffic
// for 30 seconds. The connection is closed when there is no inbound traffic
// for 60 seconds.
public class MyChannelInitializer extends ChannelInitializer<Channel> {
@Override
public void initChannel(Channel channel) {
channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(60, 30, 0));
channel.pipeline().addLast("myHandler", new MyHandler());
}
}
// Handler should handle the IdleStateEvent triggered by IdleStateHandler.
public class MyHandler extends ChannelDuplexHandler {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.READER_IDLE) {
ctx.close();
} else if (e.state() == IdleState.WRITER_IDLE) {
ctx.writeAndFlush(new PingMessage());
}
}
}
}
ServerBootstrap bootstrap = ...;
...
bootstrap.childHandler(new MyChannelInitializer());
...
注意:判断心跳的Handler一定要加入的pipline的最前面,加入插入到后面,如果这个这个连接读到了数据,但是在inbound的传播过程中出错了或者数据处理完毕就不往后传了,那么最终IdleStateHandler就不会读到数据,会导致误判。
源码解析
netty版本 4.1.32.Final
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.32.Final</version>
</dependency>
构造方法
public IdleStateHandler(
int readerIdleTimeSeconds,
int writerIdleTimeSeconds,
int allIdleTimeSeconds) {
this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
TimeUnit.SECONDS);
}
- readerIdleTimeSeconds - an IdleStateEvent whose state is IdleState.READER_IDLE will be triggered when no read was performed for the specified period of time. Specify 0 to disable. 多少秒没有read将触发IdleState.READER_IDLE
- writerIdleTimeSeconds - an IdleStateEvent whose state is IdleState.WRITER_IDLE will be triggered when no write was performed for the specified period of time. Specify 0 to disable. 多少秒没有write 将触发IdleState.WRITER_IDLE
- allIdleTimeSeconds - an IdleStateEvent whose state is IdleState.ALL_IDLE will be triggered when neither read nor write was performed for the specified period of time. Specify 0 to disable. 多少秒没read和write 将触发 IdleState.ALL_IDLE
io.netty.handler.timeout.IdleStateHandler#initialize 初始化方法
private void initialize(ChannelHandlerContext ctx) {
// Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/netty/netty/issues/143
switch (state) {
case 1:
case 2:
return;
}
state = 1;
initOutputChanged(ctx);
lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
return ctx.executor().schedule(task, delay, unit);
}
我们直接看ReaderIdleTimeoutTask,这个Runnable 被定时执行。
ReaderIdleTimeoutTask
private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}
@Override
protected void run(ChannelHandlerContext ctx) {
long nextDelay = readerIdleTimeNanos;
// 1
if (!reading) {
nextDelay -= ticksInNanos() - lastReadTime;
}
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
// 1 处代码解析
- 如果正在读取,nextDealy 等于构造函数的readerIdleTimeSeconds转为的纳秒值
- 如果没有正在读取数据,那么当前的时间减去上次读取数据的时间,然后再使用构造函数的readerIdleTimeSeconds减去。
- 例如我们构造函数readerIdleTimeSeconds是30秒,如果现在正在读取,那么触发下一次检查为30秒后。如果现在没有读取数据并且距离上次读取已经超过30秒,那么触发IdleState.READER_IDLE,如果小于30秒,那么用30秒减去得到下一次检查的时刻。
WriterIdleTimeoutTask,AllIdleTimeoutTask和ReaderIdleTimeoutTask逻辑类似,就不贴代码了。