Dubbo配置中心

2021-02-09  本文已影响0人  爱健身的兔子

1 Dubbo配置介绍

Dubbo支持的配置默认有四种来源:

配置的覆盖关系如下:

image

2 动态配置中心

配置中心在 Dubbo 中承担两个职责:

  1. 外部化配置。启动配置的集中式存储 (简单理解为 dubbo.properties 的外部化存储)。
  2. 服务治理。服务治理规则的存储与通知。

例如:

<dubbo:config-center address="zookeeper://127.0.0.1:2181"/>

3 ConfigCenterBean

ConfigCenterBean可以设置配置中心地址、连接超时时间、是否优先级最高等。还可以设置连接配置中心的用户名、密码。

3.1 includeSpringEnv属性

includeSpringEnv用于设置是否需要从spring的Environment对象(注意dubbo也有一个Environment对象,两个对象不是一个类)中获取配置信息。默认为falseConfigCenterBean实现了EnvironmentAware接口,在启动的时候会调用setEnvironment()方法:

    public void setEnvironment(Environment environment) {
        if (includeSpringEnv) {
            // Get PropertySource mapped to 'dubbo.properties' in Spring Environment.
            setExternalConfig(getConfigurations(getConfigFile(), environment));
            // Get PropertySource mapped to 'application.dubbo.properties' in Spring Environment.
            setAppExternalConfig(getConfigurations(StringUtils.isNotEmpty(getAppConfigFile()) ? getAppConfigFile() : ("application." + getConfigFile()), environment));
        }
    }

如果includeSpringEnv设置为true,那么将从springEnvironment对象中读取key为“dubbo.properties”和“application.dubbo.properties”的配置值,并分别设置给属性externalConfigurationappExternalConfiguration

DubboBootstrap会对Environment初始化,初始化的时候将ConfigCenterBeanexternalConfigurationappExternalConfiguration的值设置到Environment对象的externalConfigurationMapappExternalConfigurationMap

public void initialize() throws IllegalStateException {
        ConfigManager configManager = ApplicationModel.getConfigManager();
        Optional<Collection<ConfigCenterConfig>> defaultConfigs = configManager.getDefaultConfigCenter();
        defaultConfigs.ifPresent(configs -> {
            for (ConfigCenterConfig config : configs) {
                //externalConfigurationMap和appExternalConfigurationMap是Map对象
                this.setExternalConfigMap(config.getExternalConfiguration());
                this.setAppExternalConfigMap(config.getAppExternalConfiguration());
            }
        });
    }

4 配置中心初始化

