nacos2.0.4配置监听分析

2022-03-15  本文已影响0人  SparkOnly

本篇文章从界面发布配置开始,分析整个配置发布到应用客户端变更的通用过程

主体流程

客户端

ClientWorker,每个5秒执行一次配置监听(发送ConfigBatchListenRequest)。如果时间间隔超过5分钟,则同步所有配置

服务端

ConfigController修改配置

  1. ConfigChangePublisher发布ConfigDataChangeEvent

  2. AsyncNotifyService监听ConfigDataChangeEvent
    2.1 执行AsyncRpcTask(含有各个成员的NotifySingleRpcTask的queue)
    2.2 遍历queue
    2.2.1 如果是自己,dump
    2.2.2 如果是成员,异步同步配置改变ConfigChangeClusterSyncRequest(dataId, group, isBata, lastModified, tag, tenant)
    接收节点,dump,其余流程同3

  3. 异步执行DumpProcessor
    3.1 从config_info里查询出配置内容
    3.2 执行ConfigCacheService.dump
    3.2.1 保存到data目录
    3.2.2 更新CacheItem的md5和修改时间,发布LocalDataChangeEvent事件

  4. RpcConfigChangeNotifier监听到LocalDataChangeEvent,遍历监听的所有客户端连接,构建 ConfigChangeNotifyRequest,异步执行RpcPushTask,推送变更

  5. 客户端ClientWorker接受到ConfigChangeNotifyRequest事件后,将对应的缓存置为不同步,调用配置监听动作

服务端处理

  1. 页面请求,可以看到,调用了/v1/cs/config接口,对应的就是com.alibaba.nacos.config.server.controller.ConfigController#publishConfig方法


    页面发布配置
  2. ConfigController#publishConfig处理
    在没有灰度IP,没有标签的情况下,会保存配置信息到config_info表,插入历史记录表his_config_info
    同时发布配置变更事件:ConfigDataChangeEvent
    具体代码如下:
@PostMapping
    @Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
    public Boolean publishConfig(...) throws NacosException {
        ...
        final Timestamp time = TimeUtils.getCurrentTime();
        # 灰度IP
        String betaIps = request.getHeader("betaIps");
        ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
        configInfo.setType(type);
        if (StringUtils.isBlank(betaIps)) {
            if (StringUtils.isBlank(tag)) {
                # 没有标签,则保存信息到表config_info,插入历史记录表his_config_info
                persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
                # 发布ConfigDataChangeEvent事件
                ConfigChangePublisher
                        .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
            } else {
                persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
                ConfigChangePublisher.notifyConfigChange(
                        new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
            }
        } else {
            // beta publish
            persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);
            ConfigChangePublisher
                    .notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
        }
        ConfigTraceService
                .logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(),
                        ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
        return true;
    }
  1. 接下来到监听配置变更事件的AsyncNotifyService
    首先是事先注册好监听的事件
    事件处理器里,可以看到遍历成员,会构建两个队列,一个是http队列,一个是rpc队列。
    对于支持长连接的成员,会构建NotifySingleRpcTask任务,放入rpc队列。
    对于rpc客户端,异步执行AsyncRpcTask任务
@Autowired
    public AsyncNotifyService(ServerMemberManager memberManager) {
        this.memberManager = memberManager;
        
        // Register ConfigDataChangeEvent to NotifyCenter.
        NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
        
        // Register A Subscriber to subscribe ConfigDataChangeEvent.
        NotifyCenter.registerSubscriber(new Subscriber() {
            
            @Override
            public void onEvent(Event event) {
                // Generate ConfigDataChangeEvent concurrently
                if (event instanceof ConfigDataChangeEvent) {
                    ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
                    long dumpTs = evt.lastModifiedTs;
                    String dataId = evt.dataId;
                    String group = evt.group;
                    String tenant = evt.tenant;
                    String tag = evt.tag;
                    Collection<Member> ipList = memberManager.allMembers();
                    
                    // In fact, any type of queue here can be
                    Queue<NotifySingleTask> httpQueue = new LinkedList<NotifySingleTask>();
                    Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<NotifySingleRpcTask>();
                    # 遍历所有成员
                    for (Member member : ipList) {
                        if (!MemberUtil.isSupportedLongCon(member)) {
                            httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
                                    evt.isBeta));
                        } else {
                            rpcQueue.add(
                                    new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));
                        }
                    }
                    if (!httpQueue.isEmpty()) {
                        ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue));
                    }
                    if (!rpcQueue.isEmpty()) {
                        # 异步执行rpc任务
                        ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));
                    }
                    
                }
            }
            
            @Override
            public Class<? extends Event> subscribeType() {
                return ConfigDataChangeEvent.class;
            }
        });
    }
  1. AsyncNotifyService.AsyncRpcTask#run方法
    这里遍历第3步构建的队列,取出每个rpc任务,构建ConfigChangeClusterSyncRequest 请求
    4.1 如果成员是自己,dump请求
    4.2 如果成员为成员列表里的其他成员,发送ConfigChangeClusterSyncRequest到对应节点
