eureka如何剔除实例
2017-06-30 本文已影响110人
go4it
序
除了服务实例主动注销之外,eureka是如何剔除过期实例的呢?
相关参数
- evictionIntervalTimerInMs(清理无效节点的时间间隔,单位为毫秒,默认60秒)
eureka.server.evictionIntervalTimerInMs=5000
如果没有设置的话默认是60秒:
eureka-core-1.4.12-sources.jar!/com/netflix/eureka/DefaultEurekaServerConfig.java
/**
 * Returns the interval between two runs of the eviction task, in milliseconds.
 *
 * <p>The value is looked up under the key {@code namespace + "evictionIntervalTimerInMs"};
 * {@code 60 * 1000} (60 seconds) is supplied as the fallback value for the lookup.
 *
 * @return the configured eviction interval in milliseconds
 */
public long getEvictionIntervalTimerInMs() {
    final String propertyKey = namespace + "evictionIntervalTimerInMs";
    final int fallbackIntervalMs = 60 * 1000;
    return configInstance.getLongProperty(propertyKey, fallbackIntervalMs).get();
}
定时任务
/** Daemon timer (second constructor argument is {@code true}) that drives the eviction task. */
private Timer evictionTimer = new Timer("Eureka-EvictionTimer", true);

/**
 * Starts the renewal counter and (re)schedules the eviction task.
 *
 * <p>A task already held by {@code evictionTaskRef} is cancelled first, then a new
 * {@code EvictionTask} is stored and scheduled at a fixed delay. Both the initial delay and
 * the period come from {@code serverConfig.getEvictionIntervalTimerInMs()}.
 */
protected void postInit() {
    renewsLastMin.start();

    // Cancel whatever task an earlier call may have left behind.
    final TimerTask staleTask = evictionTaskRef.get();
    if (staleTask != null) {
        staleTask.cancel();
    }

    evictionTaskRef.set(new EvictionTask());

    final TimerTask freshTask = evictionTaskRef.get();
    final long initialDelayMs = serverConfig.getEvictionIntervalTimerInMs();
    final long periodMs = serverConfig.getEvictionIntervalTimerInMs();
    evictionTimer.schedule(freshTask, initialDelayMs, periodMs);
}
- renewalPercentThreshold
触发自我保护的心跳数比例阈值
evict方法
/**
 * Removes expired leases from the registry, capped per run.
 *
 * <p>Returns immediately when {@code isLeaseExpirationEnabled()} is false. Otherwise every lease
 * whose {@code isExpired(additionalLeaseMs)} is true and whose holder is non-null is collected.
 * At most {@code registrySize - (int) (registrySize * renewalPercentThreshold)} of them, picked
 * at random, are then cancelled through {@code internalCancel(appName, id, false)}.
 *
 * @param additionalLeaseMs extra grace time in milliseconds, forwarded to {@code Lease.isExpired}
 */
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
// We collect first all expired items, to evict them in random order. For large eviction sets,
// if we do not do that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
// the impact should be evenly distributed across all applications.
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
// Leases without a holder are skipped: the eviction loop below dereferences getHolder().
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
// triggering self-preservation. Without that we would wipe out full registry.
int registrySize = (int) getLocalRegistrySize();
// The (int) cast drops the fractional part of the product.
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
// Upper bound on the number of leases this run may evict.
int evictionLimit = registrySize - registrySizeThreshold;
// Evict the smaller of: number of expired leases, eviction limit.
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Pick a random item (Knuth shuffle algorithm)
// A random lease from positions [i, size) is swapped into position i and evicted, so no lease is picked twice.
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
// Third argument is isReplication; false is passed for expirations.
internalCancel(appName, id, false);
}
}
}
EvictionTask
eureka-core-1.4.12-sources.jar!/com/netflix/eureka/registry/AbstractInstanceRegistry.java的内部类EvictionTask
/* visible for testing */ class EvictionTask extends TimerTask {

    /** Value of {@link #getCurrentTimeNano()} at the previous run; starts at 0. */
    private final AtomicLong lastExecutionNanosRef = new AtomicLong(0L);

    /**
     * Runs one eviction pass, handing {@code evict} the compensation time for this run.
     * Anything thrown is logged and not rethrown.
     */
    @Override
    public void run() {
        try {
            final long compensationTimeMs = getCompensationTimeMs();
            logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
            evict(compensationTimeMs);
        } catch (Throwable e) {
            logger.error("Could not run the evict task", e);
        }
    }

    /**
     * Computes by how many milliseconds this run is late: the time elapsed since the previous
     * run minus the configured eviction interval, never negative. This covers cases where clock
     * skew or GC makes the task fire later than the configured cycle.
     *
     * @return 0 when the stored previous timestamp is 0, otherwise the non-negative overrun in ms
     */
    long getCompensationTimeMs() {
        final long currNanos = getCurrentTimeNano();
        // Record this run's timestamp and fetch the previous one in a single atomic step.
        final long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
        if (lastNanos == 0L) {
            return 0L;
        }

        final long sinceLastRunMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
        final long overrunMs = sinceLastRunMs - serverConfig.getEvictionIntervalTimerInMs();
        return Math.max(0L, overrunMs);
    }

    /** Time source for this task; a separate method so tests can override it. */
    long getCurrentTimeNano() {
        return System.nanoTime();
    }
}
计算补偿时间
// Compensation = time elapsed since the previous run (nanoseconds converted to milliseconds)
// minus the configured eviction interval.
long compensationTime = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos) - serverConfig.getEvictionIntervalTimerInMs();
即:补偿时间 = 当前时间点 - task上一次执行的时间点 - 清除间隔(evictionIntervalTimerInMs);结果小于0则取0,task第一次执行时(还没有上一次执行时间)也取0。
算出过期的lease
eureka-core-1.4.12-sources.jar!/com/netflix/eureka/lease/Lease.java
/**
 * Tells whether this lease counts as expired.
 *
 * <p>A lease is expired when its eviction timestamp is set (greater than 0), or when the current
 * time is past {@code lastUpdateTimestamp + duration + additionalLeaseMs}.
 *
 * @param additionalLeaseMs extra grace time in milliseconds added to the expiry deadline
 * @return {@code true} if the lease is expired
 */
public boolean isExpired(long additionalLeaseMs) {
    if (evictionTimestamp > 0) {
        return true;
    }
    final long now = System.currentTimeMillis();
    return now > (lastUpdateTimestamp + duration + additionalLeaseMs);
}
当cancel的时候evictionTimestamp会被设置为cancel的时间,于是大于0,此后isExpired会直接返回true
/**
 * Cancels the lease by stamping the eviction time with the current time.
 * If an eviction time is already set (greater than 0), it is left unchanged.
 */
public void cancel() {
    if (evictionTimestamp > 0) {
        return;
    }
    evictionTimestamp = System.currentTimeMillis();
}
duration为lease的存活时间,默认为90秒
/** Default lease duration in seconds; used when an instance carries no lease info of its own. */
public static final int DEFAULT_LEASE_DURATION = 90;
/**
 * Rebuilds the lease info on the instance held by {@code lease} and returns that instance.
 *
 * <p>The renewal interval and duration already present on the instance are kept; when the
 * instance has no lease info, the {@code LeaseInfo} defaults are used. All timestamps are taken
 * from {@code lease}. The instance is also flagged through
 * {@code setIsCoordinatingDiscoveryServer()}.
 *
 * @param lease the lease whose holder is decorated
 * @return the holder of {@code lease}, with its lease info replaced
 */
private InstanceInfo decorateInstanceInfo(Lease<InstanceInfo> lease) {
    InstanceInfo holder = lease.getHolder();

    // client app settings
    // TODO: clean this up
    int renewalIntervalSecs;
    int durationSecs;
    if (holder.getLeaseInfo() == null) {
        renewalIntervalSecs = LeaseInfo.DEFAULT_LEASE_RENEWAL_INTERVAL;
        durationSecs = LeaseInfo.DEFAULT_LEASE_DURATION;
    } else {
        renewalIntervalSecs = holder.getLeaseInfo().getRenewalIntervalInSecs();
        durationSecs = holder.getLeaseInfo().getDurationInSecs();
    }

    holder.setLeaseInfo(
            LeaseInfo.Builder.newBuilder()
                    .setRegistrationTimestamp(lease.getRegistrationTimestamp())
                    .setRenewalTimestamp(lease.getLastRenewalTimestamp())
                    .setServiceUpTimestamp(lease.getServiceUpTimestamp())
                    .setRenewalIntervalInSecs(renewalIntervalSecs)
                    .setDurationInSecs(durationSecs)
                    .setEvictionTimestamp(lease.getEvictionTimestamp())
                    .build());
    holder.setIsCoordinatingDiscoveryServer();
    return holder;
}
过期的计算逻辑相当于:evictionTimestamp > 0(即lease已经被cancel),或者 最后更新时间 + lease的存活时间[默认是90s] + 补偿时间 < 当前时间,则过期。
计算剔除个数
// Size of the local registry, narrowed to int.
int registrySize = (int) getLocalRegistrySize();
// Registry size scaled by renewalPercentThreshold; the (int) cast drops the fractional part.
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
// Upper bound on the number of leases a single run may evict.
int evictionLimit = registrySize - registrySizeThreshold;
// Evict the smaller of: number of expired leases, eviction limit.
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
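evictionLimit = 注册的实例个数 - (int)(注册的实例个数 * renewalPercentThreshold),约等于 注册的实例个数*(1-触发自我保护的心跳数比例阈值renewalPercentThreshold);每次实际剔除的个数toEvict取过期lease个数与evictionLimit两者中的较小值。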
cancel过期实例
/**
 * {@link #cancel(String, String, boolean)} method is overridden by {@link PeerAwareInstanceRegistry}, so each
 * cancel request is replicated to the peers. This is however not desired for expires which would be counted
 * in the remote peers as valid cancellations, so self preservation mode would not kick-in.
 *
 * <p>While holding the {@code read} lock, removes the lease stored under {@code appName}/{@code id},
 * marks it cancelled, queues the instance as {@code DELETED} in {@code recentlyChangedQueue} and
 * invalidates the cache for the application and its VIP addresses. The cancellation is recorded in
 * {@code recentCanceledQueue}, and any entry for {@code id} in {@code overriddenInstanceStatusMap}
 * is removed, whether or not a lease was found.
 *
 * @param appName       application name the instance is registered under
 * @param id            instance id
 * @param isReplication passed to the {@code CANCEL} / {@code CANCEL_NOT_FOUND} counters and logged
 * @return {@code true} if a lease was found and cancelled, {@code false} if none was registered
 */
protected boolean internalCancel(String appName, String id, boolean isReplication) {
    // Take the lock before entering the try block: if lock() itself fails, the finally clause
    // must not call unlock() on a lock this thread never acquired.
    read.lock();
    try {
        CANCEL.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;
        if (gMap != null) {
            leaseToCancel = gMap.remove(id);
        }
        synchronized (recentCanceledQueue) {
            recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
        }
        InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
        if (instanceStatus != null) {
            logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
        }
        if (leaseToCancel == null) {
            CANCEL_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
            return false;
        } else {
            // Stamps the lease's eviction time.
            leaseToCancel.cancel();
            InstanceInfo instanceInfo = leaseToCancel.getHolder();
            String vip = null;
            String svip = null;
            if (instanceInfo != null) {
                instanceInfo.setActionType(ActionType.DELETED);
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                instanceInfo.setLastUpdatedTimestamp();
                vip = instanceInfo.getVIPAddress();
                svip = instanceInfo.getSecureVipAddress();
            }
            // vip and svip stay null when the lease has no holder.
            invalidateCache(appName, vip, svip);
            logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
            return true;
        }
    } finally {
        read.unlock();
    }
}
更新ResponseCache
// Excerpt from internalCancel; the null check on leaseToCancel present in the full method is left out here.
// Look up the per-application lease map and remove the lease stored under this instance id.
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToCancel = null;
if (gMap != null) {
leaseToCancel = gMap.remove(id);
}
// Stamp the lease's eviction time.
leaseToCancel.cancel();
InstanceInfo instanceInfo = leaseToCancel.getHolder();
String vip = null;
String svip = null;
if (instanceInfo != null) {
// Flag the instance as DELETED and queue the change in recentlyChangedQueue.
instanceInfo.setActionType(ActionType.DELETED);
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
instanceInfo.setLastUpdatedTimestamp();
vip = instanceInfo.getVIPAddress();
svip = instanceInfo.getSecureVipAddress();
}
// vip and svip stay null when the lease has no holder.
invalidateCache(appName, vip, svip);
invalidateCache更新ResponseCacheImpl,这个主要是给eureka client查询的缓存,之后就是client端的定时更新逻辑。