DubboBootstrap初始化(initialize方法)时要调用startConfigCenter方法,该方法代码如下:

    private void startConfigCenter() {
        //获取所有ConfigCenterConfig对象,ConfigCenterBean是其子类,其实这个位置获得是ConfigCenterBean对象
        Collection<ConfigCenterConfig> configCenters = configManager.getConfigCenters();
        if (CollectionUtils.isNotEmpty(configCenters)) {
            CompositeDynamicConfiguration compositeDynamicConfiguration = new CompositeDynamicConfiguration();
            //遍历每个ConfigCenterBean对象
            for (ConfigCenterConfig configCenter : configCenters) {
                //使用java系统属性等设置ConfigCenterBean对象的属性
                configCenter.refresh();
                //校验ConfigCenterBean对象的parameters是否合法
                ConfigValidationUtils.validateConfigCenterConfig(configCenter);
                //prepareEnvironment方法建立与配置中心的连接,拉取配置数据,并保存到本地
                  compositeDynamicConfiguration.addConfiguration(prepareEnvironment(configCenter));
            }
            environment.setDynamicConfiguration(compositeDynamicConfiguration);
        }
        //使用配置中心的配置更新如下对象的属性:
        //ApplicationConfig、MonitorConfig、ModuleConfig、ProtocolConfig、RegistryConfig、
        //ProviderConfig、ConsumerConfig。
        //更新对象属性时,使用如下规则搜索配置:dubbo.类名去掉Config.id值.属性名,
        //比如更新id为“test”的ProviderConfig的threads属性时,
        //从配置中心搜索key=dubbo.provider.test.threads,如果能找到就更新threads属性。
        configManager.refreshAll();
    }
    private DynamicConfiguration prepareEnvironment(ConfigCenterConfig configCenter) {
        //ConfigCenterConfig必须配置address,否则为无效
        if (configCenter.isValid()) {
            if (!configCenter.checkOrUpdateInited()) {
                return null;
            }
            //构建连接配置中心的url,url中包含了ip、端口、协议等
            //getDynamicConfiguration根据url的协议使用SPI创建DynamicConfiguration对象
            //DynamicConfiguration对象建立与配置中心的连接
            DynamicConfiguration dynamicConfiguration = getDynamicConfiguration(configCenter.toUrl());
            //从配置中心拉取key=dubbo.properties,group=dubbo的值(这两个值都是默认值,我们可以通过修改属性configFile来修改key)
            String configContent = dynamicConfiguration.getProperties(configCenter.getConfigFile(), configCenter.getGroup());
            String appGroup = getApplication().getName();
            String appConfigContent = null;
            if (isNotEmpty(appGroup)) {
                //从配置中心拉取应用配置,group是应用名,key是appConfigFile的值,
                //如果appConfigFile=null,则使用configFile,
                //默认是使用configFile的值,也就是dubbo.properties
                appConfigContent = dynamicConfiguration.getProperties
                        (isNotEmpty(configCenter.getAppConfigFile()) ? configCenter.getAppConfigFile() : configCenter.getConfigFile(),
                                appGroup
                        );
            }
            try {
                environment.setConfigCenterFirst(configCenter.isHighestPriority())           
                //将配置信息保存到Environment的Map属性中,后面的配置会覆盖之前的
                environment.updateExternalConfigurationMap(parseProperties(configContent));
                environment.updateAppExternalConfigurationMap(parseProperties(appConfigContent));
            } catch (IOException e) {
                throw new IllegalStateException("Failed to parse configurations from Config Center.", e);
            }
            return dynamicConfiguration;
        }
        return null;
    }

上面的代码是创建动态配置中心并连接配置中心拉取数据。

4.1 DynamicConfiguration

DynamicConfigurationConfiguration的子接口,用于连接外部配置,具体实现类包括:

下面介绍ZookeeperDynamicConfiguration创建连接和监听。

4.2 ZookeeperDynamicConfiguration

Zookeeper默认所有的配置都存储在 /dubbo/config 节点,具体节点结构图如下:

image
4.3 创建连接

prepareEnvironment方法通过getDynamicConfiguration方法获取与动态配置中心的连接。

static DynamicConfiguration getDynamicConfiguration(URL connectionURL) {
        //获取连接配置中心使用的协议,下面分析以zk为例
        String protocol = connectionURL.getProtocol();
        //使用SPI加载DynamicConfigurationFactory对象,其支持的协议以及对应的类可以参见文件
        //org.apache.dubbo.common.config.configcenter.DynamicConfigurationFactory。
        DynamicConfigurationFactory factory = getDynamicConfigurationFactory(protocol);
        //使用DynamicConfigurationFactory创建DynamicConfiguration
        return factory.getDynamicConfiguration(connectionURL);
    }
    static DynamicConfigurationFactory getDynamicConfigurationFactory(String name) {
        Class<DynamicConfigurationFactory> factoryClass = DynamicConfigurationFactory.class;
        ExtensionLoader<DynamicConfigurationFactory> loader = getExtensionLoader(factoryClass);
        return loader.getOrDefaultExtension(name);
    }

下面以创建ZookeeperDynamicConfiguration为例,构造动态配置中心。

