sofabolt

SOFABolt 源码分析21 - 超时与快速失败机制的设计

2018-10-21  本文已影响117人  原水寒

SOFABolt 的超时分为两种:连接超时和调用超时。

  • 连接超时
  • 仅客户端可设置,因为只有客户端会建连
  • 连接超时时间的设置只作用于建连的时候,有两种方式可以指定 - addr 拼接属性参数 或者 url 设置超时时间属性
  • 连接超时机制底层实际上使用的 Netty 的超时参数设置方式
  • 调用超时:调用端无论是 RpcClient 还是 RpcServer,除了 oneway 模式下,剩余的三种调用模式 sync / future / callback 的方法都提供了一个必填参数 int timeoutMillis,该参数用来指定调用超时时间
    调用超时时间对于同步调用和异步调用不同
  • sync:使用 CountDownLatch # boolean await(long timeout, TimeUnit unit) 实现当前线程的实现阻塞等待
  • future 和 callback:使用 Netty 的 HashedWheelTimer 实现 - 在发出请求的时候创建 timeoutMillis 时间的超时任务 task,当在 timeoutMillis 时间内没有返回响应,则执行 task 中的内容(构造超时响应,返回给调用程序);否则,取消 task

fail-fast 机制

  • 仅用在被调用端,即请求的处理端
  • fail-fast 机制指的是在序列化全部内容之前,会做一个判断,如果处理当前请求的 UserProcessor 开启了 fail-fast 功能,并且此时已经超时,并且不是 oneway 模式(oneway 没有超时概念),则直接丢弃该请求,不再进行后续的序列化和业务逻辑处理操作
  • fail-fast 机制是由 UserProcessor#timeoutDiscard() 指定的,如果返回 true,则打开 fail-fast,默认情况下 fail-fast 机制是打开的;如果想关闭该功能,手动覆盖 UserProcessor#timeoutDiscard() 方法即可,直接返回 false,之后是否超时就从 DefaultBizContext#isRequestTimeout() 进行判断

一、连接超时机制

默认情况下,连接超时时间由RpcConfigs.CONNECT_TIMEOUT_KEY来指定,默认为 1000ms,可以通过在 addr 上拼接参数或者在 url 上设置值来指定连接超时时间。

使用姿势

// addr 拼接方式
String addr = "127.0.0.1:8888?_CONNECTTIMEOUT=3000";

// url 设置值的方式
Url url = new Url("127.0.0.1",8888);
url.setConnNum(1);
url.setConnectTimeout(3000);

源码分析

以 url 方式为例,addr 只是将 _CONNECTTIMEOUT=3000 properties 属性设置到了url.connectTimeout 中。后续还是使用 url 方式进行调用。在调用的过程中会获取或者创建连接,连接超时作用于创建连接。

public abstract class AbstractConnectionFactory implements ConnectionFactory {
    @Override
    public Connection createConnection(Url url) throws Exception {
        // 创建连接
        Channel channel = doCreateConnection(url.getIp(), url.getPort(), url.getConnectTimeout());
        ...
        return conn;
    }

    protected Channel doCreateConnection(String targetIP, int targetPort, int connectTimeout) {
        // 预处理 connectTimeout,最小为 1000
        connectTimeout = Math.max(connectTimeout, 1000);
        // 设置 Netty 的连接超时时间属性
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
        // 进行连接
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIP, targetPort));

        future.awaitUninterruptibly();
        ...
        return future.channel();
    }
}

二、调用超时机制(调用端)

调用超时时间对于同步调用和异步调用不同

  • sync:使用 CountDownLatch # boolean await(long timeout, TimeUnit unit) 实现当前线程的实现阻塞等待
  • future 和 callback:使用 Netty 的 HashedWheelTimer 实现

调用端无论是 RpcClient 还是 RpcServer,除了 oneway 模式下,剩余的三种调用模式 sync / future / callback 的方法都提供了一个必填参数 int timeoutMillis,该参数用来指定调用超时时间

Object invokeSync(String addr, Object request, int timeoutMillis)
RpcResponseFuture invokeWithFuture(String addr, Object request, int timeoutMillis)
void invokeWithCallback(String addr, Object request, InvokeCallback invokeCallback, int timeoutMillis)

同步方式

