Eureka Source Code Analysis (6): TimedSupervisorTask

2018-11-10  skyguard

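In the previous article we analyzed how Eureka registers instance information. Now let's look at lease renewal. A lease ends in one of two ways. If the instance stops sending heartbeats, the lease expires and the Eureka Server evicts the instance. If the client sends a renewal (heartbeat) request before the lease expires, the server renews the lease. On the client, the heartbeat is scheduled with ScheduledExecutorService's schedule method, which runs a task only once. So how does the heartbeat keep running periodically? The answer is TimedSupervisorTask. Let's first look at its fields: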

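/**
 * Scheduler that runs the supervisor itself (the periodic timer)
 */
private final ScheduledExecutorService scheduler;
/**
 * Thread pool that executes the sub-task
 */
private final ThreadPoolExecutor executor;
/**
 * Timeout for a single sub-task execution, in milliseconds
 */
private final long timeoutMillis;
/**
 * The sub-task
 */
private final Runnable task;
/**
 * Current delay before the next execution, in milliseconds
 */
private final AtomicLong delay;
/**
 * Upper bound for the delay
 *
 * Used when the sub-task times out
 */
private final long maxDelay;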

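When TimedSupervisorTask runs, it submits task to executor and waits at most timeoutMillis for it to finish.
If task completes in time, TimedSupervisorTask resets delay to timeoutMillis and resubmits itself to scheduler with that delay.
If task times out, the delay is doubled (but never beyond maxDelay) and TimedSupervisorTask resubmits itself to scheduler with the new delay.
In every case, the finally block cancels the task if it is still running and reschedules the supervisor unless scheduler has been shut down. A rejected submission or any other exception leaves delay unchanged.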
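The delay arithmetic can be isolated into a small pure function. The sketch below is not Eureka code. The class and method names are hypothetical. The starting values (delay initialized to timeoutMillis, and maxDelay equal to timeoutMillis times a backoff bound) reflect my reading of TimedSupervisorTask's constructor and should be treated as assumptions, as should the 30 s interval and bound of 10 used in the demo.

```java
/** Hypothetical helper isolating TimedSupervisorTask's delay rule (not part of Eureka). */
final class BackoffSketch {
    private BackoffSketch() {}

    /**
     * Delay before the next run: reset to timeoutMillis after a normal completion,
     * doubled (capped at maxDelay) after a timeout.
     */
    static long nextDelay(long currentDelay, boolean timedOut, long timeoutMillis, long maxDelay) {
        if (!timedOut) {
            return timeoutMillis;                    // success: back to the base interval
        }
        return Math.min(maxDelay, currentDelay * 2); // timeout: exponential backoff with a ceiling
    }

    public static void main(String[] args) {
        long timeoutMillis = 30_000;        // assumed 30 s heartbeat interval
        long maxDelay = timeoutMillis * 10; // assumed backoff bound of 10
        long delay = timeoutMillis;         // assumed initial delay
        // Five consecutive timeouts followed by one success.
        boolean[] timedOut = {true, true, true, true, true, false};
        for (boolean t : timedOut) {
            delay = nextDelay(delay, t, timeoutMillis, maxDelay);
            System.out.println((t ? "timeout -> " : "success -> ") + delay + " ms");
        }
    }
}
```

Running main prints 60000, 120000, 240000, 300000, 300000 and then 30000 ms. The delay grows until it hits maxDelay and snaps back to the base interval after the first successful run. In the real run method, the doubled value is applied with compareAndSet, so a value changed by someone else in the meantime is not overwritten.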
Now let's look at how run is implemented:

public void run() {
    Future<?> future = null;
    try {
        // submit the sub-task to the worker pool
        future = executor.submit(task);
        // record the number of active worker threads
        threadPoolLevelGauge.set((long) executor.getActiveCount());
        // wait until the sub-task finishes or the timeout elapses
        future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
        // finished in time: reset the next delay to timeoutMillis
        delay.set(timeoutMillis);
        // record the number of active worker threads again
        threadPoolLevelGauge.set((long) executor.getActiveCount());
    } catch (TimeoutException e) {
        logger.error("task supervisor timed out", e);
        timeoutCounter.increment(); // count timeouts

        // timed out: double the next delay, capped at maxDelay
        long currentDelay = delay.get();
        long newDelay = Math.min(maxDelay, currentDelay * 2);
        delay.compareAndSet(currentDelay, newDelay);

    } catch (RejectedExecutionException e) {
        if (executor.isShutdown() || scheduler.isShutdown()) {
            logger.warn("task supervisor shutting down, reject the task", e);
        } else {
            logger.error("task supervisor rejected the task", e);
        }

        rejectedCounter.increment(); // count rejected submissions
    } catch (Throwable e) {
        if (executor.isShutdown() || scheduler.isShutdown()) {
            logger.warn("task supervisor shutting down, can't accept the task");
        } else {
            logger.error("task supervisor threw an exception", e);
        }

        throwableCounter.increment(); // count any other failures
    } finally {
        // cancel the sub-task if it is still running (no effect if it already finished)
        if (future != null) {
            future.cancel(true);
        }

        // schedule the next run of this supervisor with the current delay
        if (!scheduler.isShutdown()) {
            scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
        }
    }
}
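To see the self-rescheduling pattern in isolation, here is a minimal, JDK-only imitation of the run method above. It is a sketch, not Eureka's class. The name MiniSupervisor and its constructor are hypothetical, metrics and logging are dropped, and a plain ExecutorService replaces Eureka's ThreadPoolExecutor, which Eureka needs only for getActiveCount.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical, metrics-free imitation of the TimedSupervisorTask pattern. */
final class MiniSupervisor implements Runnable {
    private final ScheduledExecutorService scheduler; // re-arms this supervisor
    private final ExecutorService executor;           // runs the supervised task
    private final long timeoutMillis;
    private final long maxDelay;
    private final AtomicLong delay;
    private final Runnable task;

    MiniSupervisor(ScheduledExecutorService scheduler, ExecutorService executor,
                   long timeoutMillis, int backOffBound, Runnable task) {
        this.scheduler = scheduler;
        this.executor = executor;
        this.timeoutMillis = timeoutMillis;
        this.maxDelay = timeoutMillis * backOffBound;
        this.delay = new AtomicLong(timeoutMillis);
        this.task = task;
    }

    long currentDelay() {
        return delay.get();
    }

    @Override
    public void run() {
        Future<?> future = null;
        try {
            future = executor.submit(task);
            future.get(timeoutMillis, TimeUnit.MILLISECONDS); // wait at most timeoutMillis
            delay.set(timeoutMillis);                          // success: reset the delay
        } catch (TimeoutException e) {
            long current = delay.get();
            delay.compareAndSet(current, Math.min(maxDelay, current * 2)); // back off
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();                // e.g. scheduler.shutdownNow()
        } catch (ExecutionException | RuntimeException e) {
            // task threw or submission was rejected: keep the current delay
        } finally {
            if (future != null) {
                future.cancel(true);                           // interrupt a task that overran
            }
            if (!scheduler.isShutdown()) {
                // one-shot schedule, re-armed on every run so the delay can change
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }
}
```

A usage would look like `scheduler.schedule(new MiniSupervisor(scheduler, workers, 30_000, 10, heartbeat), 30_000, TimeUnit.MILLISECONDS)`, where heartbeat is any Runnable. The design uses a one-shot schedule that the task re-arms itself rather than scheduleWithFixedDelay, because the latter fixes the delay once at submission time and cannot express a backoff that changes from run to run.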

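On the client, the heartbeat task wrapped by TimedSupervisorTask sends the renewal to the server as an HTTP PUT. The server maps that request to InstanceResource's renewLease method. Here is the implementation: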

@PUT
public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);
    // renew the lease
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

    // renewal failed
    // Not found in the registry, immediately ask for a register
    if (!isSuccess) {
        logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
        return Response.status(Status.NOT_FOUND).build();
    }

    // compare against the lastDirtyTimestamp of the registered InstanceInfo
    // Check if we need to sync based on dirty time stamp, the client
    // instance might have changed some value
    Response response = null;
    if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
        response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
        // Store the overridden status since the validation found out the node that replicates wins
        if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                && (overriddenStatus != null)
                && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                && isFromReplicaNode) {
            registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
        }
    } else { // no timestamp check needed: renewal succeeded
        response = Response.ok().build();
    }
    logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
    return response;
}
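The comment in the 404 branch says the client should re-register immediately. As far as I know, Eureka's DiscoveryClient.renew() does exactly that when the heartbeat comes back 404. The sketch below models only that decision. The RegistryClient interface, its methods, and FakeServer are hypothetical stand-ins for Eureka's HTTP transport, not its real API.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of the client-side reaction to a renewal response. */
final class HeartbeatClientSketch {

    /** Hypothetical transport: heartbeat returns an HTTP status code. */
    interface RegistryClient {
        int sendHeartbeat(String appName, String instanceId);
        boolean register(String appName, String instanceId);
    }

    /** Sends one heartbeat; a 404 means the server has no lease, so register again. */
    static boolean renew(RegistryClient client, String appName, String instanceId) {
        int status = client.sendHeartbeat(appName, instanceId);
        if (status == 404) {
            return client.register(appName, instanceId); // lease unknown to the server: re-register
        }
        return status == 200;
    }

    /** In-memory fake server, used only to exercise the logic. */
    static final class FakeServer implements RegistryClient {
        final Set<String> leases = new HashSet<>();
        int registerCalls = 0;

        @Override
        public int sendHeartbeat(String appName, String instanceId) {
            return leases.contains(appName + "/" + instanceId) ? 200 : 404;
        }

        @Override
        public boolean register(String appName, String instanceId) {
            registerCalls++;
            leases.add(appName + "/" + instanceId);
            return true;
        }
    }
}
```

Against the fake server, the first heartbeat for an unknown instance triggers one registration. Subsequent heartbeats get 200 and register nothing.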

renewLease calls AbstractInstanceRegistry's renew method to renew the instance's lease. Here is the implementation:

public boolean renew(String appName, String id, boolean isReplication) {
    // count the renewal attempt for monitoring
    RENEW.increment(isReplication);
    // look up the lease
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {
        leaseToRenew = gMap.get(id);
    }
    // no lease found
    if (leaseToRenew == null) {
        RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
        return false;
    } else {
        InstanceInfo instanceInfo = leaseToRenew.getHolder();
        if (instanceInfo != null) {
            // touchASGCache(instanceInfo.getASGName());
            // compute the effective (possibly overridden) instance status
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                    instanceInfo, leaseToRenew, isReplication);
            // status is UNKNOWN: cannot renew, the instance must re-register
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                        + "; re-register required", instanceInfo.getId());
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            // align the instance status with the overridden status
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                Object[] args = {
                        instanceInfo.getStatus().name(),
                        instanceInfo.getOverriddenStatus().name(),
                        instanceInfo.getId()
                };
                logger.info(
                        "The instance status {} is different from overridden instance status {} for instance {}. "
                                + "Hence setting the status to overridden status", args);
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
            }
        }
        // count renewals in the last minute
        renewsLastMin.increment();
        // update the lease's last-update timestamp (the actual renewal)
        leaseToRenew.renew();
        return true;
    }
}
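The lookup at the start of renew walks a two-level map: application name, then instance id, then lease. Below is a minimal sketch of that shape. The class names are hypothetical, the lease is reduced to a single timestamp (Eureka's real Lease also holds the InstanceInfo and more), a LongAdder stands in for Eureka's per-minute renewal rate, and the overridden-status handling is left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical, simplified registry mimicking the lookup in AbstractInstanceRegistry.renew(). */
final class MiniRegistry {

    /** Simplified lease: only the timestamp that renew() updates. */
    static final class SimpleLease {
        volatile long lastUpdateTimestamp;

        SimpleLease(long now) {
            this.lastUpdateTimestamp = now;
        }
    }

    // appName -> (instanceId -> lease), the same two-level shape renew() walks through
    private final Map<String, Map<String, SimpleLease>> registry = new ConcurrentHashMap<>();
    // stand-in for Eureka's renewsLastMin: counts successful renewals only
    private final LongAdder renewCount = new LongAdder();

    void register(String appName, String id, long now) {
        registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>()).put(id, new SimpleLease(now));
    }

    /** Returns false when no lease exists; renewLease turns that into 404 NOT_FOUND. */
    boolean renew(String appName, String id, long now) {
        Map<String, SimpleLease> gMap = registry.get(appName);
        SimpleLease lease = (gMap == null) ? null : gMap.get(id);
        if (lease == null) {
            return false;                 // unknown app or instance: the client must re-register
        }
        renewCount.increment();
        lease.lastUpdateTimestamp = now;  // simplified: Eureka's Lease.renew() stores now + duration
        return true;
    }

    long renewals() {
        return renewCount.sum();
    }
}
```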

Finally, Lease's renew method updates the lease's last-update timestamp (the renewal itself):

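public void renew() {
    // Note: adding duration means lastUpdateTimestamp actually holds the expiry time.
    // Since isExpired() adds duration again, an unrenewed lease expires only after
    // about twice the duration; Eureka's source documents this as a known minor bug.
    lastUpdateTimestamp = System.currentTimeMillis() + duration;
}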
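Eureka's Lease.isExpired(additionalLeaseMs), which this article does not show, checks (as far as I recall) whether the current time is past lastUpdateTimestamp + duration + additionalLeaseMs, or whether the lease has already been evicted. Because renew has already added duration, an instance that stops sending heartbeats is only treated as expired about twice the duration after its last renewal. The sketch below reproduces that arithmetic with an explicit clock so it can be checked. LeaseSketch is a hypothetical name, additionalLeaseMs is taken as 0, and the eviction flag is omitted.

```java
/** Hypothetical reduction of Eureka's Lease timestamp arithmetic, with the clock passed in. */
final class LeaseSketch {
    private final long duration;               // lease duration in ms
    private volatile long lastUpdateTimestamp; // set at registration, then by renew()

    LeaseSketch(long durationMillis, long registeredAt) {
        this.duration = durationMillis;
        this.lastUpdateTimestamp = registeredAt;
    }

    /** Same arithmetic as Lease.renew(): stores now + duration. */
    void renew(long now) {
        lastUpdateTimestamp = now + duration;
    }

    /** Expiry check as described above, with additionalLeaseMs = 0. */
    boolean isExpired(long now) {
        return now > lastUpdateTimestamp + duration;
    }

    long lastUpdateTimestamp() {
        return lastUpdateTimestamp;
    }
}
```

With a 90-second duration, a lease renewed at t = 10 s stores 100 s. It is still considered valid at t = 190 s, that is, 180 s after the last heartbeat, and only expires after that.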

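The whole process only updates the lease's timestamp. Even with concurrent requests, every write sets it to the current time plus duration. The worst case is that a marginally older value overwrites a newer one, which shortens the lease by a few milliseconds at most. Data consistency is not affected, so no lock is needed.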
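The no-lock argument can be illustrated with the JDK alone. Several threads renew the same timestamp at once, and whichever write survives is still a valid "now + duration" value. This is an illustrative sketch, not Eureka code. ConcurrentRenewDemo is a hypothetical name, and the field is assumed to be volatile so that a reader such as an eviction task sees the latest write.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: lock-free, last-writer-wins renewal of a single timestamp. */
final class ConcurrentRenewDemo {
    static final long DURATION = 90_000L;
    // volatile: each renewal is one atomic, visible write; no lock is taken
    static volatile long lastUpdateTimestamp;

    /** Same shape as Lease.renew(): a single write of now + duration. */
    static void renew() {
        lastUpdateTimestamp = System.currentTimeMillis() + DURATION;
    }

    /** Releases `threads` renewals at the same moment and returns the surviving timestamp. */
    static long renewConcurrently(int threads) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // wait so all threads renew at roughly the same time
                    renew();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        start.countDown();
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return lastUpdateTimestamp;
    }
}
```

Whatever the interleaving, the result lies between "start time + duration" and "end time + duration", which is exactly the property the paragraph above relies on.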
That completes Eureka's lease renewal, and with it our analysis of TimedSupervisorTask.
