Spring Cloud springCloud 学习面试精选

Spring Cloud——Eureka服务续约(心跳机制)

2021-09-09  本文已影响0人  小波同学

前言

Eureka Client的应用启动时,在com.netflix.discovery.DiscoveryClient类的initScheduledTasks方法中,会做以下几件事:

概览

以下图片来自Netflix官方,图中显示Eureka Client会发起Renew向注册中心做周期性续约,这样其他Eureka client通过Get Registry请求就能获取到新注册应用的相关信息:

来自官方文档的指导信息

最准确的说明信息来自Netflix的官方文档,地址:
https://github.com/Netflix/eureka/wiki/Understanding-eureka-client-server-communication#renew

关于续约的理解:

服务续约执行简要流程图

下面这张图大致描述了服务续约从Client端到Server端的大致流程,详情如下:

Eureka 续约源码分析

1、Eureka Client发起续约

Eureka Client向Eureka Server发起注册应用实例成功后获得租约,Eureka Client固定间隔向Eureka Server发起续约(renew),避免租约过期。

默认情况下,租约有效期为90秒,续约频率为30秒。两者比例为1:3,保证在网络异常等情况下,有三次重试的机会。

1)、初始化定时任务

Eureka Client在初始化过程中,创建心跳线程,固定间隔向Eureka Server发起续约。实现代码如下

@Singleton
public class DiscoveryClient implements EurekaClient {

    /**
     * 初始化所有计划的任务
     */
    private void initScheduledTasks() {
        //获取注册信息的定时任务
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            cacheRefreshTask = new TimedSupervisorTask(
                    "cacheRefresh",
                    scheduler,
                    cacheRefreshExecutor,
                    registryFetchIntervalSeconds,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new CacheRefreshThread()
            );
            scheduler.schedule(
                    cacheRefreshTask,
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            //心跳定时任务
            // Heartbeat timer
            heartbeatTask = new TimedSupervisorTask(
                    "heartbeat",
                    scheduler,
                    heartbeatExecutor,
                    renewalIntervalInSecs,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new HeartbeatThread()
            );
            scheduler.schedule(
                    heartbeatTask,
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            //服务实例同步定时任务
            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };
            
            // 注册应用实例状态变更监听器
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }

            //初始化定时服务注册任务
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
}

2)、发起续约

@Singleton
public class DiscoveryClient implements EurekaClient {

    //最后成功向Eureka Server心跳时间戳
    private volatile long lastSuccessfulHeartbeatTimestamp = -1;

    private class HeartbeatThread implements Runnable {
        public void run() {
            // 调用续约方法
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }
    
    //服务续约
    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            //发Restful请求,即心跳
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            //404错误会触发注册逻辑
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            //返回码200表示心跳成功
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }   
}

AbstractJerseyEurekaHttpClient的renew()方法使用PUT请求调用Eureka Server的apps/${APP_NAME}/${INSTANCE_INFO_ID}接口,参数为status、lastDirtyTimestamp、overriddenstatus,实现续约。

继续展开上面代码段中的 eurekaTransport.registrationClient.sendHeartBeat方法,源码在EurekaHttpClientDecorator类中:

@Override
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(final String appName,
                                                      final String id,
                                                      final InstanceInfo info,
                                                      final InstanceStatus overriddenStatus) {
    return execute(new RequestExecutor<InstanceInfo>() {
        @Override
        public EurekaHttpResponse<InstanceInfo> execute(EurekaHttpClient delegate) {
            //网络处理委托给代理类完成
            return delegate.sendHeartBeat(appName, id, info, overriddenStatus);
        }

        @Override
        public RequestType getRequestType() {
            //请求类型为心跳
            return RequestType.SendHeartBeat;
        }
    });
}

继续展开delegate.sendHeartBeat,多层调用一路展开,最终由JerseyApplicationClient类来完成操作,对应源码在父类AbstractJerseyEurekaHttpClient中,如下所示,主要工作是利用jersey库的Restful Api将自身的信息PUT到Eureka server,注意:这里不是POST,也不是GET,而是PUT:

@Override
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
    String urlPath = "apps/" + appName + '/' + id;
    ClientResponse response = null;
    try {
        //请求参数有两个:Eureka client自身状态、自身关键信息(状态、元数据等)最后一次变化的时间
        WebResource webResource = jerseyClient.resource(serviceUrl)
                .path(urlPath)
                .queryParam("status", info.getStatus().toString())
                .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
        if (overriddenStatus != null) {
            webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
        }
        Builder requestBuilder = webResource.getRequestBuilder();
        addExtraHeaders(requestBuilder);
         //注意:这里不是POST,也不是GET,而是PUT
        response = requestBuilder.put(ClientResponse.class);
        EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
        if (response.hasEntity() &&
                !HTML.equals(response.getType().getSubtype())) { //don't try and deserialize random html errors from the server
            eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
        }
        return eurekaResponseBuilder.build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}

至此,Eureka client向服务续租的源码就分析完毕了,过程相对简单,DiscoveryClient、TimedSupervisorTask、JerseyApplicationClient等实例各司其职,定时发送PUT请求到Eureka server。

2、Eureka Server接收续约

Eureka Server接收续约核心流程如下图:

1)、接收续约请求

@Produces({"application/xml", "application/json"})
public class InstanceResource {

    @PUT
    public Response renewLease(
            // 是否是Replication模式 复制,同步
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            
            @QueryParam("overriddenstatus") String overriddenStatus,    // 实例的覆盖状态
            
            @QueryParam("status") String status,    // 实例状态
            
            // 实例信息在EurekClient端上次被修改的时间
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
        // 续约
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

        // Not found in the registry, immediately ask for a register
        // 续租失败,返回404,EurekaClient端收到404后会发起注册请求
        if (!isSuccess) {
            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
        // Check if we need to sync based on dirty time stamp, the client
        // instance might have changed some value
        // 比较InstanceInfo的lastDirtyTimestamp属性
        Response response;
        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
             // 验证传入的lastDirtyTimestamp和EurekaServer端保存的lastDirtyTimestamp是否相同
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            // Store the overridden status since the validation found out the node that replicates wins
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                    && (overriddenStatus != null)
                    && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                    && isFromReplicaNode) {
                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
             // 续约成功,返回200
            response = Response.ok().build();
        }
        logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
        return response;
    }
}

PeerAwareInstanceRegistryImpl中调用了父类AbstractInstanceRegistry的renew(...)方法续约实例信息

@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    public boolean renew(final String appName, final String id, final boolean isReplication) {
        // 调用父类里的renew(appName, id, isReplication)方法续约
        if (super.renew(appName, id, isReplication)) {
            // 如果是续约请求则向其他EurekaServer节点同步续约信息

            // 如果是同步信息请求则直接返回
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
    }
}

2)、续约应用实例信息

调用了AbstractInstanceRegistry的renew(...)方法,续约实例信息,代码如下:

public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    public boolean renew(String appName, String id, boolean isReplication) {
        // 增加续约次数到监控
        RENEW.increment(isReplication);
        // 获取应用名对应的租约,即根据实例名称取出实例信息集合
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            // 根据实例id取出具体实例租约信息
            leaseToRenew = gMap.get(id);
        }
        // 租约不存在
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                // 获得实例的覆盖状态
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                
                // 实例覆盖状态为UNKNOWN,续租失败
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                
                // 实例状态与覆盖状态不一致
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                    instanceInfo.getOverriddenStatus().name(),
                                    instanceInfo.getId());
                    
                    // 强行把实例的覆盖状态设为实例状态
                    // 即status = overriddenInstanceStatus
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
                }
            }
            // 新增续租每分钟次数
            renewsLastMin.increment();
            // 续租(设置lastUpdateTimestamp(租约最后更新时间))
            leaseToRenew.renew();
            return true;
        }
    }
}
public class Lease<T> {

    enum Action {
        Register, Cancel, Renew
    };
    
    private volatile long lastUpdateTimestamp;
    
    public void renew() {
        // 设置租约最后更新时间戳
        lastUpdateTimestamp = System.currentTimeMillis() + duration;
    }
}

续约的整个过程修改租约的过期时间,即使并发请求,也不会对数据的一致性产生影响,因此不需要像注册操作一样加锁。

3)、eureka引入overriddenstatus用来解决状态被覆盖问题

客户端调用updateStatus方法时,同时更新server端实例的status和overriddenStatus状态。

客户端调用renew方法时,也要更新server端实例的status和overriddenstatus状态,但是有以下规则的

参考:
https://xinchen.blog.csdn.net/article/details/82915355

https://blog.csdn.net/qq_40378034/article/details/119079180

https://blog.csdn.net/NEW_BUGGER/article/details/93710797

https://www.cnblogs.com/liujunj/p/13401808.html

上一篇下一篇

猜你喜欢

热点阅读