image.png
============================ class RpcRemoting extends BaseRemoting ============================
    public Object invokeSync(Connection conn, Object request, InvokeContext invokeContext, int timeoutMillis) {
        // 创建请求:设置 int timeout = timeoutMillis(默认为-1,表示永不超时)
        // 该 timeout 参数会用在被调用端判断是否超时,进而做 fail-fast 操作
        RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext, timeoutMillis);
        ...
        // 发起请求
        ResponseCommand responseCommand = (ResponseCommand) super.invokeSync(conn, requestCommand, timeoutMillis);
        ...
        // 解析响应
        Object responseObject = RpcResponseResolver.resolveResponseObject(responseCommand, RemotingUtil.parseRemoteAddress(conn.getChannel()));
        return responseObject;
    }

看下解析响应流程
    public static Object resolveResponseObject(ResponseCommand responseCommand, String addr) throws RemotingException {
        ... 预处理响应,服务服务端返回异常信息,直接抛出异常
        preProcess(responseCommand, addr);
        if (responseCommand.getResponseStatus() == ResponseStatus.SUCCESS) {
            return toResponseObject(responseCommand);
        } else {
            ...
        }
    }

    private static void preProcess(ResponseCommand responseCommand, String addr)  throws RemotingException {
        RemotingException e = null;
        // responseCommand == null 超时
        if (responseCommand == null) {
            e = new InvokeTimeoutException(msg);
        } else {
            switch (responseCommand.getResponseStatus()) {
                case TIMEOUT:
                    e = new InvokeTimeoutException(msg);
                    break;
                ...
            }
        }
        // 直接抛出超时异常
        if (null != e) {
            throw e;
        }
    }

============================ BaseRemoting ============================
    protected RemotingCommand invokeSync(Connection conn, RemotingCommand request, int timeoutMillis) {
        // 创建 InvokeFuture
        final InvokeFuture future = createInvokeFuture(request, request.getInvokeContext());
        // 添加 InvokeFuture 到 Connection
        conn.addInvokeFuture(future);
        // 使用 Netty 发送请求
        conn.getChannel().writeAndFlush(request)
        // 等待 timeoutMillis,如果超时,不等待结果直接返回 response,此时的 response == null,实际代码如下
        //    public ResponseCommand waitResponse(long timeoutMillis) throws InterruptedException {
        //        // 如果超时,知己返回false,this.responseCommand 直接返回 null
        //        this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        //        return this.responseCommand;
        //    }
        RemotingCommand response = future.waitResponse(timeoutMillis);
        // response == null 客户端自己创建超时响应
        if (response == null) {
            conn.removeInvokeFuture(request.getId());
            response = this.commandFactory.createTimeoutResponse(conn.getRemoteAddress());
        }
        return response;
    }

注意:RpcRequestCommand 中的 timeout 参数会用在被调用端判断是否超时,进而做 fail-fast 操作

HashWheelTimer 简介

future 和 callback 异步模式的超时机制都使用了 Netty 的 HashWheelTimer 机制。

image.png

Netty 的 HashWheelTimer 是一个环形结构,假设有 8 个格子(格子数可以通过 ticksPerWheel 构造器参数指定,默认为 512),一个格子代表一段时间(越短 Timer 精度越高,每个格子代表的时间可以通过 tickDuration 指定,默认为 100ms,SOFABolt 指定为 10ms)。

以上图为例,假设一个格子是 1s,则整个 wheel 能表示的时间段为 8s,假如当前指针指向 2,此时需要调度一个 3s 后执行的任务,显然应该加入到 (2+3=5) 的方格中的链表中,并且 round = 0,指针再走 3 次就可以执行了;如果任务要在 10s 后执行,应该等指针走完一个 round 零 2 格再执行,因此应放入4((2+10)%8=4)的方格中的链表中,其 round 设为 1。检查到期任务时只执行 round 为 0 的,格子上其他任务的 round 减 1。

Netty 中有一条线程控制 HashWheelTimer 中的指针每隔 tickDuration 移动到下一个格子,执行其中 round 为 0 并且不在 cancel 队列的任务,该格子上的其他任务的 round 减 1。

当取消一个超时任务时,直接将该任务添加到 cancel 队列,就不会被执行了。

