Spring Cloud——Eureka服务实例下线

2021-09-09  本文已影响0人  小波同学

应用服务实例下线

1、Eureka Client发起下线

应用实例关闭时,Eureka Client向Eureka Server发起下线应用实例。需要满足如下条件才可发起:

Eureka Client端有一个shutdown方法,服务实例下线的时候,会主动调用这个方法。

@Singleton
public class DiscoveryClient implements EurekaClient {

    /**
     * 关闭Eureka客户端。还将注销请求发送到Eureka服务器
     */
    @PreDestroy
    @Override
    public synchronized void shutdown() {
        if (isShutdown.compareAndSet(false, true)) {
            logger.info("Shutting down DiscoveryClient ...");

            if (statusChangeListener != null && applicationInfoManager != null) {
                applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
            }
            
            //将定时调度任务都停止
            cancelScheduledTasks();

            // If APPINFO was registered
            //如果注册到了Eureka Server,通知Server下线
            if (applicationInfoManager != null
                    && clientConfig.shouldRegisterWithEureka()
                    && clientConfig.shouldUnregisterOnShutdown()) {
                applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
                
                //服务下线
                unregister();
            }

            if (eurekaTransport != null) {
                eurekaTransport.shutdown();
            }

            heartbeatStalenessMonitor.shutdown();
            registryStalenessMonitor.shutdown();

            Monitors.unregisterObject(this);

            logger.info("Completed shut down of DiscoveryClient");
        }
    }
    
    void unregister() {
        // It can be null if shouldRegisterWithEureka == false
        if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
            try {
                logger.info("Unregistering ...");
                // 调用AbstractJerseyEurekaHttpClient的cancel方法
                EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
                logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());
            } catch (Exception e) {
                logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
            }
        }
    }   
}

将定时调度任务都停止——cancelScheduledTasks()

cancelScheduledTasks方法中会将client端启动的定时任务都停止,包括:心跳续约,定时拉取增量注册表等等。

@Singleton
public class DiscoveryClient implements EurekaClient {

    private void cancelScheduledTasks() {
        if (instanceInfoReplicator != null) {
            instanceInfoReplicator.stop();
        }
        if (heartbeatExecutor != null) {
            heartbeatExecutor.shutdownNow();
        }
        if (cacheRefreshExecutor != null) {
            cacheRefreshExecutor.shutdownNow();
        }
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
        if (cacheRefreshTask != null) {
            cacheRefreshTask.cancel();
        }
        if (heartbeatTask != null) {
            heartbeatTask.cancel();
        }
    }
}

AbstractJerseyEurekaHttpClient的cancel()方法使用DELETE请求调用Eureka Server的apps/${APP_NAME}/${INSTANCE_INFO_ID}接口,实现应用实例信息的下线

public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {

    @Override
    public EurekaHttpResponse<Void> cancel(String appName, String id) {
        String urlPath = "apps/" + appName + '/' + id;
        ClientResponse response = null;
        try {
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(resourceBuilder);
            response = resourceBuilder.delete(ClientResponse.class);
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP DELETE {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }
}

2、Eureka Server接收下线

Eureka Server接收下线请求核心流程如下图:


1)、接收下线请求

@Produces({"application/xml", "application/json"})
public class InstanceResource {

    private final PeerAwareInstanceRegistry registry;

    @DELETE
    public Response cancelLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        try {
        
            // 服务下线
            boolean isSuccess = registry.cancel(app.getName(), id,
                "true".equals(isReplication));

            if (isSuccess) {
                logger.debug("Found (Cancel): {} - {}", app.getName(), id);
                return Response.ok().build();
            } else {
                logger.info("Not Found (Cancel): {} - {}", app.getName(), id);
                return Response.status(Status.NOT_FOUND).build();
            }
        } catch (Throwable e) {
            logger.error("Error (cancel): {} - {}", app.getName(), id, e);
            return Response.serverError().build();
        }

    }
}
@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    @Override
    public boolean cancel(final String appName, final String id,
                          final boolean isReplication) {
        // 调用父类方法下线服务实例               
        if (super.cancel(appName, id, isReplication)) {
            // Eureka Server复制
            replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);

            return true;
        }
        return false;
    }
}

PeerAwareInstanceRegistryImpl中调用了父类AbstractInstanceRegistry的cancel(...)方法下线应用实例信息。

2)、下线应用实例信息

调用了AbstractInstanceRegistry的cancel(...)方法,下线应用实例信息,代码如下:

public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    @Override
    public boolean cancel(String appName, String id, boolean isReplication) {
        return internalCancel(appName, id, isReplication);
    }
    
    protected boolean internalCancel(String appName, String id, boolean isReplication) {
        try {
            // 获得读锁
            read.lock();
            // 增加取消注册次数到监控
            CANCEL.increment(isReplication);
            
            // 获取appName 对应的所有实例集合
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToCancel = null;
            // 移除租约映射
            if (gMap != null) {
                // 根据实例id 移除对应的实例租约信息
                leaseToCancel = gMap.remove(id);
            }
            // 添加到最近取消注册的队列,用于统计
            recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
            // 移除应用实例覆盖状态映射
            InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
            if (instanceStatus != null) {
                logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
            }
            // 租约不存在
            if (leaseToCancel == null) {
                // 添加取消注册不存在到监控
                CANCEL_NOT_FOUND.increment(isReplication);
                logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
                return false;
            } else {
                // 设置租约的取消注册时间戳
                leaseToCancel.cancel();
                // 添加到最近租约变更记录队列
                InstanceInfo instanceInfo = leaseToCancel.getHolder();
                String vip = null;
                String svip = null;
                if (instanceInfo != null) {
                    //放入最近改变队列中
                    instanceInfo.setActionType(ActionType.DELETED);
                    recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                    instanceInfo.setLastUpdatedTimestamp();
                    vip = instanceInfo.getVIPAddress();
                    svip = instanceInfo.getSecureVipAddress();
                }
                // 设置响应缓存过期
                invalidateCache(appName, vip, svip);
                logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
            }
        } finally {
            // 释放锁
            read.unlock();
        }


        // 更新 需要发送心跳的客户端数量
        synchronized (lock) {
            if (this.expectedNumberOfClientsSendingRenews > 0) {
                // Since the client wants to cancel it, reduce the number of clients to send renews.
                this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
                updateRenewsPerMinThreshold();
            }
        }

        return true;
    }   
}

更新需要发送续约信息客户端数量,毕竟是服务下线,就少了一个需要发送续约信息的客户端,更新自我保护机制触发阈值,这块涉及到Eureka Server 自我保护机制实现原理。

public class Lease<T> {

    enum Action {
        Register, Cancel, Renew
    };
    
    private long evictionTimestamp;
    
    public void cancel() {
        if (evictionTimestamp <= 0) {
            // 设置取消注册时间戳
            evictionTimestamp = System.currentTimeMillis();
        }
    }
}

方法逻辑不复杂,有四步:

服务下线不会主动通知其他客户端,而是等其他客户端来拉取增量注册表的时候,才会感知到。

参考:
https://blog.csdn.net/qq_40378034/article/details/119079180

上一篇 下一篇

猜你喜欢

热点阅读