Nacos Source Code Analysis

2021-02-28  知止9528

Complete flow

[Figure: image.png]

Structure of the Nacos service registry: Map<namespace, Map<group::serviceName, Service>>


Example:


[Figure: image.png]
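The nested-map registry structure can be sketched roughly as follows. This is a simplified illustration, not the Nacos implementation: `RegistrySketch`, its `put`/`get` helpers, and the sample namespace, group, and service names are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the registry; not the Nacos classes.
class RegistrySketch {
    // Placeholder for the server-side Service object.
    static final class Service {
        final String name;
        Service(String name) { this.name = name; }
    }

    // Map<namespace, Map<group::serviceName, Service>>
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    // Creates the inner map for the namespace on demand, then stores the service.
    void put(String namespace, String group, String serviceName) {
        String key = group + "::" + serviceName;
        serviceMap.computeIfAbsent(namespace, ns -> new ConcurrentHashMap<>())
                  .put(key, new Service(serviceName));
    }

    // Returns null when the namespace or the service is unknown.
    Service get(String namespace, String group, String serviceName) {
        Map<String, Service> inner = serviceMap.get(namespace);
        return inner == null ? null : inner.get(group + "::" + serviceName);
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        registry.put("public", "DEFAULT_GROUP", "order-service");
        System.out.println(registry.get("public", "DEFAULT_GROUP", "order-service").name);
        System.out.println(registry.get("dev", "DEFAULT_GROUP", "order-service"));
    }
}
```

A lookup therefore takes two steps: first by namespace, then by the combined group and service name.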

I. Service Registration

Client side: where to start reading

<!-- Nacos client -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>

1. Look at the classes that the spring.factories file auto-configures for us

[Figure: image.png]

2. Look at the auto-configuration class

com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration
  ->com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration#nacosAutoServiceRegistration()
  // returns new NacosAutoServiceRegistration()

3. The call chain inside NacosAutoServiceRegistration

com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration
  ->com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration#nacosAutoServiceRegistration()
  // returns new NacosAutoServiceRegistration()
     // triggered when Spring's WebServerInitializedEvent startup event is received
   ->org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#bind
     ->org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#start
       ->org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#register
         ->com.alibaba.cloud.nacos.registry.NacosServiceRegistry#register
           ->com.alibaba.nacos.client.naming.NacosNamingService#registerInstance(String serviceName, Instance instance)
           // it does two things:
           // 1. calls com.alibaba.nacos.client.naming.beat.BeatReactor#addBeatInfo(String serviceName, BeatInfo beatInfo) to add the heartbeat info
           // 2. calls the proxy to perform the registration: com.alibaba.nacos.client.naming.net.NamingProxy#registerService(String serviceName, String groupName, Instance instance)
             ->com.alibaba.nacos.client.naming.net.NamingProxy#reqAPI()
  

Summary: the client simply sends an HTTP request to the server's registration endpoint.
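As a rough illustration of what such a request looks like, the sketch below builds (but does not send) a registration request with the JDK HTTP client. The path `/nacos/v1/ns/instance` and the `serviceName`, `ip`, and `port` parameters are assumptions taken from the Nacos 1.x Open API rather than from the call chain above; the server address and service values are made up, and `RegisterRequestSketch` is a hypothetical name.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical helper; the real client goes through NamingProxy#reqAPI.
class RegisterRequestSketch {
    // Builds a POST request for the (assumed) Nacos 1.x instance registration endpoint.
    static HttpRequest build(String serverAddr, String serviceName, String ip, int port) {
        String query = "serviceName=" + URLEncoder.encode(serviceName, StandardCharsets.UTF_8)
                + "&ip=" + URLEncoder.encode(ip, StandardCharsets.UTF_8)
                + "&port=" + port;
        URI uri = URI.create("http://" + serverAddr + "/nacos/v1/ns/instance?" + query);
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("127.0.0.1:8848", "order-service", "10.0.0.1", 8080);
        // Only prints the request line; nothing is sent over the network.
        System.out.println(request.method() + " " + request.uri());
    }
}
```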

--

Server side

com.alibaba.nacos.naming.controllers.InstanceController#register
 ->com.alibaba.nacos.naming.core.ServiceManager#registerInstance
  ->com.alibaba.nacos.naming.core.ServiceManager#addInstance
   ->com.alibaba.nacos.naming.consistency.DelegateConsistencyServiceImpl#put
    ->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put
     ->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#onPut
      ->com.alibaba.nacos.naming.consistency.ephemeral.distro.DataStore#put
      ->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl.Notifier#addTask

The task is added to a queue:

public class Notifier implements Runnable {
        
        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
        
        private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
        
        /**
         * Add new notify task to queue.
         *
         * @param datumKey data key
         * @param action   action for data
         */
        public void addTask(String datumKey, DataOperation action) {
            
            if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
                return;
            }
            if (action == DataOperation.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            tasks.offer(Pair.with(datumKey, action));
        }
        
        public int getTaskSize() {
            return tasks.size();
        }
        
        @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");
            
            for (; ; ) {
                try {
                    Pair<String, DataOperation> pair = tasks.take();
                    handle(pair);
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }
        
        private void handle(Pair<String, DataOperation> pair) {
            try {
                String datumKey = pair.getValue0();
                DataOperation action = pair.getValue1();
                
                services.remove(datumKey);
                
                int count = 0;
                
                if (!listeners.containsKey(datumKey)) {
                    return;
                }
                
                for (RecordListener listener : listeners.get(datumKey)) {
                    
                    count++;
                    
                    try {
                        if (action == DataOperation.CHANGE) {
                            listener.onChange(datumKey, dataStore.get(datumKey).value);
                            continue;
                        }
                        
                        if (action == DataOperation.DELETE) {
                            listener.onDelete(datumKey);
                            continue;
                        }
                    } catch (Throwable e) {
                        Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                    }
                }
                
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO
                            .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                                    datumKey, count, action.name());
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }
    }

Call chain

com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl.Notifier#run
 ->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl.Notifier#handle
  ->com.alibaba.nacos.naming.core.Service#onChange
   ->com.alibaba.nacos.naming.core.Service#updateIPs
    ->com.alibaba.nacos.naming.core.Cluster#updateIps
public void updateIps(List<Instance> ips, boolean ephemeral) {
        
        ....
        
        toUpdateInstances = new HashSet<>(ips);
        
        if (ephemeral) {
            ephemeralInstances = toUpdateInstances;
        } else {
            persistentInstances = toUpdateInstances;
        }
    }

ephemeralInstances is a member variable of the Cluster class:

private Set<Instance> ephemeralInstances = new HashSet<>();

When is the run() method triggered?
An @PostConstruct method submits the notifier to a thread pool:

public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
    
    ....
    
    private volatile Notifier notifier = new Notifier();
    
    @PostConstruct
    public void init() {
        GlobalExecutor.submitDistroNotifyTask(notifier);
    }
    
    ....
}

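Summary: a blocking queue is used to improve concurrency. But can the queue overflow? And how long is the delay before a registration takes effect?

The queue capacity is 1024*1024. Note that addTask() uses offer(), which returns false instead of blocking when the queue is full, so a task would be dropped in that case.
Highly concurrent registration should be rare, and handling a task only writes to memory, so consuming tasks from the queue should be fast.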

private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
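The queue-plus-single-consumer pattern can be sketched in isolation as follows. This is a stripped-down illustration, not the Nacos Notifier: `NotifierSketch` is a hypothetical class that tracks only string keys, has no listeners or `DataOperation`, and uses a much smaller queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified notifier: keys only, no listeners, no DataOperation.
class NotifierSketch implements Runnable {
    // Keys that are queued but not yet handled; used to drop duplicate requests.
    private final ConcurrentHashMap<String, Boolean> pending = new ConcurrentHashMap<>();
    // Bounded queue; offer() returns false instead of blocking when it is full.
    private final BlockingQueue<String> tasks = new ArrayBlockingQueue<>(1024);
    // Records the order in which keys were handled (for the demo only).
    final List<String> handled = Collections.synchronizedList(new ArrayList<>());

    boolean addTask(String key) {
        if (pending.putIfAbsent(key, Boolean.TRUE) != null) {
            return false; // the same key is already waiting in the queue
        }
        if (!tasks.offer(key)) {
            pending.remove(key); // queue full: undo the marker so a retry can succeed
            return false;
        }
        return true;
    }

    int getTaskSize() {
        return tasks.size();
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String key = tasks.take(); // blocks until a task is available
                pending.remove(key);
                handled.add(key);          // stands in for notifying listeners
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        NotifierSketch notifier = new NotifierSketch();
        notifier.addTask("service-a");
        notifier.addTask("service-a"); // dropped as a duplicate
        notifier.addTask("service-b");
        System.out.println("queued: " + notifier.getTaskSize());

        Thread consumer = new Thread(notifier, "notifier");
        consumer.setDaemon(true);
        consumer.start();
        while (notifier.handled.size() < 2) {
            Thread.sleep(10);
        }
        consumer.interrupt();
        System.out.println("handled: " + notifier.handled);
    }
}
```

Producers return immediately after enqueueing, and the duplicate check keeps repeated changes to the same key from piling up in the queue.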

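If a registration has not finished yet and a consumer reads directly from the registry, can it read dirty data?

The conventional approach would be to take a lock and only allow reads once the write has completed, but that clearly hurts throughput.

Let's see how Nacos handles this.
Going back to updateIps():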

public void updateIps(List<Instance> ips, boolean ephemeral) {
        // pick the current set: ephemeralInstances if ephemeral, otherwise persistentInstances
        Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
        
        HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
        
        // copy the current instances into a separate map
        for (Instance ip : toUpdateInstances) {
            oldIpMap.put(ip.getDatumKey(), ip);
        }
        
        // do the registration work against oldIpMap, i.e. the copy we just made
        List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());
        
        ....

        // finally assign toUpdateInstances to ephemeralInstances or persistentInstances
        toUpdateInstances = new HashSet<>(ips);
        
        if (ephemeral) {
            ephemeralInstances = toUpdateInstances;
        } else {
            persistentInstances = toUpdateInstances;
        }
    }

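This is copy-on-write, i.e. the idea of separating reads from writes.
Could multiple instances write at the same time and overwrite each other?

Recall the initialization logic: init() runs only once, so a single thread keeps taking tasks from the queue and executing the registration logic. Overwriting writes therefore cannot occur.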

@PostConstruct
    public void init() {
        GlobalExecutor.submitDistroNotifyTask(notifier);
    }

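Does copy-on-write consume a lot of memory?

Only the Set<Instance> inside the Cluster is copied (see the full Nacos storage model below for details), not the whole registry. So when using copy-on-write, the granularity of the copy needs to be considered.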
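The copy-then-swap idea at the granularity of one instance set can be sketched like this. `CopyOnWriteClusterSketch` is a hypothetical class using plain strings for instances; the `volatile` field and the unmodifiable wrapper are additions made for this sketch and are not taken from the Nacos Cluster class.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of copy-on-write for a single cluster's instance set.
class CopyOnWriteClusterSketch {
    // Readers always see a fully built set; volatile publishes the swap (an assumption of this sketch).
    private volatile Set<String> instances = Collections.emptySet();

    // Readers take the current reference without locking.
    Set<String> snapshot() {
        return instances;
    }

    // Intended to be called from a single writer thread.
    void update(Set<String> latest) {
        Set<String> copy = new HashSet<>(latest);        // build the new set on the side
        instances = Collections.unmodifiableSet(copy);   // swap the reference in one step
    }

    public static void main(String[] args) {
        CopyOnWriteClusterSketch cluster = new CopyOnWriteClusterSketch();
        cluster.update(Set.of("10.0.0.1:8080"));
        Set<String> before = cluster.snapshot();

        cluster.update(Set.of("10.0.0.1:8080", "10.0.0.2:8080"));
        // The earlier snapshot is untouched; new readers see the new set.
        System.out.println(before.size() + " -> " + cluster.snapshot().size());
    }
}
```

Only the set for one cluster is rebuilt on each update, which is why the memory cost stays proportional to that cluster's instance count rather than to the whole registry.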


Now let's look at what the underlying Nacos storage model looks like.

/**
 * Core manager storing all services in Nacos.
 *
 * @author nkorange
 */
@Component
public class ServiceManager implements RecordListener<Service> {
    
    /**
     * Map(namespace, Map(group::serviceName, Service)).
     */
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
}

Service
As the class comment says, Nacos uses a 'service --> cluster --> instance' model, in which a service stores a list of clusters, each of which contains a list of instances.
A service may be deployed as a cluster, and a cluster may have multiple instances.

/**
 * Service of Nacos server side
 *
 * <p>We introduce a 'service --> cluster --> instance' model, in which service stores a list of clusters, which
 * contain
 * a list of instances.
 *
 * <p>This class inherits from Service in API module and stores some fields that do not have to expose to client.
 *
 * @author nkorange
 */
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
    private Map<String, Cluster> clusterMap = new HashMap<>();
}

Cluster
It holds the persistentInstances and ephemeralInstances that updateIps() ultimately updates.

public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
    @JsonIgnore
    private Set<Instance> persistentInstances = new HashSet<>();
    
    @JsonIgnore
    private Set<Instance> ephemeralInstances = new HashSet<>();
}

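Summary: in this data model, a namespace may contain service groups, a service group may contain multiple services, a service may contain clusters, and a cluster may contain multiple instances.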
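To make the service, cluster, instance nesting concrete, here is a minimal sketch that collects every instance of a service across its clusters. `ModelSketch`, the `allInstances` helper, and the string-typed instances are hypothetical simplifications; only the field names `clusterMap`, `persistentInstances`, and `ephemeralInstances` mirror the classes shown above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model; instances are plain "ip:port" strings.
class ModelSketch {
    static final class Cluster {
        final Set<String> persistentInstances = new HashSet<>();
        final Set<String> ephemeralInstances = new HashSet<>();
    }

    static final class Service {
        final Map<String, Cluster> clusterMap = new HashMap<>();

        // Walks service -> cluster -> instance and flattens the result.
        List<String> allInstances() {
            List<String> result = new ArrayList<>();
            for (Cluster cluster : clusterMap.values()) {
                result.addAll(cluster.persistentInstances);
                result.addAll(cluster.ephemeralInstances);
            }
            return result;
        }
    }

    public static void main(String[] args) {
        Service service = new Service();
        Cluster east = service.clusterMap.computeIfAbsent("east", name -> new Cluster());
        Cluster west = service.clusterMap.computeIfAbsent("west", name -> new Cluster());
        east.ephemeralInstances.add("10.0.0.1:8080");
        east.ephemeralInstances.add("10.0.0.2:8080");
        west.persistentInstances.add("10.0.1.1:8080");
        System.out.println(service.allInstances().size());
    }
}
```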


Sending heartbeats

com.alibaba.nacos.client.naming.NacosNamingService#registerInstance()
  ->com.alibaba.nacos.client.naming.beat.BeatReactor#addBeatInfo

The addBeatInfo() method schedules a BeatTask:

class BeatTask implements Runnable {
        
        BeatInfo beatInfo;
        
        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }
        
        @Override
        public void run() {
            if (beatInfo.isStopped()) {
                return;
            }
            long nextTime = beatInfo.getPeriod();
            ...
            // 1. send the heartbeat
            JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);

            ...
            // 2. schedule the next heartbeat (the task re-schedules itself)
            executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
        }
    }

In other words, the client ends up calling the server's /instance/beat endpoint over HTTP.
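The self-rescheduling pattern used by BeatTask can be sketched on its own as follows. In this hypothetical `BeatSketch`, counting down a latch stands in for the HTTP call, and the `runBeats` helper, the period, and the five-second wait are made up for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a task that re-schedules itself after each run.
class BeatSketch {
    static final class BeatTask implements Runnable {
        private final ScheduledExecutorService executor;
        private final long periodMillis;
        private final CountDownLatch beatsLeft;
        volatile boolean stopped;

        BeatTask(ScheduledExecutorService executor, long periodMillis, CountDownLatch beatsLeft) {
            this.executor = executor;
            this.periodMillis = periodMillis;
            this.beatsLeft = beatsLeft;
        }

        @Override
        public void run() {
            if (stopped) {
                return; // a stopped task simply does not re-schedule itself
            }
            beatsLeft.countDown(); // stands in for sending the heartbeat
            // Schedule the next run; a rejection after shutdown only ends the chain.
            executor.schedule(this, periodMillis, TimeUnit.MILLISECONDS);
        }
    }

    // Returns true if `count` beats were observed within five seconds.
    static boolean runBeats(int count, long periodMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch beatsLeft = new CountDownLatch(count);
        BeatTask task = new BeatTask(executor, periodMillis, beatsLeft);
        try {
            executor.schedule(task, 0, TimeUnit.MILLISECONDS);
            return beatsLeft.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            task.stopped = true;
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runBeats(3, 50));
    }
}
```

Because each run schedules the next one, the period can change between beats, which a fixed-rate schedule would not allow.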


Server-side heartbeat check

The service's init method, com.alibaba.nacos.naming.core.Service#init,
starts the heartbeat check:
com.alibaba.nacos.naming.healthcheck.HealthCheckReactor#scheduleCheck(com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask)

public void init() {
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
            entry.getValue().init();
        }
    }

Default heartbeat timeout
15 seconds

See instance.getInstanceHeartBeatTimeOut() in ClientBeatCheckTask.run()

Default delete timeout
30 seconds

See instance.getIpDeleteTimeout() in ClientBeatCheckTask.run()
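How the two timeouts lead to different outcomes can be sketched as a pure function. `BeatCheckSketch`, the `Action` enum, and the `check` method are hypothetical; the sketch assumes that exceeding the heartbeat timeout marks an instance unhealthy and that exceeding the delete timeout removes it, and it uses only the default values of 15 and 30 seconds.

```java
// Hypothetical sketch of the timeout decision; not the ClientBeatCheckTask code.
class BeatCheckSketch {
    static final long HEART_BEAT_TIMEOUT_MS = 15_000; // default heartbeat timeout
    static final long IP_DELETE_TIMEOUT_MS = 30_000;  // default delete timeout

    enum Action { KEEP, MARK_UNHEALTHY, DELETE }

    // Decides what to do with an instance based on how long it has been silent.
    static Action check(long nowMs, long lastBeatMs) {
        long silentMs = nowMs - lastBeatMs;
        if (silentMs > IP_DELETE_TIMEOUT_MS) {
            return Action.DELETE;
        }
        if (silentMs > HEART_BEAT_TIMEOUT_MS) {
            return Action.MARK_UNHEALTHY;
        }
        return Action.KEEP;
    }

    public static void main(String[] args) {
        System.out.println(check(10_000, 0));
        System.out.println(check(20_000, 0));
        System.out.println(check(31_000, 0));
    }
}
```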