dubbo

Dubbo 2.7.X 动态规则配置分析

2020-01-05  本文已影响0人  晴天哥_王志

开篇

订阅过程分析

public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {

    private static final ConsumerConfigurationListener CONSUMER_CONFIGURATION_LISTENER = new ConsumerConfigurationListener();
    private ReferenceConfigurationListener serviceConfigurationListener;

    public void subscribe(URL url) {
        setConsumerUrl(url);

        // Dubbo 2.7.X 新增的consumer侧和Service纬度的监听
        CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
        serviceConfigurationListener = new ReferenceConfigurationListener(this, url);

        // Dubbo 2.6.X原有的订阅过程
        registry.subscribe(url, this);
    }
}

动态配置监听Listener

private static class ReferenceConfigurationListener extends AbstractConfiguratorListener {
    private RegistryDirectory directory;
    private URL url;

    ReferenceConfigurationListener(RegistryDirectory directory, URL url) {
        this.directory = directory;
        this.url = url;
        // 不同的监听对象的getRuleKey的规则不同,后缀也不同。
        // public String getColonSeparatedKey() {
        // StringBuilder serviceNameBuilder = new StringBuilder();
        // append(serviceNameBuilder, INTERFACE_KEY, true);
        // append(serviceNameBuilder, VERSION_KEY, false);
        // append(serviceNameBuilder, GROUP_KEY, false);
        // return serviceNameBuilder.toString();
        // }
        this.initWith(DynamicConfiguration.getRuleKey(url) + CONFIGURATORS_SUFFIX);
    }

    @Override
    protected void notifyOverrides() {
        directory.refreshInvoker(Collections.emptyList());
    }
}
private static class ConsumerConfigurationListener extends AbstractConfiguratorListener {
    List<RegistryDirectory> listeners = new ArrayList<>();

    ConsumerConfigurationListener() {
        this.initWith(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX);
    }

    void addNotifyListener(RegistryDirectory listener) {
        this.listeners.add(listener);
    }

    @Override
    protected void notifyOverrides() {
        listeners.forEach(listener -> listener.refreshInvoker(Collections.emptyList()));
    }
}
private class ServiceConfigurationListener extends AbstractConfiguratorListener {
    private URL providerUrl;
    private OverrideListener notifyListener;

    public ServiceConfigurationListener(URL providerUrl, OverrideListener notifyListener) {
        this.providerUrl = providerUrl;
        this.notifyListener = notifyListener;
        this.initWith(DynamicConfiguration.getRuleKey(providerUrl) + CONFIGURATORS_SUFFIX);
    }

    private <T> URL overrideUrl(URL providerUrl) {
        return RegistryProtocol.getConfigedInvokerUrl(configurators, providerUrl);
    }

    @Override
    protected void notifyOverrides() {
        notifyListener.doOverrideIfNecessary();
    }
}
private class ProviderConfigurationListener extends AbstractConfiguratorListener {

    public ProviderConfigurationListener() {
        this.initWith(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX);
    }

    private <T> URL overrideUrl(URL providerUrl) {
        return RegistryProtocol.getConfigedInvokerUrl(configurators, providerUrl);
    }

    @Override
    protected void notifyOverrides() {
        overrideListeners.values().forEach(listener -> ((OverrideListener) listener).doOverrideIfNecessary());
    }
}

动态配置中心构建对象

public abstract class AbstractConfiguratorListener implements ConfigurationListener {
   
    protected List<Configurator> configurators = Collections.emptyList();

    protected final void initWith(String key) {
        // 在注册中心为zookeeper的场景下返回ZookeeperDynamicConfiguration对象,全局唯一。
        DynamicConfiguration dynamicConfiguration = DynamicConfiguration.getDynamicConfiguration();
        // 向ZookeeperDynamicConfiguration中添加监听器,通过key进行区分
        // ReferenceConfigurationListener
        // ConsumerConfigurationListener
        // ServiceConfigurationListener
        // ProviderConfigurationListener
        dynamicConfiguration.addListener(key, this);
        String rawConfig = dynamicConfiguration.getRule(key, DynamicConfiguration.DEFAULT_GROUP);
        if (!StringUtils.isEmpty(rawConfig)) {
            genConfiguratorsFromRawRule(rawConfig);
        }
    }

    @Override
    public void process(ConfigChangeEvent event) {

        if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
            configurators.clear();
        } else {
            // 负责解析配置生成configurators对象
            if (!genConfiguratorsFromRawRule(event.getValue())) {
                return;
            }
        }

        notifyOverrides();
    }

    protected abstract void notifyOverrides();

    private boolean genConfiguratorsFromRawRule(String rawConfig) {
        boolean parseSuccess = true;
        try {
            // parseConfigurators will recognize app/service config automatically.
            configurators = Configurator.toConfigurators(ConfigParser.parseConfigurators(rawConfig))
                    .orElse(configurators);
        } catch (Exception e) {
            logger.error("Failed to parse raw dynamic config and it will not take effect, the raw config is: " +
                    rawConfig, e);
            parseSuccess = false;
        }
        return parseSuccess;
    }
}