最后看看 HashWheelTimer 在 SOFABolt 中的最佳实践。

public class TimerHolder {
    // 每格 10 ms
    private final static long defaultTickDuration = 10;
    // HashedWheelTimer 单例
    private static class DefaultInstance {
        static final Timer INSTANCE = new HashedWheelTimer(new NamedThreadFactory( "DefaultTimer" + defaultTickDuration, true), defaultTickDuration, TimeUnit.MILLISECONDS);
    }

    private TimerHolder() {
    }

    public static Timer getTimer() {
        return DefaultInstance.INSTANCE;
    }
}

// 使用见下边的分析
Timeout timeout = TimerHolder.getTimer().newTimeout(new TimerTask() {
    @Override
    public void run(Timeout timeout) throws Exception {
        InvokeFuture future = conn.removeInvokeFuture(request.getId());
        future.putResponse(commandFactory.createTimeoutResponse(conn.getRemoteAddress()));
        future.tryAsyncExecuteInvokeCallbackAbnormally();
    }
}, timeoutMillis, TimeUnit.MILLISECONDS);

异步方式(future 模式)

image.png

future 的阻塞点在 RpcResponseFuture.get()操作上,实际上同同步一样,底层也是阻塞在 countDownLatch.await() 上

============================ class RpcRemoting extends BaseRemoting ==================
    public RpcResponseFuture invokeWithFuture(Connection conn, Object request, InvokeContext invokeContext, int timeoutMillis) {
        // 创建请求:设置 int timeout = timeoutMillis(默认为-1,表示永不超时)
        // 该 timeout 参数会用在被调用端判断是否超时,进而做 fail-fast 操作
        RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext, timeoutMillis);
        ...
        // 发起请求
        InvokeFuture future = super.invokeWithFuture(conn, requestCommand, timeoutMillis);
        // 直接返回响应,在 get 的时候,依然使用 RpcResponseResolver.resolveResponseObject 解析响应,如果超时,直接抛出超时异常
        return new RpcResponseFuture(RemotingUtil.parseRemoteAddress(conn.getChannel()), future);
    }

============================ BaseRemoting ============================
    protected InvokeFuture invokeWithFuture(Connection conn, RemotingCommand request, int timeoutMillis) {
        // 创建 InvokeFuture
        final InvokeFuture future = createInvokeFuture(request, request.getInvokeContext());
        // 添加 InvokeFuture 到 Connection
        conn.addInvokeFuture(future);
        try {
            // 创建超时任务,添加到 Netty 的 hashWheel 中,在 timeoutMillis 之后如果该超时任务没有被取消,则执行其 run 方法
            // 进而创建超时响应
            Timeout timeout = TimerHolder.getTimer().newTimeout(new TimerTask() {
                public void run(Timeout timeout) throws Exception {
                    InvokeFuture future = conn.removeInvokeFuture(request.getId());
                    // 创建超时响应 + 设置超时响应 + 唤醒阻塞线程
                    future.putResponse(commandFactory.createTimeoutResponse(conn.getRemoteAddress()));
                }
            }, timeoutMillis, TimeUnit.MILLISECONDS);
            // 将 Timeout 设置到 InvokeFuture 中,后续可以从 InvokeFuture 中取出 Timeout,执行取消超时任务的操作
            // 超时任务的取消有两种情况:正常返回响应 + 如下发生异常
            future.addTimeout(timeout);
            conn.getChannel().writeAndFlush(request);
        } catch (Exception e) {
            InvokeFuture f = conn.removeInvokeFuture(request.getId());
            // 如果发生异常,取消超时任务
            f.cancelTimeout();
            f.putResponse(commandFactory.createSendFailedResponse(conn.getRemoteAddress(), e));
        }
        return future;
    }

============================ RpcResponseProcessor ============================
    public void doProcess(RemotingContext ctx, RemotingCommand cmd) {
        // 获取 Connection
        Connection conn = ctx.getChannelContext().channel().attr(Connection.CONNECTION).get();
        // 获取 InvokeFuture
        InvokeFuture future = conn.removeInvokeFuture(cmd.getId());
        // 设置响应,唤醒阻塞线程
        future.putResponse(cmd);
        // 取消 timeout task
        future.cancelTimeout();
        // 回调 callback
        future.executeInvokeCallback();
    }