public void run() {
            while (!queue.isEmpty()) {
                NotifySingleRpcTask task = queue.poll();
                
                ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest();
                syncRequest.setDataId(task.getDataId());
                syncRequest.setGroup(task.getGroup());
                syncRequest.setBeta(task.isBeta);
                syncRequest.setLastModified(task.getLastModified());
                syncRequest.setTag(task.tag);
                syncRequest.setTenant(task.getTenant());
                Member member = task.member;
                # 任务的成员为自己
                if (memberManager.getSelf().equals(member)) {
                    if (syncRequest.isBeta()) {
                        dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
                                syncRequest.getLastModified(), NetUtils.localIP(), true);
                    } else {
                        # 正常发布
                        dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
                                syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());
                    }
                    continue;
                }
                # 成员列表里的其他成员
                if (memberManager.hasMember(member.getAddress())) {
                    // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
                    boolean unHealthNeedDelay = memberManager.isUnHealth(member.getAddress());
                    if (unHealthNeedDelay) {
                        // target ip is unhealthy, then put it in the notification list
                        ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
                                task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
                                0, member.getAddress());
                        // get delay time and set fail count to the task
                        asyncTaskExecute(task);
                    } else {
    
                        if (!MemberUtil.isSupportedLongCon(member)) {
                            asyncTaskExecute(
                                    new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag,
                                            task.getLastModified(), member.getAddress(), task.isBeta));
                        } else {
                            try {
                                configClusterRpcClientProxy
                                        .syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));
                            } catch (Exception e) {
                                MetricsMonitor.getConfigNotifyException().increment();
                                asyncTaskExecute(task);
                            }
                        }
                      
                    }
                } else {
                    //No nothig if  member has offline.
                }
                
            }
        }

4.3 其他成员节点,ConfigChangeClusterSyncRequestHandler,接收到ConfigChangeClusterSyncRequest ,同样执行dump操作


其他节点,同样dump

5 dump操作
com.alibaba.nacos.config.server.service.dump.DumpService#dump
5.1 添加一个DumpTask任务

public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
            boolean isBeta) {
        String groupKey = GroupKey2.getKey(dataId, group, tenant);
        String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);
        dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
        DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
    }

5.2 在DumpService实例化时,设置了dupTaskMgr的默认任务处理器DumpProcessor


dumpTaskMgr默认任务处理器

6 Dump任务处理
com.alibaba.nacos.config.server.service.dump.processor.DumpProcessor#process
这里主要是根据任务里的参数查找配置信息,执行DumpConfigHandler#configDump

