Eureka Source Code Analysis (6): TimedSupervisorTask
2018-11-10
skyguard
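Previously we analyzed how Eureka registers service instance information. Now let's look at lease renewal in Eureka. When a lease reaches its expiry time there are two possible outcomes. One is expiration, where the Eureka server takes the expired node offline. The other is renewal, where the Eureka server renews the lease because it has detected that the node can still communicate normally. We know that the node status check is scheduled through the schedule method of ScheduledExecutorService, so how is the periodic check task actually executed? The answer is TimedSupervisorTask. Let's first look at the fields of TimedSupervisorTask.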
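/**
* Scheduler for the timed task
*/
private final ScheduledExecutorService scheduler;
/**
* Thread pool that executes the subtask
*/
private final ThreadPoolExecutor executor;
/**
* Execution timeout of the subtask, in milliseconds
*/
private final long timeoutMillis;
/**
* The subtask
*/
private final Runnable task;
/**
* Current delay before the next subtask run, in milliseconds
*/
private final AtomicLong delay;
/**
* Maximum delay before the next subtask run, in milliseconds
*
* Used when the subtask times out
*/
private final long maxDelay;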
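When TimedSupervisorTask runs, it submits task to executor for execution.
If task completes normally, TimedSupervisorTask resubmits itself to scheduler with a delay of timeoutMillis.
If task times out, it doubles the current delay (never exceeding maxDelay) and resubmits itself to scheduler with the new delay.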
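To make the delay rule concrete, here is a minimal sketch of the arithmetic only. The class name BackoffSketch, the helper names, and the sample values (a 30-second base interval with a cap of ten times that) are assumptions chosen for illustration; they are not taken from the Eureka source.

```java
// Hypothetical helper that isolates the delay arithmetic described above.
final class BackoffSketch {

    /** Delay after a run that timed out: double it, but never exceed maxDelay. */
    public static long afterTimeout(long currentDelay, long maxDelay) {
        return Math.min(maxDelay, currentDelay * 2);
    }

    /** Delay after a run that finished in time: fall back to the base value. */
    public static long afterSuccess(long timeoutMillis) {
        return timeoutMillis;
    }

    public static void main(String[] args) {
        long timeoutMillis = 30_000L;        // assumed base interval
        long maxDelay = timeoutMillis * 10;  // assumed cap
        long delay = timeoutMillis;

        // Five timeouts in a row: 60000, 120000, 240000, 300000, 300000
        for (int i = 1; i <= 5; i++) {
            delay = afterTimeout(delay, maxDelay);
            System.out.println("timeout #" + i + " -> next delay " + delay + " ms");
        }

        // A single successful run resets the delay to the base value.
        delay = afterSuccess(timeoutMillis);
        System.out.println("success -> next delay " + delay + " ms");
    }
}
```

Under these assumed values the delay reaches the cap after four consecutive timeouts, and one successful run brings it straight back to the base interval.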
Now let's look at the implementation of the run method.
public void run() {
Future<?> future = null;
try {
// Submit the task
future = executor.submit(task);
// Record the number of active threads in the pool
threadPoolLevelGauge.set((long) executor.getActiveCount());
// Wait for the task to finish or time out
future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
// Reset the delay for the next run
delay.set(timeoutMillis);
// Record the number of active threads again
threadPoolLevelGauge.set((long) executor.getActiveCount());
} catch (TimeoutException e) {
logger.error("task supervisor timed out", e);
timeoutCounter.increment(); // count the timeout for monitoring
// Double the delay for the next run, capped at maxDelay
long currentDelay = delay.get();
long newDelay = Math.min(maxDelay, currentDelay * 2);
delay.compareAndSet(currentDelay, newDelay);
} catch (RejectedExecutionException e) {
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, reject the task", e);
} else {
logger.error("task supervisor rejected the task", e);
}
rejectedCounter.increment(); // count the rejection for monitoring
} catch (Throwable e) {
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, can't accept the task");
} else {
logger.error("task supervisor threw an exception", e);
}
throwableCounter.increment(); // count the unexpected error for monitoring
} finally {
// Cancel the task if it has not finished
if (future != null) {
future.cancel(true);
}
// Schedule the next run
if (!scheduler.isShutdown()) {
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}
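The core of run is the combination of a time-limited Future.get and a cancel in the finally block. The following is a simplified, self-contained sketch of that pattern, not the Eureka class itself: the class name SupervisorSketch and the method runOnce are hypothetical, monitoring counters and rescheduling are left out, and the method simply returns the delay that would be used for the next run.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical, stripped-down version of the supervise-with-timeout pattern.
final class SupervisorSketch {

    /**
     * Runs the task once under a time limit and returns the delay to use
     * before the next run.
     */
    public static long runOnce(ExecutorService executor, Runnable task,
                               long timeoutMillis, long currentDelay, long maxDelay) {
        Future<?> future = executor.submit(task);
        try {
            // Block until the task is done or the time limit is reached.
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return timeoutMillis;                          // success: reset the delay
        } catch (TimeoutException e) {
            return Math.min(maxDelay, currentDelay * 2);   // timeout: back off
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();            // preserve the interrupt flag
            return currentDelay;
        } catch (ExecutionException e) {
            return currentDelay;                           // task failed: delay unchanged
        } finally {
            // No effect on a finished task; interrupts one that is still running.
            future.cancel(true);
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Runnable slowTask = () -> {
            try {
                Thread.sleep(2_000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // woken up by cancel(true)
            }
        };
        try {
            long fast = runOnce(executor, () -> { }, 1_000L, 1_000L, 10_000L);
            long slow = runOnce(executor, slowTask, 50L, 1_000L, 10_000L);
            System.out.println("after fast task: " + fast + " ms");
            System.out.println("after slow task: " + slow + " ms");
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Calling cancel(true) in finally matters in the timeout case: without it the slow task would keep occupying a pool thread while the next run is already being scheduled.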
A lease renewal request for an application instance is mapped to the renewLease method of InstanceResource. Here is the implementation:
@PUT
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus,
@QueryParam("status") String status,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
boolean isFromReplicaNode = "true".equals(isReplication);
// Renew the lease
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
// Renewal failed
// Not found in the registry, immediately ask for a register
if (!isSuccess) {
logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
// Compare the lastDirtyTimestamp of the InstanceInfo
// Check if we need to sync based on dirty time stamp, the client
// instance might have changed some value
Response response = null;
if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
// Store the overridden status since the validation found out the node that replicates wins
if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
&& (overriddenStatus != null)
&& !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
&& isFromReplicaNode) {
registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
}
} else { // success
response = Response.ok().build();
}
logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
return response;
}
This calls the renew method of AbstractInstanceRegistry to renew the lease of the application instance. Here is the implementation:
public boolean renew(String appName, String id, boolean isReplication) {
// Increment the renewal count for monitoring
RENEW.increment(isReplication);
// Look up the lease
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);
}
// The lease does not exist
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
// Get the status of the application instance
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
// The instance status is UNKNOWN, so the lease cannot be renewed
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
// Set the status of the application instance
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
Object[] args = {
instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId()
};
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", args);
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
// Increment the renewals-per-minute counter
renewsLastMin.increment();
// Set the lease's last update time (the renewal)
leaseToRenew.renew();
return true;
}
}
This calls the renew method of Lease, which sets the lease's last update time (the renewal). Here is the implementation:
public void renew() {
lastUpdateTimestamp = System.currentTimeMillis() + duration;
}
The whole process only modifies the lease's last update time. Even with concurrent requests, data consistency is not affected, so no lock is needed.
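One way to see why a lock is unnecessary: the renewal is a single write to one field, and every concurrent writer stores a value close to the current time, so whichever write lands last is acceptable. The sketch below illustrates this under stated assumptions. LeaseSketch is a hypothetical class, the field is assumed to be a volatile long, the 90-second duration is a sample value, and the current time is passed in as a parameter so the behavior can be checked deterministically.

```java
// Hypothetical stand-in for a lease that holds only the renewal timestamp.
final class LeaseSketch {

    // Assumption: a volatile long, so each write is atomic and visible to other threads.
    private volatile long lastUpdateTimestamp;
    private final long duration;

    public LeaseSketch(long durationMillis) {
        this.duration = durationMillis;
    }

    /** Same arithmetic as renew() above, with the clock passed in. */
    public void renew(long nowMillis) {
        lastUpdateTimestamp = nowMillis + duration;
    }

    public long getLastUpdateTimestamp() {
        return lastUpdateTimestamp;
    }

    public static void main(String[] args) throws InterruptedException {
        LeaseSketch lease = new LeaseSketch(90_000L); // assumed lease duration
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            final long now = 1_000L + i;              // four near-identical "current times"
            threads[i] = new Thread(() -> lease.renew(now));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        // The result is whichever write landed last; any of the four values is acceptable.
        System.out.println("lastUpdateTimestamp = " + lease.getLastUpdateTimestamp());
    }
}
```

No read-modify-write is involved, so there is no lost update to guard against; the only requirement is that the single write is atomic and visible.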
That completes the lease renewal operation in Eureka.
This concludes the analysis of TimedSupervisorTask.