ZookeeperDynamicConfiguration(URL url, ZookeeperTransporter zookeeperTransporter) {
        this.url = url;
        //构建访问配置中心的根路径,默认是:/dubbo/config/
        rootPath = PATH_SEPARATOR + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config";
        initializedLatch = new CountDownLatch(1);
        //创建监听器
        this.cacheListener = new CacheListener(rootPath, initializedLatch);
        //创建单个线程,用于处理被监听的事件
        this.executor = Executors.newFixedThreadPool(1, new NamedThreadFactory(this.getClass().getSimpleName(), true));
        //建立与配置中心的连接
        zkClient = zookeeperTransporter.connect(url);
        //设置监听器和监听目录
        zkClient.addDataListener(rootPath, cacheListener, executor);
        try {
             //可以通过ConfigCenterBean的parameters设置init.timeout的值,init.timeout表示建立链接的超时时间
            long timeout = url.getParameter("init.timeout", 5000);
            boolean isCountDown = this.initializedLatch.await(timeout, TimeUnit.MILLISECONDS);
            if (!isCountDown) {
                throw new IllegalStateException("Failed to receive INITIALIZED event from zookeeper, pls. check if url "
                        + url + " is correct");
            }
        } catch (InterruptedException e) {
            logger.warn("Failed to build local cache for config center (zookeeper)." + url);
        }
    }
4.4 监听配置中心

建立与配置中心链接时,在ZookeeperDynamicConfiguration的构造方法中设置监听器CacheListenerCacheListener将监听rootPath路径。

//该方法用于设置监听zk的指定目录
    public void addDataListener(String path, DataListener listener, Executor executor) {
        //listeners是一个两层map对象,类型如下:
        //ConcurrentMap<String, ConcurrentMap<DataListener, TargetDataListener>>
        //最外层的map,key是被监听的路径,内层的map,key和value都是监听器,
        //其区别是value可以认为是对key的封装,在本代码中key是CacheListener,value是CuratorWatcherImpl。
        ConcurrentMap<DataListener, TargetDataListener> dataListenerMap = listeners.get(path);
        if (dataListenerMap == null) {
            listeners.putIfAbsent(path, new ConcurrentHashMap<DataListener, TargetDataListener>());
            dataListenerMap = listeners.get(path);
        }
        TargetDataListener targetListener = dataListenerMap.get(listener);
        if (targetListener == null) {
            //createTargetDataListener创建目标监听器,方法见下面[1]
            dataListenerMap.putIfAbsent(listener, createTargetDataListener(path, listener));
            targetListener = dataListenerMap.get(listener);
        }
        //访问zk将targetListener注册为监听器,方法代码见[2]
        addTargetDataListener(path, targetListener, executor);
    }
    //[1] 以zk为配置中心为例
    protected CuratorZookeeperClient.CuratorWatcherImpl createTargetDataListener(String path, DataListener listener) {
        return new CuratorWatcherImpl(client, listener);
    }
    //[2]
    public List<String> addTargetChildListener(String path, CuratorWatcherImpl listener) {
        try {
            return client.getChildren().usingWatcher(listener).forPath(path);
        } catch (NoNodeException e) {
            return null;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

addDataListener方法的监听器其实是CuratorWatcherImplCuratorWatcherImpl的代码如下:

static class CuratorWatcherImpl implements CuratorWatcher, TreeCacheListener {
        private CuratorFramework client;
        private volatile ChildListener childListener;
        private volatile DataListener dataListener;
        private String path;

        //createTargetDataListener方法调用下面的方法创建CuratorWatcherImpl对象,
        //从方法入参DataListener可以看出,只监听zk节点数据的变化
        public CuratorWatcherImpl(CuratorFramework client, DataListener dataListener) {
            this.dataListener = dataListener;
        }

        //...
        //代码删减

        //当数据有变化时,通知调用下面的方法
        //本方法主要是做类型的转换,然后调用CacheListener通知数据变化
        @Override
        public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
            if (dataListener != null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("listen the zookeeper changed. The changed data:" + event.getData());
                }
                TreeCacheEvent.Type type = event.getType();
                EventType eventType = null;
                String content = null;
                String path = null;
                switch (type) {
                    case NODE_ADDED:
                        eventType = EventType.NodeCreated;
                        path = event.getData().getPath();
                        content = event.getData().getData() == null ? "" : new String(event.getData().getData(), CHARSET);
                        break;
                    case NODE_UPDATED:
                        eventType = EventType.NodeDataChanged;
                        path = event.getData().getPath();
                        content = event.getData().getData() == null ? "" : new String(event.getData().getData(), CHARSET);
                        break;
                    case NODE_REMOVED:
                        path = event.getData().getPath();
                        eventType = EventType.NodeDeleted;
                        break;
                    case INITIALIZED:
                        eventType = EventType.INITIALIZED;
                        break;
                    case CONNECTION_LOST:
                        eventType = EventType.CONNECTION_LOST;
                        break;
                    case CONNECTION_RECONNECTED:
                        eventType = EventType.CONNECTION_RECONNECTED;
                        break;
                    case CONNECTION_SUSPENDED:
                        eventType = EventType.CONNECTION_SUSPENDED;
                        break;
                }
                //调用CacheListener,下面介绍该方法
                dataListener.dataChanged(path, content, eventType);
            }
        }
    }

