Nacos Health Check Source Code

2021-10-01  念䋛

When a client registers with the server, the request is handled by the register method of InstanceController.

@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    
    final Instance instance = parseInstance(request);
    // Register the instance
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

Call chain
ServiceManager#registerInstance
-->createEmptyService(namespaceId, serviceName, instance.isEphemeral());
-->ServiceManager#createServiceIfAbsent
-->ServiceManager#putServiceAndInit

private void putServiceAndInit(Service service) throws NacosException {
    // The service may or may not hold instances at this point: it has none when the
    // instance being registered is the first one, and it has some when others registered earlier.
    putService(service);
    // Start the health check
    service.init();
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

Health check

public void init() {
    // Schedule the health check task on a thread pool
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}

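When you see code like this, the next place to look is the run method of ClientBeatCheckTask. The task is scheduled with an initial delay of 5000 ms and then runs every 5000 ms.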

public static void scheduleCheck(ClientBeatCheckTask task) {
    futureMap.putIfAbsent(task.taskKey(), GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}
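
The scheduling pattern above can be sketched with the plain JDK. This is a minimal illustration, not the Nacos implementation: CheckSchedulerSketch and its methods are hypothetical names, and it assumes a fixed-delay schedule. Note that in the snippet above the argument to putIfAbsent is evaluated before the call, so the task is scheduled even when the key already exists; the sketch uses computeIfAbsent so that scheduling happens only for a new key.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for HealthCheckReactor; not the Nacos class itself.
class CheckSchedulerSketch {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();

    // Schedules the task only if no task is registered under the same key.
    // Returns true when a new schedule was created, false when the key already existed.
    boolean scheduleCheck(String taskKey, Runnable task, long periodMillis) {
        boolean[] created = {false};
        futureMap.computeIfAbsent(taskKey, k -> {
            created[0] = true;
            // First run after periodMillis, then periodMillis after each run completes.
            return executor.scheduleWithFixedDelay(task, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        });
        return created[0];
    }

    // Cancels and forgets the task registered under the key, if any.
    void cancelCheck(String taskKey) {
        ScheduledFuture<?> future = futureMap.remove(taskKey);
        if (future != null) {
            future.cancel(true);
        }
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

Calling scheduleCheck twice with the same key schedules the task once; the second call is a no-op.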

The run method of ClientBeatCheckTask

@Override
public void run() {
    // The responsibility check below is for cluster mode; it will be analyzed later
    try {
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }
        
        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }
        
        List<Instance> instances = service.allIPs(true);
        
        // first set health status of instances:
        for (Instance instance : instances) {
            // If the current time minus the last heartbeat time is greater than 15 seconds, mark the instance as unhealthy
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    // Mark the instance as unhealthy
                    if (instance.isHealthy()) {
                        instance.setHealthy(false);
                        Loggers.EVT_LOG
                                .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                        instance.getIp(), instance.getPort(), instance.getClusterName(),
                                        service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                        instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                        // The instance changed, so serviceChanged is called; it is analyzed below
                        getPushService().serviceChanged(service);
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }
        
        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }
        
        // then remove obsolete instances:
        for (Instance instance : instances) {
            
            if (instance.isMarked()) {
                continue;
            }
            // If the current time minus the last heartbeat time is greater than 30 seconds, delete the instance
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // delete instance
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                        JacksonUtils.toJson(instance));
                // Remove the instance from the registry
                deleteIp(instance);
            }
        }
        
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }
    
}
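
The two thresholds in run can be condensed into a small stand-alone sketch. It is a simplification under stated assumptions: BeatCheckSketch and Inst are hypothetical types, the 15-second and 30-second values are hard-coded from the comments above rather than read per instance, and the marked flag, logging, and event publishing are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified version of the heartbeat check; not the Nacos class.
class BeatCheckSketch {
    // Assumed thresholds, taken from the 15 s / 30 s values in the comments above.
    static final long HEARTBEAT_TIMEOUT_MS = 15_000;
    static final long IP_DELETE_TIMEOUT_MS = 30_000;

    // Only the fields the check needs.
    static class Inst {
        final String ip;
        long lastBeat;
        boolean healthy = true;

        Inst(String ip, long lastBeat) {
            this.ip = ip;
            this.lastBeat = lastBeat;
        }
    }

    // One pass of the check: first flag stale instances as unhealthy,
    // then remove expired instances from the list. Returns the removed ones.
    static List<Inst> check(List<Inst> instances, long now) {
        for (Inst inst : instances) {
            if (now - inst.lastBeat > HEARTBEAT_TIMEOUT_MS && inst.healthy) {
                inst.healthy = false;
            }
        }
        List<Inst> removed = new ArrayList<>();
        instances.removeIf(inst -> {
            boolean expired = now - inst.lastBeat > IP_DELETE_TIMEOUT_MS;
            if (expired) {
                removed.add(inst);
            }
            return expired;
        });
        return removed;
    }
}
```

With heartbeats that are 5, 20, and 40 seconds old, the first instance stays healthy, the second is flagged unhealthy but kept, and the third is removed.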

Let us first look at getPushService().serviceChanged(service). When an instance changes, for example when an instance here has not sent a heartbeat within 15 seconds, an event is published.

public void serviceChanged(Service service) {
    // merge some change events to reduce the push frequency:
    if (futureMap
            .containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
        return;
    }
    // PushService itself listens for this event
    this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
}

PushService listens for the event.
Its onApplicationEvent method looks up all clients by namespaceId and serviceName.

ConcurrentMap<String, PushClient> clients = clientMap
        .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));

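It then iterates over all clients and calls udpPush for each one. udpPush sends a message over UDP that carries the changed service data, so each subscribed client can refresh its local instance list without waiting for its next poll.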

udpPush(ackEntry);
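
To make the UDP step concrete, here is a minimal JDK-only sketch of sending one datagram to a client address. It is not the Nacos udpPush: UdpPushSketch, push, and loopbackDemo are hypothetical names, the payload is an arbitrary string rather than the real packet format, and acknowledgement and retry handling are omitted.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of a UDP push; not the Nacos PushService.
class UdpPushSketch {
    // Sends one UDP datagram carrying the payload to the given client address.
    static void push(DatagramSocket socket, InetSocketAddress client, String payload) {
        byte[] data = payload.getBytes(StandardCharsets.UTF_8);
        try {
            socket.send(new DatagramPacket(data, data.length, client));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Loopback demo: a local socket plays the client and receives what the "server" pushes.
    static String loopbackDemo(String payload) {
        try (DatagramSocket client = new DatagramSocket(0, InetAddress.getLoopbackAddress());
             DatagramSocket server = new DatagramSocket()) {
            client.setSoTimeout(2000); // fail instead of blocking forever if the datagram is lost
            InetSocketAddress clientAddr =
                    new InetSocketAddress(InetAddress.getLoopbackAddress(), client.getLocalPort());
            push(server, clientAddr, payload);

            byte[] buf = new byte[1024];
            DatagramPacket received = new DatagramPacket(buf, buf.length);
            client.receive(received);
            return new String(received.getData(), 0, received.getLength(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

UDP gives no delivery guarantee, which is why a real push path needs some form of acknowledgement or a polling fallback on the client side.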

When are the clients put into clientMap? It happens when a client periodically pulls the latest instance list from the server.

Sending over UDP requires an IP and a port, so when does the client send its UDP port to the server?
InstanceController#ObjectNode list(HttpServletRequest request)
-->return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,healthyOnly);
-->InstanceController#doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly)
--> pushService.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),pushDataSource, tid, app);
-->PushService#addClient
--> addClient(client);
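The addClient method stores the client in clientMap, keyed by namespaceId and serviceName.
If any instance under a given namespaceId and serviceName fails to send a heartbeat within 15 seconds, the server uses UDP to notify every client stored under that namespaceId and serviceName that the service has changed.
To summarize: when a client pulls the instance list from the server, it passes its IP and udpPort along. The server stores the client in clientMap, with namespaceId + serviceName as the key and a map of clients as the value. When one instance goes 15 seconds without a heartbeat, the server pushes the change to all clients stored in clientMap under that namespaceId + serviceName key.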
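
The registry described in the summary can be sketched as a map of maps. This is an illustration only: ClientRegistrySketch is a hypothetical class, the "##" separator in the key is an assumption about what assembleFullServiceName produces, and the inner key of IP plus port is a simplification of how a client is identified.

```java
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified model of clientMap; not the Nacos PushService.
class ClientRegistrySketch {
    // Outer key: namespaceId + serviceName. Inner map: one entry per subscribed client.
    private final ConcurrentMap<String, ConcurrentMap<String, InetSocketAddress>> clientMap =
            new ConcurrentHashMap<>();

    // Assumed key format; the "##" separator is an assumption for this sketch.
    static String serviceKey(String namespaceId, String serviceName) {
        return namespaceId + "##" + serviceName;
    }

    // Called when a client pulls the instance list and reports its IP and UDP port.
    void addClient(String namespaceId, String serviceName, String clientIp, int udpPort) {
        clientMap.computeIfAbsent(serviceKey(namespaceId, serviceName), k -> new ConcurrentHashMap<>())
                .put(clientIp + ":" + udpPort, new InetSocketAddress(clientIp, udpPort));
    }

    // The clients to notify when the given service changes.
    Collection<InetSocketAddress> clientsOf(String namespaceId, String serviceName) {
        ConcurrentMap<String, InetSocketAddress> clients =
                clientMap.get(serviceKey(namespaceId, serviceName));
        return clients == null ? Collections.<InetSocketAddress>emptyList() : clients.values();
    }
}
```

A client that pulls the same service repeatedly overwrites its own entry, so each subscriber appears once per service.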