public boolean process(NacosTask task) {
        final PersistService persistService = dumpService.getPersistService();
        DumpTask dumpTask = (DumpTask) task;
        String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey());
        String dataId = pair[0];
        String group = pair[1];
        String tenant = pair[2];
        long lastModified = dumpTask.getLastModified();
        String handleIp = dumpTask.getHandleIp();
        boolean isBeta = dumpTask.isBeta();
        String tag = dumpTask.getTag();
        
        ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId)
                .group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);
        
        if (isBeta) {
            // if publish beta, then dump config, update beta cache
            ConfigInfo4Beta cf = persistService.findConfigInfo4Beta(dataId, group, tenant);
            
            build.remove(Objects.isNull(cf));
            build.betaIps(Objects.isNull(cf) ? null : cf.getBetaIps());
            build.content(Objects.isNull(cf) ? null : cf.getContent());
            
            return DumpConfigHandler.configDump(build.build());
        }
        if (StringUtils.isBlank(tag)) {
            # 查找配置信息
            ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant);

            build.remove(Objects.isNull(cf));
            build.content(Objects.isNull(cf) ? null : cf.getContent());
            build.type(Objects.isNull(cf) ? null : cf.getType());
        } else {
            ConfigInfo4Tag cf = persistService.findConfigInfo4Tag(dataId, group, tenant, tag);

            build.remove(Objects.isNull(cf));
            build.content(Objects.isNull(cf) ? null : cf.getContent());

        }
        return DumpConfigHandler.configDump(build.build());
    }

7 配置dump
com.alibaba.nacos.config.server.service.dump.DumpConfigHandler#configDump
更新事件中,调用ConfigCacheService.dump执行dump操作

public static boolean configDump(ConfigDumpEvent event) {
        final String dataId = event.getDataId();
        final String group = event.getGroup();
        final String namespaceId = event.getNamespaceId();
        final String content = event.getContent();
        final String type = event.getType();
        final long lastModified = event.getLastModifiedTs();
        if (event.isBeta()) {
            ...
        }
        if (StringUtils.isBlank(event.getTag())) {
            ...
            boolean result;
            if (!event.isRemove()) {
               # 非删除事件,执行dump操作
                result = ConfigCacheService.dump(dataId, group, namespaceId, content, lastModified, type);
                
                if (result) {
                    ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
                            ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
                            content.length());
                }
            } else {
                result = ConfigCacheService.remove(dataId, group, namespaceId);
                
                if (result) {
                    ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
                            ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);
                }
            }
            return result;
        } else {
            ...
        }
        
    }

8 ConfigCacheService#dump操作
保存配置文件,更新cache里的md5值,同时发布LocalDataChangeEvent事件

public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
            String type) {
        String groupKey = GroupKey2.getKey(dataId, group, tenant);
        CacheItem ci = makeSure(groupKey);
        ci.setType(type);
        final int lockResult = tryWriteLock(groupKey);
        assert (lockResult != 0);
        
        if (lockResult < 0) {
            DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);
            return false;
        }
        
        try {
            final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
            
            if (md5.equals(ConfigCacheService.getContentMd5(groupKey))) {
                DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
                                + "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),
                        lastModifiedTs);
            } else if (!PropertyUtil.isDirectRead()) {
               # 保存配置到目录/data/tenant-config-data
                DiskUtil.saveToDisk(dataId, group, tenant, content);
            }
            # 更新缓存里的md5,发布LocalDataChangeEvent事件
            updateMd5(groupKey, md5, lastModifiedTs);
            return true;
        } catch (IOException ioe) {
            DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(), ioe);
            if (ioe.getMessage() != null) {
                String errMsg = ioe.getMessage();
                if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN) || errMsg
                        .contains(DISK_QUATA_EN)) {
                    // Protect from disk full.
                    FATAL_LOG.error("磁盘满自杀退出", ioe);
                    System.exit(0);
                }
            }
            return false;
        } finally {
            releaseWriteLock(groupKey);
        }
    }

更新md5的操作。更新cache里的md5和最后修改时间,发布LocalDataChangeEvent事件

public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
        CacheItem cache = makeSure(groupKey);
        if (cache.md5 == null || !cache.md5.equals(md5)) {
            cache.md5 = md5;
            cache.lastModifiedTs = lastModifiedTs;
            NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
        }
    }

9 RpcConfigChangeNotifier处理LocalDataChangeEvent事件
遍历groupKey对应的所有客户端连接,构造ConfigChangeNotifyRequest 请求,推送给客户端

public void onEvent(LocalDataChangeEvent event) {
        String groupKey = event.groupKey;
        boolean isBeta = event.isBeta;
        List<String> betaIps = event.betaIps;
        String[] strings = GroupKey.parseKey(groupKey);
        String dataId = strings[0];
        String group = strings[1];
        String tenant = strings.length > 2 ? strings[2] : "";
        String tag = event.tag;
        
        configDataChanged(groupKey, dataId, group, tenant, isBeta, betaIps, tag);
        
    }

