Netty源码笔记(四)TCP长连接和IdleStateHand
日常工作中TCP长连接在一些优秀的中间件或开源项目中得到大量的使用。比如zookeeper的订阅和监听、日常使用的各种数据库连接池、redis连接池、常用的RPC框架dubbo/美团pigeon、美团的监控系统Cat等等。
使用TCP长连接的优势在于:
1、有效避免频繁的三次握手、四次挥手开销;
2、避免TCP滑动窗口冷启动的低效问题
能极大的提升网络通信的效率。
缺点也比较明显,当客户端因为断电、网线被拔除等原因突然断开时,服务端没办法及时知道客户端已经断线,不能及时回收socket占用的系统资源。
首先,我们来看看netty关于长连接的使用:
//服务端
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
//客户端
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
我们跟踪一下服务端childOption的设置代码:
//ServerBootstrapAcceptor中channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
。。。
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
。。。
}
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
//DefaultSocketChannelConfig的setOption方法
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
。。。
if (option == SO_KEEPALIVE) {
setKeepAlive((Boolean) value);
} else if (option == SO_REUSEADDR) {
setReuseAddress((Boolean) value);
} else {
return super.setOption(option, value);
}
return true;
}
@Override
public SocketChannelConfig setKeepAlive(boolean keepAlive) {
try {
javaSocket.setKeepAlive(keepAlive);
} catch (SocketException e) {
throw new ChannelException(e);
}
return this;
}
最终我们进入的java.net.Socket.setKeepAlive
public void setKeepAlive(boolean on) throws SocketException {
if (isClosed())
throw new SocketException("Socket is closed");
getImpl().setOption(SocketOptions.SO_KEEPALIVE, Boolean.valueOf(on));
}
我们看看jdk关于SocketOptions.SO_KEEPALIVE的注释:
/**
* When the keepalive option is set for a TCP socket and no data
* has been exchanged across the socket in either direction for
* 2 hours (NOTE: the actual value is implementation dependent),
* TCP automatically sends a keepalive probe to the peer. This probe is a
* TCP segment to which the peer must respond.
* One of three responses is expected:
* 1. The peer responds with the expected ACK. The application is not
* notified (since everything is OK). TCP will send another probe
* following another 2 hours of inactivity.
* 2. The peer responds with an RST, which tells the local TCP that
* the peer host has crashed and rebooted. The socket is closed.
* 3. There is no response from the peer. The socket is closed.
*
* The purpose of this option is to detect if the peer host crashes.
*
* Valid only for TCP socket: SocketImpl
*
* @see Socket#setKeepAlive
* @see Socket#getKeepAlive
*/
@Native public final static int SO_KEEPALIVE = 0x0008;
注释大意:当2个小时没有发生数据交换时,TCP会发送一个探针给对方,如果收到的是ACK标记的应答,则连接保持,否则关闭连接。
为了解决TCP长连接的缺点,基本上所有使用了TCP长连接的优秀开源项目都会自定义一套心跳保持和心跳消息发送机制,保证在客户端异常断开时服务端能及时的收到通知。
而netty通过IdleStateHandler处理器,能极为方便的通过参数配置来实现心跳处理策略。
代码示例:
//超时时间配置
new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, unit);
//超时事件处理
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
log.info("客户端心跳消息发送:" + System.currentTimeMillis()/1000);
HeartBeatMsg msg = new HeartBeatMsg();
ctx.writeAndFlush(heartBeatMsg);
} else {
super.userEventTriggered(ctx, evt);
}
}
readerIdleTime:读空闲超时时间
writerIdleTime:写空闲超时时间
allIdleTime:读和写都空闲的超时时间
接下来我们就来看看IdleStateHandler的实现逻辑。核心代码如下:
private void initialize(ChannelHandlerContext ctx) {
// Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/netty/netty/issues/143
switch (state) {
case 1:
case 2:
return;
}
state = 1;
initOutputChanged(ctx);
//初始时间点
lastReadTime = lastWriteTime = ticksInNanos();
//根据配置启动相应的定时任务
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
以ReaderIdleTimeoutTask为例我们来分析一下内部逻辑:
@Override
public void run() {
if (!ctx.channel().isOpen()) {
return;
}
long nextDelay = readerIdleTimeNanos;
//channelReadComplete 未读或读处理已完成
if (!reading) {
//是否超时
nextDelay -= System.nanoTime() - lastReadTime;
}
//读空闲超时
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout =
ctx.executor().schedule(this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
try {
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, firstReaderIdleEvent);
if (firstReaderIdleEvent) {
firstReaderIdleEvent = false;
}
//超时事件触发
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
}
}
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
接下来我们看看lastReadTime:上一次读结束的时间点记录
我们从服务端响应客户端的IO事件开始分析,核心代码如下:
//AbstractNioByteChannel.NioByteUnsafe.read方法
@Override
public final void read() {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
//触发channelReadComplete回调
pipeline.fireChannelReadComplete();
}
可以看到,当读消息体处理完毕后,会触发pipeline的channelReadComplete回调,最终在IdleStateHandle.channelReadComplete记录读结束时间点
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
lastReadTime = System.nanoTime();
reading = false;
}
ctx.fireChannelReadComplete();
}
本篇分析结束。