Nacos Source Code Analysis
Overall Flow
[figure] Nacos service registry structure: Map<namespace, Map<group::serviceName, Service>>
An example:
[figure]
Part 1: Service Registration
Client-side reading entry point
<!-- Nacos client -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
1. Check the classes auto-configured for us in spring.factories
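For reference, the starter's META-INF/spring.factories registers the discovery auto-configuration under the EnableAutoConfiguration key. The exact class list and packages vary by Spring Cloud Alibaba version, so treat this as an illustrative excerpt rather than the literal file contents:

```
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration,\
  ...
```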
2. Look at the auto-configured class
com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration
->com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration#nacosAutoServiceRegistration()
// returns new NacosAutoServiceRegistration()
3. Call chain inside NacosAutoServiceRegistration
com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration
->com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration#nacosAutoServiceRegistration()
// returns new NacosAutoServiceRegistration()
// when Spring's WebServerInitializedEvent fires, the listener calls:
->org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#bind
->org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#start
->org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#register
->com.alibaba.cloud.nacos.registry.NacosServiceRegistry#register
->com.alibaba.nacos.client.naming.NacosNamingService#registerInstance(String serviceName, Instance instance)
// does two things:
// 1. calls com.alibaba.nacos.client.naming.beat.BeatReactor#addBeatInfo(String serviceName, BeatInfo beatInfo) to schedule heartbeats
// 2. calls the proxy to perform the registration: com.alibaba.nacos.client.naming.net.NamingProxy#registerService(String serviceName, String groupName, Instance instance)
->com.alibaba.nacos.client.naming.net.NamingProxy#reqAPI()
Summary: the client simply sends an HTTP request to the server's registration endpoint.
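To make that HTTP request concrete, here is a minimal sketch of the registration call that NamingProxy#reqAPI ultimately performs. The endpoint POST /nacos/v1/ns/instance comes from the Nacos Open API; buildRegisterQuery is a hypothetical helper invented for this sketch, not a method in the Nacos client.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: assembles the form parameters the client sends to the server's
// open API "POST /nacos/v1/ns/instance". Parameter names follow the Nacos
// Open API docs; the helper itself is hypothetical.
public class RegisterSketch {

    public static String buildRegisterQuery(String namespaceId, String serviceName,
                                            String ip, int port, boolean ephemeral) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("namespaceId", namespaceId);
        params.put("serviceName", serviceName);
        params.put("ip", ip);
        params.put("port", String.valueOf(port));
        params.put("ephemeral", String.valueOf(ephemeral));
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : params.entrySet()) {
            if (sb.length() > 0) {
                sb.append('&');
            }
            sb.append(e.getKey()).append('=')
              .append(URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // A real client would POST this body to http://<server>:8848/nacos/v1/ns/instance
        System.out.println(buildRegisterQuery("public", "DEFAULT_GROUP@@demo-service",
                "192.168.0.10", 8080, true));
    }
}
```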
--
Server-side reading
com.alibaba.nacos.naming.controllers.InstanceController#register
->com.alibaba.nacos.naming.core.ServiceManager#registerInstance
->com.alibaba.nacos.naming.core.ServiceManager#addInstance
->com.alibaba.nacos.naming.consistency.DelegateConsistencyServiceImpl#put
->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put
->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#onPut
->com.alibaba.nacos.naming.consistency.ephemeral.distro.DataStore#put
->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl.Notifier#addTask
The task is added to a queue:
public class Notifier implements Runnable {

    private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);

    private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);

    /**
     * Add new notify task to queue.
     *
     * @param datumKey data key
     * @param action   action for data
     */
    public void addTask(String datumKey, DataOperation action) {
        if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
            return;
        }
        if (action == DataOperation.CHANGE) {
            services.put(datumKey, StringUtils.EMPTY);
        }
        tasks.offer(Pair.with(datumKey, action));
    }

    public int getTaskSize() {
        return tasks.size();
    }

    @Override
    public void run() {
        Loggers.DISTRO.info("distro notifier started");
        for (; ; ) {
            try {
                Pair<String, DataOperation> pair = tasks.take();
                handle(pair);
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }
    }

    private void handle(Pair<String, DataOperation> pair) {
        try {
            String datumKey = pair.getValue0();
            DataOperation action = pair.getValue1();
            services.remove(datumKey);
            int count = 0;
            if (!listeners.containsKey(datumKey)) {
                return;
            }
            for (RecordListener listener : listeners.get(datumKey)) {
                count++;
                try {
                    if (action == DataOperation.CHANGE) {
                        listener.onChange(datumKey, dataStore.get(datumKey).value);
                        continue;
                    }
                    if (action == DataOperation.DELETE) {
                        listener.onDelete(datumKey);
                        continue;
                    }
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                }
            }
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO
                        .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                                datumKey, count, action.name());
            }
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
}
Call chain:
com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl.Notifier#run
->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl.Notifier#handle
->com.alibaba.nacos.naming.core.Service#onChange
->com.alibaba.nacos.naming.core.Service#updateIPs
->com.alibaba.nacos.naming.core.Cluster#updateIps
public void updateIps(List<Instance> ips, boolean ephemeral) {
    ....
    toUpdateInstances = new HashSet<>(ips);
    if (ephemeral) {
        ephemeralInstances = toUpdateInstances;
    } else {
        persistentInstances = toUpdateInstances;
    }
}
ephemeralInstances is a member field of the Cluster class:
private Set<Instance> ephemeralInstances = new HashSet<>();
When is the run() method triggered?
As shown below, the @PostConstruct-annotated init() method submits the notifier to a thread pool:
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
    ....
    private volatile Notifier notifier = new Notifier();

    @PostConstruct
    public void init() {
        GlobalExecutor.submitDistroNotifyTask(notifier);
    }
    ....
}
Summary: a blocking queue decouples request handling from registry updates and improves concurrency. But could the queue overflow? And how much delay does registration incur?
We can see the queue capacity is 1024 * 1024.
Truly simultaneous registrations should be rare, and applying an update is a pure in-memory write, so consuming tasks from the queue should also be fast.
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
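The queueing behaviour above can be illustrated with a small standalone sketch (not Nacos code): offer() never blocks the caller and simply returns false once an ArrayBlockingQueue is full, so a flood of tasks degrades to dropped notifications rather than blocked request threads.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative sketch of the Notifier's non-blocking enqueue: offer() returns
// false when the queue is full instead of blocking the producer.
public class QueueSketch {

    public static int enqueueAll(BlockingQueue<String> tasks, int n) {
        int accepted = 0;
        for (int i = 0; i < n; i++) {
            if (tasks.offer("task-" + i)) { // non-blocking, like Notifier#addTask
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        BlockingQueue<String> tasks = new ArrayBlockingQueue<>(4);
        // Try to enqueue more tasks than the capacity allows; only 4 are accepted.
        System.out.println(enqueueAll(tasks, 10));
    }
}
```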
If a consumer reads from the registry before a registration has been fully applied, could it read dirty (half-written) data?
The conventional approach is to take a lock and allow reads only after the write completes, but that clearly hurts throughput.
Let's see how Nacos handles this.
Back in updateIps():
public void updateIps(List<Instance> ips, boolean ephemeral) {
    // pick the current instance set (ephemeral or persistent)
    Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
    HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
    // copy the current instances into a working map
    for (Instance ip : toUpdateInstances) {
        oldIpMap.put(ip.getDatumKey(), ip);
    }
    // perform the update against the copy (oldIpMap), not the live set
    List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());
    ....
    // finally swap the new set into ephemeralInstances or persistentInstances
    toUpdateInstances = new HashSet<>(ips);
    if (ephemeral) {
        ephemeralInstances = toUpdateInstances;
    } else {
        persistentInstances = toUpdateInstances;
    }
}
This is copy-on-write: readers keep reading the old set while the writer builds a new one, i.e. a read/write separation.
Could multiple threads write the same set concurrently and overwrite each other's updates?
Recall the initialization logic above: it runs only once, so a single thread keeps taking tasks from the queue and applying registrations; overwriting writes therefore cannot happen.
@PostConstruct
public void init() {
    GlobalExecutor.submitDistroNotifyTask(notifier);
}
Doesn't copy-on-write consume a lot of memory?
As we can see, only the Set<Instance> inside a single Cluster is copied (see the full Nacos storage model below), not the entire registry. When applying copy-on-write ourselves, we likewise need to think about the granularity of the copy.
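The copy-then-swap pattern can be distilled into a tiny standalone sketch. This is a hypothetical simplification modelled on Cluster#updateIps, with String standing in for Instance: the writer mutates a private copy and publishes it by reassigning a volatile reference, so readers always see either the complete old set or the complete new one.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Minimal copy-on-write sketch (hypothetical, not Nacos code): readers only
// dereference the volatile field; the writer swaps in a fully built copy.
public class CowSketch {

    // volatile makes the reference swap immediately visible to readers
    private volatile Set<String> instances = new HashSet<>();

    public Set<String> readInstances() {
        return Collections.unmodifiableSet(instances);
    }

    public void updateInstances(Set<String> newIps) {
        Set<String> copy = new HashSet<>(newIps); // work on a private copy...
        // ... (diffing/merging against the old set would happen here) ...
        instances = copy;                         // ...then publish it atomically
    }
}
```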
Now, what does Nacos's underlying storage model look like?
/**
 * Core manager storing all services in Nacos.
 *
 * @author nkorange
 */
@Component
public class ServiceManager implements RecordListener<Service> {
    /**
     * Map(namespace, Map(group::serviceName, Service)).
     */
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
}
Service
The Javadoc describes a 'service --> cluster --> instance' model, in which a service stores a list of clusters, and each cluster contains a list of instances.
That is, a service may be deployed as one or more clusters, and a cluster may contain multiple instances.
/**
 * Service of Nacos server side
 *
 * <p>We introduce a 'service --> cluster --> instance' model, in which service stores a list of clusters, which
 * contain a list of instances.
 *
 * <p>This class inherits from Service in API module and stores some fields that do not have to expose to client.
 *
 * @author nkorange
 */
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
    private Map<String, Cluster> clusterMap = new HashMap<>();
}
Cluster
The Cluster class contains the persistentInstances and ephemeralInstances fields that updateIps() ultimately swaps:
public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
    @JsonIgnore
    private Set<Instance> persistentInstances = new HashSet<>();
    @JsonIgnore
    private Set<Instance> ephemeralInstances = new HashSet<>();
}
Summary: a namespace can contain service groups, a group can contain multiple services, a service can have one or more clusters, and a cluster can contain multiple instances.
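A toy version of that registry layout can be sketched as follows. This is hypothetical code, not the ServiceManager implementation: it uses a plain String in place of the real Service object and the "group::serviceName" key format quoted at the top of this document.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy sketch of Map<namespace, Map<group::serviceName, Service>>, with String
// standing in for Service. Not the real ServiceManager.
public class RegistrySketch {

    private final Map<String, Map<String, String>> serviceMap = new ConcurrentHashMap<>();

    public void put(String namespace, String group, String serviceName, String service) {
        serviceMap.computeIfAbsent(namespace, ns -> new ConcurrentHashMap<>())
                  .put(group + "::" + serviceName, service); // grouped key, as in the doc's model
    }

    public String get(String namespace, String group, String serviceName) {
        Map<String, String> services = serviceMap.get(namespace);
        return services == null ? null : services.get(group + "::" + serviceName);
    }
}
```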
Heartbeat Sending
com.alibaba.nacos.client.naming.NacosNamingService#registerInstance()
->com.alibaba.nacos.client.naming.beat.BeatReactor#addBeatInfo
The addBeatInfo() method schedules a BeatTask:
class BeatTask implements Runnable {
    BeatInfo beatInfo;

    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }

    @Override
    public void run() {
        if (beatInfo.isStopped()) {
            return;
        }
        long nextTime = beatInfo.getPeriod();
        ...
        // 1. send the heartbeat
        JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
        ...
        // 2. reschedule itself to run again after nextTime
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    }
}
In other words, the client ultimately sends HTTP requests to the server endpoint /instance/beat.
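The self-rescheduling pattern in BeatTask can be demonstrated in isolation. This sketch (invented names, not Nacos code) shows why each run schedules the next one instead of using a fixed-rate timer: the interval can be adjusted between beats, which the real client does based on the server's response.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of a self-rescheduling heartbeat: each run sends one beat and then
// schedules the next run. The demo stops after a fixed number of beats.
public class HeartbeatSketch {

    static void scheduleBeat(ScheduledExecutorService pool, Runnable sendBeat,
                             long periodMillis, CountDownLatch remaining) {
        pool.schedule(() -> {
            sendBeat.run();                 // 1. send one heartbeat
            remaining.countDown();
            if (remaining.getCount() > 0) { // stop after N beats in this demo
                scheduleBeat(pool, sendBeat, periodMillis, remaining); // 2. reschedule
            }
        }, periodMillis, TimeUnit.MILLISECONDS);
    }

    public static int runDemo(int beats) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(beats);
        int[] sent = {0};
        scheduleBeat(pool, () -> sent[0]++, 10, remaining);
        try {
            remaining.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdownNow();
        return sent[0];
    }
}
```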
Server-side Heartbeat Check
The heartbeat check is started in the service's init method, com.alibaba.nacos.naming.core.Service#init, via
com.alibaba.nacos.naming.healthcheck.HealthCheckReactor#scheduleCheck(com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask):
public void init() {
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}
Default heartbeat timeout: 15 seconds, read via instance.getInstanceHeartBeatTimeOut() in ClientBeatCheckTask.run().
Default deletion timeout: 30 seconds, read via instance.getIpDeleteTimeout() in ClientBeatCheckTask.run().
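The two thresholds above drive the decision logic of ClientBeatCheckTask. The following is a hypothetical simplification of that logic, with an invented method stateFor(): an instance with no beat for 15 s is marked unhealthy, and after 30 s of silence it is removed from the registry.

```java
// Hypothetical simplification of ClientBeatCheckTask's decision logic; the
// constants mirror the defaults quoted above, the method name is invented.
public class BeatCheckSketch {

    static final long HEART_BEAT_TIMEOUT = 15_000L; // instance.getInstanceHeartBeatTimeOut()
    static final long IP_DELETE_TIMEOUT = 30_000L;  // instance.getIpDeleteTimeout()

    /** Returns "healthy", "unhealthy" or "deleted" for a given silence duration. */
    public static String stateFor(long now, long lastBeat) {
        long silence = now - lastBeat;
        if (silence > IP_DELETE_TIMEOUT) {
            return "deleted";   // server removes the instance from the registry
        }
        if (silence > HEART_BEAT_TIMEOUT) {
            return "unhealthy"; // still registered, but marked unhealthy
        }
        return "healthy";
    }
}
```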