public void configDataChanged(String groupKey, String dataId, String group, String tenant, boolean isBeta,
            List<String> betaIps, String tag) {
        
        Set<String> listeners = configChangeListenContext.getListeners(groupKey);
        if (CollectionUtils.isEmpty(listeners)) {
            return;
        }
        int notifyClientCount = 0;
        for (final String client : listeners) {
            Connection connection = connectionManager.getConnection(client);
            if (connection == null) {
                continue;
            }

            //beta ips check.
            String clientIp = connection.getMetaInfo().getClientIp();
            String clientTag = connection.getMetaInfo().getTag();
            if (isBeta && betaIps != null && !betaIps.contains(clientIp)) {
                continue;
            }
            //tag check
            if (StringUtils.isNotBlank(tag) && !tag.equals(clientTag)) {
                continue;
            }

            ConfigChangeNotifyRequest notifyRequest = ConfigChangeNotifyRequest.build(dataId, group, tenant);

            RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest, 50, client, clientIp,
                    connection.getMetaInfo().getAppName());
            push(rpcPushRetryTask);
            notifyClientCount++;
        }
        Loggers.REMOTE_PUSH.info("push [{}] clients ,groupKey=[{}]", notifyClientCount, groupKey);
    }

客户端处理

  1. 初始化ClientWorker,构建ConfigRpcTransportClient,每隔5秒执行配置监听


    初始化ConfigRpcTransportClient
    定期执行配置监听
  2. 配置监听逻辑,如果距离上次全量同步时间达到5分钟,则全量同步
    2.1 遍历所有缓存的数据,跳过已同步的缓存,根据缓存是否存在监听器,构造listenCachesMap和removeListenCachesMap
    2.2 遍历listenCachesMap,构造ConfigBatchListenRequest 请求,发送到服务端。根据响应构造changeKey,从配置中心拉取配置,检查监听器的md5和数据的md5是否一致,不一致就调用监听器的监听方法
    2.3 遍历removeListenCachesMap,构造ConfigBatchListenRequest 请求,移除服务端的监听。如果成功则移除本地缓存
