分布式系统

Eureka源码解析(二) renew源码分析

2020-10-24  本文已影响0人  nerowu

阅读前的思考

使用netflix eureka做服务管理时,若你只停留在对eureka的概念理解和使用层面,那么你面试时会得到面试官的灵魂拷问,例如:
1)eureka将服务注册信息存放在哪里?服务注册信息都有哪些内容?
2)eureka如何做到高可用?底层的通信机制是什么?
3)心跳机制到底发送些什么内容,有了解吗?
4)服务注册列表是存在客户端还是服务端?如果多复本数据不一致怎么处理?
5)若网络故障服务注册失败了,eureka是如何保证注册成功的?
6)注册,同步,下线,剔除分别是怎么实现的?
7)为什么刚启动的服务没有即时被eureka发现?对此你还遇到过哪些坑?

带着这些问题或疑惑,作者决定推出eureka源码解读系列,从众所周知的Eureka功能着手,对register,renew,heartbeat,fetch,剔除/关闭,数据复制等进行源码解读,意在深入理解eureka功能。

Tip:建议开篇从 Eureka源码解析(一) 开始,之后的文章是基于开篇的分析成果之上进行撰写的。

Renew client端处理流程

DiscoveryClient.renew()方法定义的服务续约的具体流程。

boolean  renew(){
    EurekaHttpResponse<InstanceInfo>httpResponse;
    try{
        //通过sendHeartBeat的方式完成
        httpResponse=eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(),instanceInfo.getId(),instanceInfo,null);
        
        if(httpResponse.getStatusCode()==Status.NOT_FOUND.getStatusCode()){
            REREGISTER_COUNTER.increment();
            long timestamp=instanceInfo.setIsDirtyWithTime();
            boolean success=register();
            if(success){
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode()==Status.OK.getStatusCode();
    }catch(Throwablee){
    }
}

sendHeartBeat一共有两个类实现了方法,AbstractJerseyEurekaHttpClient,JerseyReplicationClient无论是哪个发送内容者是一样的。

public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName,String id,InstanceInfo info,InstanceStatus overriddenStatus){
    String urlPath="apps/"+appName+'/'+id;
    ……
    WebResourcewebResource=jerseyClient.resource(serviceUrl)
        .path(urlPath)
        .queryParam("status",info.getStatus().toString())
        .queryParam("lastDirtyTimestamp",info.getLastDirtyTimestamp().toString());
    if(overriddenStatus!=null){
        webResource=webResource.queryParam("overriddenstatus",overriddenStatus.name());
    }
    ……
    //http put请求
    response=requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).put(ClientResponse.class);
    ……
}

public enum InstanceStatus:
UP,//Ready to receive traffic
DOWN,//Do not send traffic-health check callback failed
STARTING,//Just about starting-initializations to be done-do not
//sendtraffic
OUT_OF_SERVICE,//Intentionally shut down for traffic
UNKNOWN;

心跳发送内容:appName+id(instanceId)作为url,参数:status即服务状态,lastDirtyTimestamp 即instance在client端最后被修改的时间戳,overriddenStatus 更新过的服务状态。

服务续约的调用

DiscoveryClient 构造方法中调用了initScheduledTasks方法初始化了heartbeatTask 用于执行心跳发送任务,具体任务实现在HeartbeatThread(Runnable)中

/**
*Theheartbeattaskthatrenewstheleaseinthegivenintervals.
*/
private class HeartbeatThread implements Runnable{

    public void run(){
        //调用renew
        if(renew()){
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

Eureka 默认每隔30秒一次心跳检测具体现实如下:

//Heartbeat timer
heartbeatTask = new TimedSupervisorTask(
    "heartbeat",
    scheduler,
    heartbeatExecutor,
    renewalIntervalInSecs,//默认30秒
    TimeUnit.SECONDS,
    expBackOffBound,
    new HeartbeatThread()
);
scheduler.schedule(
    heartbeatTask,
        renewalIntervalInSecs,TimeUnit.SECONDS);

总结:renew功能与心跳是绑定在一起发送的。定时心跳实现采用JDK的ScheduledExecutorService,执行任务是调用renew函数,发送的内容为appName+instanceId组成的url,参数是服务器状态status 和 最近更新状态的时间戳。以Http put 请求发送

Server 端流程处理

跟register服务处理流程一样,服务器启动后扫描并创建ApplicationResource,根据@Path("{id}")创建InstanceResource并调用@Put修饰的方法。

@PUT
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) StringisReplication,
    @QueryParam("overriddenstatus") StringoverriddenStatus,
    @QueryParam("status") Stringstatus,
    @QueryParam("lastDirtyTimestamp") StringlastDirtyTimestamp){
    
    booleanisFromReplicaNode="true".equals(isReplication);
    booleanisSuccess=registry.renew(app.getName(),id,isFromReplicaNode);
    ……

register.renew指向AbstractInstanceRegistry的renew方法,代码如下:

public boolean renew(String appName,String id,boolean isReplication){
    RENEW.increment(isReplication);
    Map<String,Lease<InstanceInfo>>gMap=registry.get(appName);
    Lease<InstanceInfo>leaseToRenew=null;
    if(gMap!=null){
        leaseToRenew=gMap.get(id);
    }
    //如果gMap中找不到服务实例返回false,则续租失败。
    if(leaseToRenew==null){
        RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS:Registry:leasedoesn'texist,registeringresource:{}-{}",appName,id);
        return false;
    }else{
        InstanceInfoinstanceInfo=leaseToRenew.getHolder();
        if(instanceInfo!=null){
            InstanceStatus overriddenInstanceStatus=this.getOverriddenInstanceStatus(instanceInfo,leaseToRenew,isReplication);
            if(overriddenInstanceStatus==InstanceStatus.UNKNOWN){
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            if(!instanceInfo.getStatus().equals(overriddenInstanceStatus)){
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
            }
        }
        renewsLastMin.increment();
        //更新时间戳 lastUpdateTimestamp=System.currentTimeMillis()+duration;
        leaseToRenew.renew();
        return true;
    }

}

此处renew方法在concurrentHashMap<String,Map<String,Lease<InstanceInfo>>> registry中查找(appName,(instanceId,Lease<InstanceInfo>)).如果判断并更新时间Instance的时间戳。如果查询不到,则返回false.意味着被剔除的服务是无法得到续租的。

eureka没有服务超时一说,每隔30秒一次心跳,服务在map中存在则更新时间戳,90秒没有续租上则执行服务剔除。

上一篇 下一篇

猜你喜欢

热点阅读