public interface DynamicConfiguration extends Configuration {
    String DEFAULT_GROUP = "dubbo";

    default void addListener(String key, ConfigurationListener listener) {
        addListener(key, DEFAULT_GROUP, listener);
    }
}

ZookeeperDynamicConfiguration

public class ZookeeperDynamicConfiguration implements DynamicConfiguration {

    private Executor executor;
    // The final root path would be: /configRootPath/config
    private String rootPath;
    private final ZookeeperClient zkClient;
    private CountDownLatch initializedLatch;
    private CacheListener cacheListener;
    private URL url;

    ZookeeperDynamicConfiguration(URL url, ZookeeperTransporter zookeeperTransporter) {
        this.url = url;
        // 默认情况为/dubbo/config
        rootPath = PATH_SEPARATOR + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config";

        initializedLatch = new CountDownLatch(1);
        this.cacheListener = new CacheListener(rootPath, initializedLatch);
        this.executor = Executors.newFixedThreadPool(1, new NamedThreadFactory(this.getClass().getSimpleName(), true));

        zkClient = zookeeperTransporter.connect(url);
        zkClient.addDataListener(rootPath, cacheListener, executor);
        try {
            // Wait for connection
            long timeout = url.getParameter("init.timeout", 5000);
            boolean isCountDown = this.initializedLatch.await(timeout, TimeUnit.MILLISECONDS);
            if (!isCountDown) {
                throw new IllegalStateException("Failed to receive INITIALIZED event from zookeeper, pls. check if url "
                        + url + " is correct");
            }
        } catch (InterruptedException e) {
        }
    }

    public void addListener(String key, String group, ConfigurationListener listener) {
        cacheListener.addListener(getPathKey(group, key), listener);
    }

    private String getPathKey(String group, String key) {
        // /dubbo/config/group/key
        return rootPath + PATH_SEPARATOR + group + PATH_SEPARATOR + key;
    }
}
public class CacheListener implements DataListener {
    private static final int MIN_PATH_DEPTH = 5;

    private Map<String, Set<ConfigurationListener>> keyListeners = new ConcurrentHashMap<>();
    private CountDownLatch initializedLatch;
    private String rootPath;

    public CacheListener(String rootPath, CountDownLatch initializedLatch) {
        this.rootPath = rootPath;
        this.initializedLatch = initializedLatch;
    }

    // /dubbo/config/dubbo/org.apache.dubbo.demo.DemoService::.condition-router
    // /dubbo/config/dubbo/dubbo-demo-api-consumer.configurators
    // /dubbo/config/dubbo/dubbo-demo-api-consumer.condition-router
    public void addListener(String key, ConfigurationListener configurationListener) {
        Set<ConfigurationListener> listeners = this.keyListeners.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>());
        listeners.add(configurationListener);
    }

    private String pathToKey(String path) {
        // path 为 /dubbo/config/dubbo/dubbo-demo-api-consumer.configurators
        if (StringUtils.isEmpty(path)) {
            return path;
        }
        // groupKey 为 dubbo.dubbo-demo-api-consumer.configurators
        // rootPath 为 /dubbo/config
        String groupKey = path.replace(rootPath + PATH_SEPARATOR, "").replaceAll(PATH_SEPARATOR, DOT_SEPARATOR);

        // dubbo-demo-api-consumer.configurators
        return groupKey.substring(groupKey.indexOf(DOT_SEPARATOR) + 1);
    }

    @Override
    public void dataChanged(String path, Object value, EventType eventType) {
        if (eventType == null) {
            return;
        }

        if (eventType == EventType.INITIALIZED) {
            initializedLatch.countDown();
            return;
        }

        if (path == null || (value == null && eventType != EventType.NodeDeleted)) {
            return;
        }

        if (path.split("/").length >= MIN_PATH_DEPTH) {
            String key = pathToKey(path);
            ConfigChangeType changeType;
            switch (eventType) {
                case NodeCreated:
                    changeType = ConfigChangeType.ADDED;
                    break;
                case NodeDeleted:
                    changeType = ConfigChangeType.DELETED;
                    break;
                case NodeDataChanged:
                    changeType = ConfigChangeType.MODIFIED;
                    break;
                default:
                    return;
            }

            ConfigChangeEvent configChangeEvent = new ConfigChangeEvent(key, (String) value, changeType);

            // 根据路径找到对应的回调监听器keyListeners.get(path)
            Set<ConfigurationListener> listeners = keyListeners.get(path);
            if (CollectionUtils.isNotEmpty(listeners)) {
                listeners.forEach(listener -> listener.process(configChangeEvent));
            }
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读