public void executeConfigListen() {
            Map<String, List<CacheData>> listenCachesMap = new HashMap<String, List<CacheData>>(16);
            Map<String, List<CacheData>> removeListenCachesMap = new HashMap<String, List<CacheData>>(16);
            long now = System.currentTimeMillis();
            boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;
            for (CacheData cache : cacheMap.get().values()) {
                synchronized (cache) {
                    //check local listeners consistent.
                    # 如果本缓存已经和服务端同步 && 不需要全量同步,就跳过处理
                    if (cache.isSyncWithServer()) {
                        cache.checkListenerMd5();
                        if (!needAllSync) {
                            continue;
                        }
                    }
                    
                    if (!CollectionUtils.isEmpty(cache.getListeners())) {
                        //get listen  config
                        if (!cache.isUseLocalConfigInfo()) {
                            List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));
                            if (cacheDatas == null) {
                                cacheDatas = new LinkedList<>();
                                listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
                            }
                            cacheDatas.add(cache);
                            
                        }
                    } else if (CollectionUtils.isEmpty(cache.getListeners())) {
                        if (!cache.isUseLocalConfigInfo()) {
                            List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId()));
                            if (cacheDatas == null) {
                                cacheDatas = new LinkedList<>();
                                removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
                            }
                            cacheDatas.add(cache);
                            
                        }
                    }
                }
                
            }
            
            boolean hasChangedKeys = false;
            # 有监听器的缓存处理
            if (!listenCachesMap.isEmpty()) {
                for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
                    String taskId = entry.getKey();
                    Map<String, Long> timestampMap = new HashMap<>(listenCachesMap.size() * 2);
                    
                    List<CacheData> listenCaches = entry.getValue();
                    for (CacheData cacheData : listenCaches) {
                        timestampMap.put(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant),
                                cacheData.getLastModifiedTs().longValue());
                    }
                    
                    ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
                    configChangeListenRequest.setListen(true);
                    try {
                        RpcClient rpcClient = ensureRpcClient(taskId);
                        ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy(
                                rpcClient, configChangeListenRequest);
                        if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) {
                            
                            Set<String> changeKeys = new HashSet<String>();
                            //handle changed keys,notify listener
                            if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) {
                                hasChangedKeys = true;
                                for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse
                                        .getChangedConfigs()) {
                                    String changeKey = GroupKey
                                            .getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(),
                                                    changeConfig.getTenant());
                                    changeKeys.add(changeKey);
                                    boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();
                                    refreshContentAndCheck(changeKey, !isInitializing);
                                }
                                
                            }
                            
                            //handler content configs
                            for (CacheData cacheData : listenCaches) {
                                String groupKey = GroupKey
                                        .getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant());
                                if (!changeKeys.contains(groupKey)) {
                                    //sync:cache data md5 = server md5 && cache data md5 = all listeners md5.
                                    synchronized (cacheData) {
                                        if (!cacheData.getListeners().isEmpty()) {
                                            
                                            Long previousTimesStamp = timestampMap.get(groupKey);
                                            if (previousTimesStamp != null && !cacheData.getLastModifiedTs().compareAndSet(previousTimesStamp,
                                                    System.currentTimeMillis())) {
                                                continue;
                                            }
                                            cacheData.setSyncWithServer(true);
                                        }
                                    }
                                }
                                
                                cacheData.setInitializing(false);
                            }
                            
                        }
                    } catch (Exception e) {
                        
                        LOGGER.error("Async listen config change error ", e);
                        try {
                            Thread.sleep(50L);
                        } catch (InterruptedException interruptedException) {
                            //ignore
                        }
                    }
                }
            }
            # 无监听器的缓存处理
            if (!removeListenCachesMap.isEmpty()) {
                for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) {
                    String taskId = entry.getKey();
                    List<CacheData> removeListenCaches = entry.getValue();
                    ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(removeListenCaches);
                    configChangeListenRequest.setListen(false);
                    try {
                        RpcClient rpcClient = ensureRpcClient(taskId);
                        boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest);
                        if (removeSuccess) {
                            for (CacheData cacheData : removeListenCaches) {
                                synchronized (cacheData) {
                                    if (cacheData.getListeners().isEmpty()) {
                                        ClientWorker.this
                                                .removeCache(cacheData.dataId, cacheData.group, cacheData.tenant);
                                    }
                                }
                            }
                        }
                        
                    } catch (Exception e) {
                        LOGGER.error("async remove listen config change error ", e);
                    }
                    try {
                        Thread.sleep(50L);
                    } catch (InterruptedException interruptedException) {
                        //ignore
                    }
                }
            }
            
            if (needAllSync) {
                lastAllSyncTime = now;
            }
            //If has changed keys,notify re sync md5.
            if (hasChangedKeys) {
                notifyListenConfig();
            }
        }

private void refreshContentAndCheck(String groupKey, boolean notify) {
        if (cacheMap.get() != null && cacheMap.get().containsKey(groupKey)) {
            CacheData cache = cacheMap.get().get(groupKey);
            # 获取服务端配置,检查md5和缓存的是否一致,不一致则执行监听器方法
            refreshContentAndCheck(cache, notify);
        }
    }
    
    private void refreshContentAndCheck(CacheData cacheData, boolean notify) {
        try {
            # 获取服务端配置
            ConfigResponse response = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L,
                    notify);
            cacheData.setContent(response.getContent());
            cacheData.setEncryptedDataKey(response.getEncryptedDataKey());
            if (null != response.getConfigType()) {
                cacheData.setType(response.getConfigType());
            }
            if (notify) {
                LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                        agent.getName(), cacheData.dataId, cacheData.group, cacheData.tenant, cacheData.getMd5(),
                        ContentUtils.truncateContent(response.getContent()), response.getConfigType());
            }
            # 缓存数据的md5和监听器的md5不一致,
            cacheData.checkListenerMd5();
        } catch (Exception e) {
            LOGGER.error("refresh content and check md5 fail ,dataId={},group={},tenant={} ", cacheData.dataId,
                    cacheData.group, cacheData.tenant, e);
        }
    }
  1. CacheData
    检查监听器缓存的md5是否和数据的md5一致,不一致则触发监听器的receiveConfigChange,推送变更