异步方式(callback 模式)

image.png
============================ class RpcRemoting extends BaseRemoting ==================
    public void invokeWithCallback(Connection conn, Object request, InvokeContext invokeContext, InvokeCallback invokeCallback, int timeoutMillis) {
        // 创建请求:设置 int timeout = timeoutMillis(默认为-1,表示永不超时)
        // 该 timeout 参数会用在被调用端判断是否超时,进而做 fail-fast 操作
        RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext, timeoutMillis);
        ...
        // 发出请求
        super.invokeWithCallback(conn, requestCommand, invokeCallback, timeoutMillis);
    }

============================ BaseRemoting ============================
       protected void invokeWithCallback(Connection conn, RemotingCommand request, InvokeCallback invokeCallback, int timeoutMillis) {
        // 创建 InvokeFuture
        InvokeFuture future = createInvokeFuture(conn, request, request.getInvokeContext(), invokeCallback);
        // 添加 InvokeFuture 到 Connection
        conn.addInvokeFuture(future);

        try {
            // 创建超时任务,添加到 Netty 的 hashWheel 中,在 timeoutMillis 之后如果该超时任务没有被取消,则执行其 run 方法
            // 进而创建超时响应
            Timeout timeout = TimerHolder.getTimer().newTimeout(new TimerTask() {
                @Override
                public void run(Timeout timeout) throws Exception {
                    InvokeFuture future = conn.removeInvokeFuture(request.getId());
                    // 创建超时响应 + 设置超时响应
                    future.putResponse(commandFactory.createTimeoutResponse(conn.getRemoteAddress()));
                    // 想较于 future 而言,多出这一步,执行异常回调 callback.onException(e)
                    future.tryAsyncExecuteInvokeCallbackAbnormally();
                }

            }, timeoutMillis, TimeUnit.MILLISECONDS);
            // 将 Timeout 设置到 InvokeFuture 中,后续可以从 InvokeFuture 中取出 Timeout,执行取消超时任务的操作
            // 超时任务的取消有两种情况:正常返回响应 + 如下发生异常
            future.addTimeout(timeout);
            conn.getChannel().writeAndFlush(request);
        } catch (Exception e) {
            InvokeFuture f = conn.removeInvokeFuture(request.getId());
            // 如果发生异常,取消超时任务
            f.cancelTimeout();
            f.putResponse(commandFactory.createSendFailedResponse(conn.getRemoteAddress(), e));
            // 想较于 future 而言,多出这一步,执行异常回调 callback.onException(e)
            f.tryAsyncExecuteInvokeCallbackAbnormally();
        }
    }

============================ RpcResponseProcessor ============================
    // 这里与 future 的处理方式一样
    public void doProcess(RemotingContext ctx, RemotingCommand cmd) {
        // 获取 Connection
        Connection conn = ctx.getChannelContext().channel().attr(Connection.CONNECTION).get();
        // 获取 InvokeFuture
        InvokeFuture future = conn.removeInvokeFuture(cmd.getId());
        // 设置响应,唤醒阻塞线程
        future.putResponse(cmd);
        // 取消 timeout task
        future.cancelTimeout();
        // 回调 callback
        future.executeInvokeCallback();
    }

心跳请求:RpcHeartbeatTrigger#heartbeatTriggered(final ChannelHandlerContext ctx) 在发送了心跳请求之后,也是用了 TimeOut 做超时任务,与 Callback 使用一致,不再分析。

三、快速失败机制(被调用端)

以上都是在分析调用端的超时逻辑,对于三种调用方式,被调用端的处理都是一套逻辑。

public class RpcRequestCommand extends RequestCommand {
    // 调用超时时间,调用端进行设置,默认值为 -1,代表永不超时
    private int timeout = -1;
    // 不会序列化到对端,该值会在被调用端的解码器收到该消息时,设置为收到的当前时间
    private transient long arriveTime = -1;
}