CacheListenerdataChanged方法如下:

    public void dataChanged(String path, Object value, EventType eventType) {
        if (eventType == null) {
            return;
        }
        //当发生INITIALIZED类型的事件时,表示客户端缓存数据同步成功之后可以与zk服务端正常交互,
        //在ZookeeperDynamicConfiguration构造方法中,调用initializedLatch.await方法等待该事件,
        //默认等待5s,超时抛出异常
        if (eventType == EventType.INITIALIZED) {
            initializedLatch.countDown();
            return;
        }

        if (path == null || (value == null && eventType != EventType.NodeDeleted)) {
            return;
        }

        // TODO We only care the changes happened on a specific path level, for example
        //  /dubbo/config/dubbo/configurators, other config changes not in this level will be ignored
        //本监听器对路径深度有检查,深度至少是四级
        if (path.split("/").length >= MIN_PATH_DEPTH) {
            //获取key值,也就是路径中最后一个“/”后面的内容
            String key = pathToKey(path);
            ConfigChangeType changeType;
            //本监听器只处理下面三种事件:增加、删除、修改
            switch (eventType) {
                case NodeCreated:
                    changeType = ConfigChangeType.ADDED;
                    break;
                case NodeDeleted:
                    changeType = ConfigChangeType.DELETED;
                    break;
                case NodeDataChanged:
                    changeType = ConfigChangeType.MODIFIED;
                    break;
                default:
                    return;
            }
            //创建事件ConfigChangedEvent
            ConfigChangedEvent configChangeEvent = new ConfigChangedEvent(key, getGroup(path), (String) value, changeType);
            //通知各个监听器
            //CacheListener其实是一个复合监听器,包含了多个子监听器
            //CacheListener根据被监听路径将ConfigChangedEvent事件发送给对应的监听器
            Set<ConfigurationListener> listeners = keyListeners.get(path);
            if (CollectionUtils.isNotEmpty(listeners)) {
                listeners.forEach(listener -> listener.process(configChangeEvent));
            }
        }
    }

CacheListener是一个复合监听器,其持有一个监听器集合,当指定目录下的数据发生变化时,通知集合中的监听器,这个集合可以包含的监听器如下:

上面这些监听器都是在其构造方法中将自身作为监听器添加到CacheListener的listeners中。

下面简单介绍前四个监听器的作用:

  1. ServiceConfigurationListener:根据修改后的配置,重新发布服务;
  2. ProviderConfigurationListener:根据修改后的配置,重新发布服务;
  3. ReferenceConfigurationListener:根据修改后的配置,重新建立对远程服务的引用;
  4. ConsumerConfigurationListener:根据修改后的配置,重新建立对远程服务的引用;

因为ProviderConfigurationListenerConsumerConfigurationListener监听应用目录,如果dubbo应用发布的服务或者引用的服务比较多,则会造成dubbo修改配置有延时。

如果需要修改配置,可以在配置中心修改相应目录下的数据,这样上述监听器便监听到数据变化,进而修改本地配置。dubbo不会自动向这些目录下存储配置数据。

dubbo解析-详解配置中心_龚厂长的博客-CSDN博客

上一篇下一篇

猜你喜欢

热点阅读