void checkListenerMd5() {
        for (ManagerListenerWrap wrap : listeners) {
            if (!md5.equals(wrap.lastCallMd5)) {
                safeNotifyListener(dataId, group, content, type, md5, encryptedDataKey, wrap);
            }
        }
    }

private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
            final String md5, final String encryptedDataKey, final ManagerListenerWrap listenerWrap) {
        final Listener listener = listenerWrap.listener;
        if (listenerWrap.inNotifying) {
            LOGGER.warn(
                    "[{}] [notify-currentSkip] dataId={}, group={}, md5={}, listener={}, listener is not finish yet,will try next time.",
                    name, dataId, group, md5, listener);
            return;
        }
        Runnable job = () -> {
            long start = System.currentTimeMillis();
            ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
            ClassLoader appClassLoader = listener.getClass().getClassLoader();
            try {
                if (listener instanceof AbstractSharedListener) {
                    AbstractSharedListener adapter = (AbstractSharedListener) listener;
                    adapter.fillContext(dataId, group);
                    LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
                }
                // Before executing the callback, set the thread classloader to the classloader of
                // the specific webapp to avoid exceptions or misuses when calling the spi interface in
                // the callback method (this problem occurs only in multi-application deployment).
                Thread.currentThread().setContextClassLoader(appClassLoader);
                
                ConfigResponse cr = new ConfigResponse();
                cr.setDataId(dataId);
                cr.setGroup(group);
                cr.setContent(content);
                cr.setEncryptedDataKey(encryptedDataKey);
                configFilterChainManager.doFilter(null, cr);
                String contentTmp = cr.getContent();
                listenerWrap.inNotifying = true;
                // 监听器执行监听动作
                listener.receiveConfigInfo(contentTmp);
                // compare lastContent and content
                if (listener instanceof AbstractConfigChangeListener) {
                    Map data = ConfigChangeHandler.getInstance()
                            .parseChangeData(listenerWrap.lastContent, content, type);
                    ConfigChangeEvent event = new ConfigChangeEvent(data);
                    ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
                    listenerWrap.lastContent = content;
                }
                
                listenerWrap.lastCallMd5 = md5;
                LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ,cost={} millis.", name,
                        dataId, group, md5, listener, (System.currentTimeMillis() - start));
            } catch (NacosException ex) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",
                        name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
            } catch (Throwable t) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,
                        group, md5, listener, t.getCause());
            } finally {
                listenerWrap.inNotifying = false;
                Thread.currentThread().setContextClassLoader(myClassLoader);
            }
        };
        
        final long startNotify = System.currentTimeMillis();
        try {
            if (null != listener.getExecutor()) {
                listener.getExecutor().execute(job);
            } else {
                try {
                    INTERNAL_NOTIFIER.submit(job);
                } catch (RejectedExecutionException rejectedExecutionException) {
                    LOGGER.warn(
                            "[{}] [notify-blocked] dataId={}, group={}, md5={}, listener={}, no available internal notifier,will sync notifier ",
                            name, dataId, group, md5, listener);
                    job.run();
                } catch (Throwable throwable) {
                    LOGGER.error(
                            "[{}] [notify-blocked] dataId={}, group={}, md5={}, listener={}, submit internal async task fail,throwable= ",
                            name, dataId, group, md5, listener, throwable);
                    job.run();
                }
            }
        } catch (Throwable t) {
            LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,
                    group, md5, listener, t.getCause());
        }
        final long finishNotify = System.currentTimeMillis();
        LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
                name, (finishNotify - startNotify), dataId, group, md5, listener);
    }
  1. RpcClient监听服务端主动推送的配置变更ConfigChangeNotifyRequest
    这里会修改缓存的最后修改时间,将缓存状态置为不同步,触发配置监听动作


    RpcClient监听配置变更
上一篇下一篇

猜你喜欢

热点阅读