public class RpcCommandDecoder implements CommandDecoder {
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        ...
        RequestCommand command;
        if (cmdCode == CommandCode.HEARTBEAT_VALUE) {
            // 如果是心跳请求消息,直接创建一个 HeartbeatCommand
            command = new HeartbeatCommand();
        } else {
            // 如果是正常请求,创建一个 RpcRequestCommand
            command = createRequestCommand(cmdCode);
        }
        ...
        out.add(command);
    }

    private RpcRequestCommand createRequestCommand(short cmdCode) {
        RpcRequestCommand command = new RpcRequestCommand();
        ...
        // 设置请求到达时间
        command.setArriveTime(System.currentTimeMillis());
        return command;
    }
}

public class RpcRequestProcessor {
    public void process(RemotingContext ctx, RpcRequestCommand cmd, ExecutorService defaultExecutor) {
        // 反序列化 className
        if (!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_CLAZZ)) {
            return;
        }
        // 获取 userProcessor
        UserProcessor userProcessor = ctx.getUserProcessor(cmd.getRequestClass());

        // 将 userProcessor.timeoutDiscard() 设置到 RemotingContext 中
        // userProcessor.timeoutDiscard() 用于 fail-fast,该值默认为 true,如果要关闭 fail-fast 功能,需要手动覆盖 UserProcessor#timeoutDiscard()
        ctx.setTimeoutDiscard(userProcessor.timeoutDiscard());

        ... 不管是 IO 线程执行还是业务线程池执行
        // ProcessTask.run() 调用 doProcess()
        executor.execute(new ProcessTask(ctx, cmd));
    }

    public void doProcess(final RemotingContext ctx, RpcRequestCommand cmd) throws Exception {
        long currentTimestamp = System.currentTimeMillis();
        // 设置超时时间 timeout 与到达时间 arriveTime 到 RemotingContext 中,为后续计算是否超时(ctx.isRequestTimeout())做准备
        preProcessRemotingContext(ctx, cmd, currentTimestamp);
        // 如果开启了 fail-fast 并且 (System.currentTimeMillis() - this.arriveTimestamp) > this.timeout 确实超时,直接返回
        // 不再进行后续的反序列化和业务逻辑处理
        if (ctx.isTimeoutDiscard() && ctx.isRequestTimeout()) {
            return;// then, discard this request
        }
        // decode request all
        if (!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_ALL)) {
            return;
        }
        // 派发请求,做真正的业务逻辑处理
        dispatchToUserProcessor(ctx, cmd);
    }

    private void preProcessRemotingContext(RemotingContext ctx, RpcRequestCommand cmd, long currentTimestamp) {
        // 将 RpcRequestCommand.getArriveTime()(该值在解码器中进行设置)设置到 RemotingContext 中
        ctx.setArriveTimestamp(cmd.getArriveTime());
        // 将 RpcRequestCommand.getTimeout() 设置到 RemotingContext 中
        ctx.setTimeout(cmd.getTimeout());
        ...
    }

    // RemotingContext#isRequestTimeout()
    public boolean isRequestTimeout() {
        // timeout = -1 表示永不超时
        // oneway 没有超时概念
        // 是否超时是按照当前时间减去响应达到时间,而非被调用端的当前时间 - 调用端的请求发送时间(这样计算就是使用了两个机器的时钟,跨机器的时钟会有时钟差)
        if (this.timeout > 0 
                && (this.rpcCommandType != RpcCommandType.REQUEST_ONEWAY)
                && (System.currentTimeMillis() - this.arriveTimestamp) > this.timeout) {
            return true;
        }
        return false;
    }
}

注意

  • 是否超时是按照当前时间减去响应达到时间,而非被调用端的当前时间 - 调用端的请求发送时间(这样计算就是使用了两个机器的时钟,跨机器的时钟会有时钟差)
  • fail-fast 机制是由 UserProcessor#timeoutDiscard() 指定的,如果返回 true,则打开 fail-fast,默认情况下 fail-fast 机制是打开的;如果想关闭该功能,手动覆盖 UserProcessor#timeoutDiscard() 方法即可,直接返回 false,之后是否超时就从 DefaultBizContext#isRequestTimeout() 进行判断
DefaultBizContext#isRequestTimeout() 实际上还是调用了 RemotingContext#isRequestTimeout()

    private RemotingContext remotingCtx;
    @Override
    public boolean isRequestTimeout() {
        return this.remotingCtx.isRequestTimeout();
    }
上一篇下一篇

猜你喜欢

热点阅读