
Nacos Source Code 1: Config Center, Client Side

2019-01-05  modou1618

Nacos official homepage

1 Structure

[Figure: structure diagram (结构.png)]

2 ConfigFactory

public class ConfigFactory {
    public static ConfigService createConfigService(Properties properties) throws NacosException {
        ...
    }

    public static ConfigService createConfigService(String serverAddr) throws NacosException {
        ...
    }
}
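
The bodies of the two overloads are elided above. As a rough illustration of how such a pair usually relates, here is a minimal self-contained sketch in which the String overload wraps the address in a Properties object and delegates to the Properties overload. `ConfigFactorySketch`, `SketchConfigService` and the `serverAddr` property key are hypothetical stand-ins for this example, not the Nacos classes.

```java
import java.util.Properties;

class ConfigFactorySketch {
    // Hypothetical stand-in for the service object the factory returns.
    static class SketchConfigService {
        final String serverAddr;

        SketchConfigService(Properties properties) {
            // Assumed property key, used only for this illustration.
            this.serverAddr = properties.getProperty("serverAddr");
        }
    }

    // Overload that takes the full set of properties.
    static SketchConfigService createConfigService(Properties properties) {
        return new SketchConfigService(properties);
    }

    // Convenience overload: wrap the address in Properties and delegate.
    static SketchConfigService createConfigService(String serverAddr) {
        Properties properties = new Properties();
        properties.setProperty("serverAddr", serverAddr);
        return createConfigService(properties);
    }

    public static void main(String[] args) {
        SketchConfigService service = createConfigService("127.0.0.1:8848");
        System.out.println(service.serverAddr);
    }
}
```

Keeping a single code path through the Properties overload means any later option only has to be handled in one place.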

3 NacosConfigService

3.1 Configuration management

3.1.1 Publishing and removing configuration

Builds the request parameters as described in the API documentation, then calls the ServerHttpAgent interface.

3.1.2 Getting configuration

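// Checked first: a local failover file, if present, takes priority.
String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
// Fallback when the server request fails: the snapshot saved by the last successful fetch.
content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);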
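
The two calls above come from different points of the lookup. A minimal sketch of the overall order, assuming it is failover file first, then the server, then the local snapshot when the server request fails, could look like this. The three `Supplier` parameters are hypothetical stand-ins for the failover read, the HTTP call and the snapshot read.

```java
import java.util.function.Supplier;

class ConfigLookupSketch {
    // Returns the first available value: failover file, then server, then snapshot.
    static String getConfig(Supplier<String> failover,
                            Supplier<String> server,
                            Supplier<String> snapshot) {
        String content = failover.get();
        if (content != null) {
            return content; // a local failover file overrides everything else
        }
        try {
            return server.get(); // normal path: ask the config server
        } catch (RuntimeException e) {
            // Server unreachable: fall back to the last snapshot saved locally.
            return snapshot.get();
        }
    }

    public static void main(String[] args) {
        String value = getConfig(
            () -> null,
            () -> { throw new IllegalStateException("server down"); },
            () -> "cached=true");
        System.out.println(value);
    }
}
```

With this ordering an operator can pin a value by dropping a failover file, and a client can still start from its snapshot while the server is down.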

3.2 Managing configuration change listeners

Calls the add and remove listener interfaces of ClientWorker.

4 ClientWorker

4.1 Fetching configuration

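Calls the ServerHttpAgent interface to fetch the configuration. On HTTP 200 the content is saved to the local snapshot; on HTTP 404 saveSnapshot is called with null content, which removes the local snapshot.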

HttpResult result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
switch (result.code) {
    case HttpURLConnection.HTTP_OK:
        LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);
        return result.content;
    case HttpURLConnection.HTTP_NOT_FOUND:
        LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
        return null;
    ...
}
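
The two branches shown can be exercised without a server. The sketch below replaces the snapshot files with an in-memory map and assumes that saving null content means "delete the snapshot". `SnapshotSketch`, `handleResponse` and the map are hypothetical names for this example, and the branches elided in the original are collapsed into a single default case.

```java
import java.net.HttpURLConnection;
import java.util.HashMap;
import java.util.Map;

class SnapshotSketch {
    // In-memory stand-in for the snapshot files kept on disk.
    static final Map<String, String> SNAPSHOTS = new HashMap<>();

    // Assumption: saving null content removes the snapshot.
    static void saveSnapshot(String key, String content) {
        if (content == null) {
            SNAPSHOTS.remove(key);
        } else {
            SNAPSHOTS.put(key, content);
        }
    }

    static String handleResponse(String key, int code, String body) {
        switch (code) {
            case HttpURLConnection.HTTP_OK:
                saveSnapshot(key, body); // keep a local copy for later fallback
                return body;
            case HttpURLConnection.HTTP_NOT_FOUND:
                saveSnapshot(key, null); // config deleted on the server
                return null;
            default:
                // Other status codes are not covered by this sketch.
                throw new IllegalStateException("unexpected status " + code);
        }
    }

    public static void main(String[] args) {
        handleResponse("app+DEFAULT_GROUP", 200, "a=1");
        System.out.println(SNAPSHOTS);
        handleResponse("app+DEFAULT_GROUP", 404, null);
        System.out.println(SNAPSHOTS);
    }
}
```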

4.2 Storing configuration change listeners

static public String getKeyTenant(String dataId, String group, String tenant) {
    StringBuilder sb = new StringBuilder();
    urlEncode(dataId, sb);
    sb.append('+');
    urlEncode(group, sb);
    if (StringUtils.isNotEmpty(tenant)) {
        sb.append('+');
        urlEncode(tenant, sb);
    }
    return sb.toString();
}
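
To make the key format concrete, here is a small runnable sketch of the same construction. The `urlEncode` helper is not shown in the article, so the version below is an assumption: it escapes only '+' and '%' so that '+' can safely act as the separator.

```java
class GroupKeySketch {
    // Assumed escaping: '+' and '%' are escaped, everything else is copied as-is.
    static void urlEncode(String str, StringBuilder sb) {
        for (int i = 0; i < str.length(); i++) {
            char c = str.charAt(i);
            if (c == '+') {
                sb.append("%2B");
            } else if (c == '%') {
                sb.append("%25");
            } else {
                sb.append(c);
            }
        }
    }

    // Same shape as getKeyTenant: dataId+group, plus +tenant when it is non-empty.
    static String getKeyTenant(String dataId, String group, String tenant) {
        StringBuilder sb = new StringBuilder();
        urlEncode(dataId, sb);
        sb.append('+');
        urlEncode(group, sb);
        if (tenant != null && !tenant.isEmpty()) {
            sb.append('+');
            urlEncode(tenant, sb);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(getKeyTenant("app.properties", "DEFAULT_GROUP", ""));
        System.out.println(getKeyTenant("a+b", "g", "t1"));
    }
}
```

Under that assumption the two calls print `app.properties+DEFAULT_GROUP` and `a%2Bb+g+t1`, so a '+' inside a dataId cannot be confused with the separator.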

4.3 Processing configuration change listeners

4.3.1 Scheduled task

executor.scheduleWithFixedDelay(new Runnable() {
    public void run() {
        try {
            checkConfigInfo();
        } catch (Throwable e) {
            log.error(agent.getName(), "NACOS-XXXX", "[sub-check] rotate check error", e);
        }
    }
}, 1L, 10L, TimeUnit.MILLISECONDS);

public void checkConfigInfo() {
    // Split the listeners into tasks
    int listenerSize = cacheMap.get().size();
    // Round up to get the number of batches
    int longingTaskCount = (int)Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int)currentLongingTaskCount; i < longingTaskCount; i++) {
            // Need to check whether the task is already running; this needs careful thought.
            // The task list is currently unordered, so changes along the way may cause problems.
            executorService.execute(new LongPullingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}
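
The batch arithmetic is easy to check in isolation. The sketch below assumes the per-task size is a floating-point value (3000 here is an assumed figure; the real one comes from `ParamUtil.getPerTaskConfigSize()`). If the divisor were an int, the division would truncate before `Math.ceil` ran and the round-up would have no effect.

```java
class TaskCountSketch {
    // Assumed batch size, declared as double so the division below is not truncated.
    static final double PER_TASK_CONFIG_SIZE = 3000;

    // Number of long-polling tasks needed for the given number of cached configs.
    static int longPollingTaskCount(int listenerSize) {
        return (int) Math.ceil(listenerSize / PER_TASK_CONFIG_SIZE);
    }

    public static void main(String[] args) {
        for (int size : new int[] {0, 1, 3000, 3001}) {
            System.out.println(size + " -> " + longPollingTaskCount(size));
        }
    }
}
```

With these inputs the task counts are 0, 1, 1 and 2: a new long-polling task is only started once the number of cached configs crosses a multiple of the batch size.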

4.3.2 LongPullingRunnable

4.3.2.1 Listening for failover configuration changes

// check failover config
for (CacheData cacheData : cacheMap.get().values()) {
    if (cacheData.getTaskId() == taskId) {
        cacheDatas.add(cacheData);
        try {
            checkLocalConfig(cacheData);
            if (cacheData.isUseLocalConfigInfo()) {
                cacheData.checkListenerMd5();
            }
        } catch (Exception e) {
            log.error("NACOS-CLIENT", "get local config info error", e);
        }
    }
}

// Inside checkLocalConfig: when a failover file is found, switch the entry to the local content
cacheData.setUseLocalConfigInfo(true);
cacheData.setLocalConfigInfoVersion(path.lastModified());
cacheData.setContent(content);

4.3.2.2 Listening for server-side configuration changes

4.3.2.3 Listener callbacks on configuration change

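// LongPullingRunnable: fetch the changed config from the server and update the cache entry
String content = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(content);

// CacheData: notify every listener whose last notified MD5 differs from the current one
void checkListenerMd5() {
    for (ManagerListenerWrap wrap : listeners) {
        if (!md5.equals(wrap.lastCallMd5)) {
            safeNotifyListener(dataId, group, content, md5, wrap);
        }
    }
}

// safeNotifyListener: invoke the callback, then record the MD5 it was called with
Runnable job = new Runnable() {
    public void run() {
        ...
        listener.receiveConfigInfo(contentTmp);
        listenerWrap.lastCallMd5 = md5;
        ...
    }
};

// Run on the listener's own executor if it provides one, otherwise inline
if (null != listener.getExecutor()) {
    listener.getExecutor().execute(job);
} else {
    job.run();
}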
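
The snippets above amount to a "notify only when the MD5 changed" pattern. Here is a minimal self-contained sketch of that pattern. `ListenerMd5Sketch`, `ListenerWrap` and the `Consumer<String>` callback are hypothetical simplifications, and the initial value of `lastCallMd5` is an assumption of this example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

class ListenerMd5Sketch {
    // Hypothetical wrapper: a callback plus the MD5 it was last notified with.
    static class ListenerWrap {
        final Consumer<String> listener;
        final Executor executor; // may be null: the callback then runs inline
        volatile String lastCallMd5; // null until the first notification

        ListenerWrap(Consumer<String> listener, Executor executor) {
            this.listener = listener;
            this.executor = executor;
        }
    }

    final List<ListenerWrap> listeners = new ArrayList<>();
    private String content = "";
    private String md5 = md5Hex("");

    void setContent(String newContent) {
        content = newContent;
        md5 = md5Hex(newContent);
    }

    // Notify only the listeners that have not yet seen the current content.
    void checkListenerMd5() {
        for (ListenerWrap wrap : listeners) {
            if (!md5.equals(wrap.lastCallMd5)) {
                notifyListener(wrap);
            }
        }
    }

    private void notifyListener(ListenerWrap wrap) {
        final String contentTmp = content;
        final String md5Tmp = md5;
        Runnable job = () -> {
            wrap.listener.accept(contentTmp);
            wrap.lastCallMd5 = md5Tmp; // remember what this listener has seen
        };
        if (wrap.executor != null) {
            wrap.executor.execute(job);
        } else {
            job.run();
        }
    }

    static String md5Hex(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                .digest(s.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder();
            for (byte b : digest) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ListenerMd5Sketch cache = new ListenerMd5Sketch();
        cache.listeners.add(new ListenerWrap(c -> System.out.println("changed: " + c), null));
        cache.setContent("a=1");
        cache.checkListenerMd5(); // notifies
        cache.checkListenerMd5(); // same MD5, no second notification
    }
}
```

Tracking the MD5 per listener rather than per config means a listener added late still receives the current content on the next check.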

5 ServerHttpAgent

5.1 ServerListManager: managing config servers

5.1.1 Config server addresses

public void refreshCurrentServerAddr() {
    currentServerAddr = iterator().next();
}

public String getCurrentServerAddr() {
    if (StringUtils.isBlank(currentServerAddr)) {
        currentServerAddr = iterator().next();
    }
    return currentServerAddr;
}

5.1.2 Address server

A scheduled thread fetches the latest list of config server addresses from the address server and updates the local cache, List<String> serverUrls.
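
A minimal sketch of such a periodic refresh is shown here. The `Supplier` stands in for the HTTP request to the address server; the class name, the refresh period and the "keep the old list on failure" behavior are assumptions of this example rather than details taken from the Nacos source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class ServerListRefreshSketch {
    private volatile List<String> serverUrls = new ArrayList<>();
    private final Supplier<List<String>> addressServer; // stands in for the HTTP call

    ServerListRefreshSketch(Supplier<List<String>> addressServer) {
        this.addressServer = addressServer;
    }

    // One refresh round: keep the cached list if the lookup fails or returns nothing.
    void refresh() {
        try {
            List<String> latest = addressServer.get();
            if (latest != null && !latest.isEmpty() && !latest.equals(serverUrls)) {
                serverUrls = new ArrayList<>(latest);
            }
        } catch (RuntimeException e) {
            // Address server unavailable: keep serving the cached list.
        }
    }

    List<String> getServerUrls() {
        return serverUrls;
    }

    // Runs refresh() on a daemon thread; the period is chosen by the caller.
    ScheduledExecutorService start(long periodSeconds) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "server-list-refresh");
            t.setDaemon(true);
            return t;
        });
        executor.scheduleWithFixedDelay(this::refresh, 0L, periodSeconds, TimeUnit.SECONDS);
        return executor;
    }

    public static void main(String[] args) {
        ServerListRefreshSketch manager = new ServerListRefreshSketch(
            () -> Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848"));
        manager.refresh(); // a single round, without starting the scheduler
        System.out.println(manager.getServerUrls());
    }
}
```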

5.2 Limiter: rate limiting calls

private static int CAPACITY_SIZE = 1000;
private static Cache<String, RateLimiter> cache = CacheBuilder.newBuilder()
    .initialCapacity(CAPACITY_SIZE).expireAfterAccess(1, TimeUnit.MINUTES)
    .build();

// One RateLimiter per accessKeyID, created on first use with 5 permits per second
rateLimiter = cache.get(accessKeyID, new Callable<RateLimiter>() {
    @Override
    public RateLimiter call() throws Exception {
        return RateLimiter.create(5);
    }
});
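
The code above relies on Guava's Cache and RateLimiter. To show the per-key idea without that dependency, the sketch below swaps in a much simpler technique: a fixed one-second window counter per key, stored in a ConcurrentHashMap. It is not equivalent to Guava's smoothed rate limiting, it never expires idle keys, and every name in it is hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

class PerKeyLimiterSketch {
    // Fixed-window counter: at most `limit` permits per one-second window.
    static class WindowLimiter {
        private final int limit;
        private long windowStartMillis;
        private int used;

        WindowLimiter(int limit, long nowMillis) {
            this.limit = limit;
            this.windowStartMillis = nowMillis;
        }

        synchronized boolean tryAcquire(long nowMillis) {
            if (nowMillis - windowStartMillis >= 1000L) {
                windowStartMillis = nowMillis; // start a new window
                used = 0;
            }
            if (used < limit) {
                used++;
                return true;
            }
            return false;
        }
    }

    private final ConcurrentHashMap<String, WindowLimiter> limiters = new ConcurrentHashMap<>();
    private final LongSupplier clock; // injectable so the behavior can be tested

    PerKeyLimiterSketch(LongSupplier clock) {
        this.clock = clock;
    }

    // Returns true when this key has already used its 5 calls in the current window.
    boolean isLimit(String accessKeyId) {
        long now = clock.getAsLong();
        WindowLimiter limiter =
            limiters.computeIfAbsent(accessKeyId, k -> new WindowLimiter(5, now));
        return !limiter.tryAcquire(now);
    }

    public static void main(String[] args) {
        PerKeyLimiterSketch limiter = new PerKeyLimiterSketch(System::currentTimeMillis);
        for (int i = 1; i <= 6; i++) {
            System.out.println("call " + i + " limited: " + limiter.isLimit("demo-key"));
        }
    }
}
```

Keying the limiter by access key means one noisy caller cannot use up the budget of another.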