sofa-bolt heartbeat detection

2019-06-14  heyong

1. Overview

sofa-bolt's heartbeat mechanism is built on Netty, so before analyzing the sofa-bolt heartbeat we first look at how Netty implements idle detection.

2. Netty heartbeat implementation

Netty provides the IdleStateHandler class to support heartbeat checks. Its constructor takes the following parameters:

/**
 * Creates a new instance firing {@link IdleStateEvent}s.
 *
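 * @param readerIdleTime read idle timeout (0 disables the check)
 * @param writerIdleTime write idle timeout (0 disables the check)
 * @param allIdleTime    timeout when neither a read nor a write occurs (0 disables the check)
 * @param unit           time unit of the three timeouts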
 */
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) 
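To make the meaning of the three timeouts concrete, here is a minimal stdlib-only sketch of how such values could be normalized to nanoseconds and how a value of 0 switches a check off. The class IdleTimeouts, its methods, and the 1 ms lower bound are illustrative assumptions, not Netty's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/** Illustrative model only; IdleTimeouts is a hypothetical class, not part of Netty. */
final class IdleTimeouts {
    // Assumed lower bound for any enabled timeout (1 millisecond).
    static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);

    final long readerIdleNanos;
    final long writerIdleNanos;
    final long allIdleNanos;

    IdleTimeouts(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
        this.readerIdleNanos = normalize(readerIdleTime, unit);
        this.writerIdleNanos = normalize(writerIdleTime, unit);
        this.allIdleNanos = normalize(allIdleTime, unit);
    }

    /** A non-positive value disables the check; otherwise convert to nanoseconds. */
    static long normalize(long time, TimeUnit unit) {
        if (time <= 0) {
            return 0L;
        }
        return Math.max(unit.toNanos(time), MIN_TIMEOUT_NANOS);
    }

    /** Names of the idle checks that would get a timer task scheduled. */
    List<String> enabledChecks() {
        List<String> checks = new ArrayList<>();
        if (readerIdleNanos > 0) {
            checks.add("READER_IDLE");
        }
        if (writerIdleNanos > 0) {
            checks.add("WRITER_IDLE");
        }
        if (allIdleNanos > 0) {
            checks.add("ALL_IDLE");
        }
        return checks;
    }
}
```

With arguments (5, 0, 0, TimeUnit.SECONDS), only the reader check is enabled in this model, with a timeout of 5,000,000,000 ns.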

This class is a ChannelHandler and has to be added to the ChannelPipeline. Reference code:

this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

    @Override
    protected void initChannel(SocketChannel channel) {
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.addLast("decoder", codec.newDecoder());
        pipeline.addLast("encoder", codec.newEncoder());
        if (idleSwitch) {
            // Fire a READER_IDLE event when nothing has been read for 5 seconds
            pipeline.addLast("idleStateHandler", new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
            pipeline.addLast("serverIdleHandler", serverIdleHandler);
        }
        pipeline.addLast("connectionEventHandler", connectionEventHandler);
        pipeline.addLast("handler", rpcHandler);
        createConnection(channel);
    }
});

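An IdleStateHandler is added to the channel pipeline. Its first argument is 5 and the unit is seconds, so the server tracks read activity on the channel with a 5-second timeout: if channelRead has not been triggered on this pipeline within 5 seconds, IdleStateHandler fires an IdleStateEvent, which is delivered to the userEventTriggered method of the handlers that follow it. Let's look at the channelRead method of IdleStateHandler.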

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
        reading = true;
        firstReaderIdleEvent = firstAllIdleEvent = true;
    }
    ctx.fireChannelRead(msg);
}

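This method only marks that a read is in progress (the reading flag) and resets the first-event flags, then passes the message down the pipeline; the last read time itself (lastReadTime) is updated in channelReadComplete. Next, let's look at the channelActive method.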

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // This method will be invoked only if this handler was added
    // before channelActive() event is fired.  If a user adds this handler
    // after the channelActive() event, initialize() will be called by beforeAdd().
    initialize(ctx);
    super.channelActive(ctx);
}

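After the client and the server establish a connection, channelActive is called. The channelActive method of IdleStateHandler calls initialize() to set up idle detection for the connection. The implementation is as follows: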

private void initialize(ChannelHandlerContext ctx) {
    // Avoid the case where destroy() is called before scheduling timeouts.
    // See: https://github.com/netty/netty/issues/143
    switch (state) {
    case 1:
    case 2:
        return;
    }

    state = 1;
    initOutputChanged(ctx);

    lastReadTime = lastWriteTime = ticksInNanos();
    if (readerIdleTimeNanos > 0) {
        // Schedule a timer task that handles reader idle
        readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (writerIdleTimeNanos > 0) {
        // Schedule a timer task that handles writer idle
        writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (allIdleTimeNanos > 0) {
        // Schedule a timer task that handles read/write (all) idle
        allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                allIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
}

initialize() schedules the timer tasks that drive idle detection. Next, let's look at what the ReaderIdleTimeoutTask timer task actually does.

protected void run(ChannelHandlerContext ctx) {
    long nextDelay = readerIdleTimeNanos;
    if (!reading) {
        nextDelay -= ticksInNanos() - lastReadTime;
    }

    if (nextDelay <= 0) {
        // Reader is idle - set a new timeout and notify the callback.
        readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

        boolean first = firstReaderIdleEvent;
        firstReaderIdleEvent = false;

        try {
            IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
            channelIdle(ctx, event);
        } catch (Throwable t) {
            ctx.fireExceptionCaught(t);
        }
    } else {
        // Read occurred before the timeout - set a new timeout with shorter delay.
        readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
    }
}

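If no read is in progress, the task subtracts the time elapsed since the last read from the configured read timeout. If the remaining delay is zero or negative, the elapsed time has reached the read timeout: the task fires a reader-idle event and reschedules itself with the full timeout. Otherwise a read happened in the meantime, and the task reschedules itself with the shorter remaining delay.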
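The delay arithmetic can be checked in isolation with a small stdlib-only model. ReaderIdleModel and its manual clock are hypothetical stand-ins for the handler state and ticksInNanos(); the sketch assumes the last read time is recorded when a read completes, and it is not Netty's actual class.

```java
/** Illustrative model of the reader-idle check; a manual clock replaces ticksInNanos(). */
final class ReaderIdleModel {
    private final long readerIdleNanos;
    private long now;           // manual clock, in nanoseconds
    private long lastReadTime;  // time of the last completed read
    private boolean reading;    // true while a read is in progress
    int idleEventsFired;

    ReaderIdleModel(long readerIdleNanos) {
        this.readerIdleNanos = readerIdleNanos;
        this.lastReadTime = now;
    }

    /** Moves the manual clock forward. */
    void advance(long nanos) {
        now += nanos;
    }

    /** Models channelRead: only marks that a read is in progress. */
    void onRead() {
        reading = true;
    }

    /** Models channelReadComplete: records the read time and clears the flag. */
    void onReadComplete() {
        if (reading) {
            lastReadTime = now;
            reading = false;
        }
    }

    /** Runs one check and returns the delay until the next check. */
    long runCheck() {
        long nextDelay = readerIdleNanos;
        if (!reading) {
            nextDelay -= now - lastReadTime;
        }
        if (nextDelay <= 0) {
            // Idle for at least the full timeout: fire an event, restart with the full delay.
            idleEventsFired++;
            return readerIdleNanos;
        }
        // A read happened recently: wait only for the remaining time.
        return nextDelay;
    }
}
```

For example, with a timeout of 5000 ns and a read completed at t=2000, a check at t=5000 fires nothing and returns a remaining delay of 2000; a further check at t=7000 fires the idle event.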

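The code above covers the read timeout. The write timeout and the read/write timeout are implemented in a similar way and are left for the reader to analyze.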

3. sofa-bolt heartbeat implementation

sofa-bolt handles connection heartbeats through HeartbeatHandler. The implementation is as follows:

public class HeartbeatHandler extends ChannelDuplexHandler {
    public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            ProtocolCode protocolCode = ctx.channel().attr(Connection.PROTOCOL).get();
            Protocol protocol = ProtocolManager.getProtocol(protocolCode);
            protocol.getHeartbeatTrigger().heartbeatTriggered(ctx);
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
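The dispatch pattern in this handler (idle events go to the heartbeat trigger registered for the connection's protocol, everything else is passed on) can be sketched without Netty. All names below (HeartbeatDispatchSketch, IdleEvent, the String channel id) are hypothetical simplifications, not sofa-bolt APIs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative model of per-protocol heartbeat dispatch; not sofa-bolt code. */
final class HeartbeatDispatchSketch {
    /** Stand-in for a protocol's heartbeat trigger. */
    interface HeartbeatTrigger {
        void heartbeatTriggered(String channelId);
    }

    /** Stand-in for an idle event. */
    static final class IdleEvent {
    }

    private final Map<Byte, HeartbeatTrigger> triggers = new ConcurrentHashMap<>();
    int passedThrough; // events that were not idle events

    void register(byte protocolCode, HeartbeatTrigger trigger) {
        triggers.put(protocolCode, trigger);
    }

    /** Idle events go to the protocol's trigger; other events are passed on. */
    void userEventTriggered(String channelId, byte protocolCode, Object evt) {
        if (evt instanceof IdleEvent) {
            HeartbeatTrigger trigger = triggers.get(protocolCode);
            if (trigger != null) {
                trigger.heartbeatTriggered(channelId);
            }
        } else {
            passedThrough++;
        }
    }
}
```

Looking up the trigger by protocol code keeps the handler itself protocol-agnostic, so one shared handler instance can serve connections that speak different protocols.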

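HeartbeatHandler checks whether the event is an IdleStateEvent; if it is, heartbeat processing starts immediately. The processing itself is implemented in the heartbeatTriggered method: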

public void heartbeatTriggered(final ChannelHandlerContext ctx) throws Exception {
    Integer heartbeatTimes = ctx.channel().attr(Connection.HEARTBEAT_COUNT).get();
    final Connection conn = ctx.channel().attr(Connection.CONNECTION).get();
  
    // Close the connection if the number of unanswered heartbeats has reached the limit
    if (heartbeatTimes >= maxCount) {
        try {
            conn.close();
            logger.error(
                "Heartbeat failed for {} times, close the connection from client side: {} ",
                heartbeatTimes, RemotingUtil.parseRemoteAddress(ctx.channel()));
        } catch (Exception e) {
            logger.warn("Exception caught when closing connection in SharableHandler.", e);
        }
    } else {
        boolean heartbeatSwitch = ctx.channel().attr(Connection.HEARTBEAT_SWITCH).get();
        if (!heartbeatSwitch) {
            return;
        }
        final HeartbeatCommand heartbeat = new HeartbeatCommand();
        // Register a callback listener for the heartbeat response
        final InvokeFuture future = new DefaultInvokeFuture(heartbeat.getId(),
            new InvokeCallbackListener() {
                @Override
                public void onResponse(InvokeFuture future) {
                    ResponseCommand response;
                    try {
                        response = (ResponseCommand) future.waitResponse(0);
                    } catch (InterruptedException e) {
                        logger.error("Heartbeat ack process error! Id={}, from remoteAddr={}",
                            heartbeat.getId(), RemotingUtil.parseRemoteAddress(ctx.channel()),
                            e);
                        return;
                    }
                    if (response != null
                        && response.getResponseStatus() == ResponseStatus.SUCCESS) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Heartbeat ack received! Id={}, from remoteAddr={}",
                                response.getId(),
                                RemotingUtil.parseRemoteAddress(ctx.channel()));
                        }
                        // The heartbeat was acknowledged successfully: reset the count to 0
                        ctx.channel().attr(Connection.HEARTBEAT_COUNT).set(0);
                    } else {
                        if (response == null) {
                            logger.error("Heartbeat timeout! The address is {}",
                                RemotingUtil.parseRemoteAddress(ctx.channel()));
                        } else {
                            logger.error(
                                "Heartbeat exception caught! Error code={}, The address is {}",
                                response.getResponseStatus(),
                                RemotingUtil.parseRemoteAddress(ctx.channel()));
                        }
                        // The heartbeat response failed or timed out: increment the count
                        Integer times = ctx.channel().attr(Connection.HEARTBEAT_COUNT).get();
                        ctx.channel().attr(Connection.HEARTBEAT_COUNT).set(times + 1);
                    }
                }
                @Override
                public String getRemoteAddress() {
                    return ctx.channel().remoteAddress().toString();
                }
            }, null, heartbeat.getProtocolCode().getFirstByte(), this.commandFactory);
        final int heartbeatId = heartbeat.getId();
        conn.addInvokeFuture(future);
        if (logger.isDebugEnabled()) {
            logger.debug("Send heartbeat, successive count={}, Id={}, to remoteAddr={}",
                heartbeatTimes, heartbeatId, RemotingUtil.parseRemoteAddress(ctx.channel()));
        }
        // Send the heartbeat request
        ctx.writeAndFlush(heartbeat).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Send heartbeat done! Id={}, to remoteAddr={}",
                            heartbeatId, RemotingUtil.parseRemoteAddress(ctx.channel()));
                    }
                } else {
                    logger.error("Send heartbeat failed! Id={}, to remoteAddr={}", heartbeatId,
                        RemotingUtil.parseRemoteAddress(ctx.channel()));
                }
            }
        });
        // Handle heartbeat request timeout
        TimerHolder.getTimer().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                InvokeFuture future = conn.removeInvokeFuture(heartbeatId);
                if (future != null) {
                    future.putResponse(commandFactory.createTimeoutResponse(conn
                        .getRemoteAddress()));
                    future.tryAsyncExecuteInvokeCallbackAbnormally();
                }
            }
        }, heartbeatTimeoutMillis, TimeUnit.MILLISECONDS);
    }
}
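The bookkeeping in heartbeatTriggered boils down to a failure counter plus a table of pending requests. The following stdlib-only sketch models that logic under two assumptions: the names (HeartbeatModel, onIdle, onAck, onTimeout) are hypothetical, and the ack path is assumed to remove the pending entry the same way the timeout task does, so that whichever happens first wins. It is single-threaded for clarity and is not sofa-bolt code.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative model of the heartbeat failure counter; not sofa-bolt code. */
final class HeartbeatModel {
    private final int maxCount;
    private final Set<Integer> pending = ConcurrentHashMap.newKeySet();
    private int failedCount;
    private int nextId;
    boolean closed;

    HeartbeatModel(int maxCount) {
        this.maxCount = maxCount;
    }

    /** Called on each idle event; returns the heartbeat id sent, or -1 if the connection was closed. */
    int onIdle() {
        if (failedCount >= maxCount) {
            closed = true;
            return -1;
        }
        int id = nextId++;
        pending.add(id);
        return id;
    }

    /** Response arrived; ignored if the timeout already claimed this id (assumption). */
    void onAck(int id, boolean success) {
        if (!pending.remove(id)) {
            return;
        }
        if (success) {
            failedCount = 0;
        } else {
            failedCount++;
        }
    }

    /** Timer fired; counts as a failure only if no response claimed this id first. */
    void onTimeout(int id) {
        if (pending.remove(id)) {
            failedCount++;
        }
    }

    int failedCount() {
        return failedCount;
    }
}
```

In this model a successful ack resets the counter, so only consecutive failures add up; once the counter reaches maxCount, the next idle event closes the connection instead of sending another heartbeat.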