Dubbo 2.7.X 动态规则配置分析
2020-01-05 本文已影响0人
晴天哥_王志
开篇
-
继Dubbo 动态规则配置分析作为Dubbo 2.6.X版本的动态配置分析之后,补充一篇Dubbo 2.7.X版本的针对动态配置响应流程的分析文章。
-
Dubbo 2.6.X和Dubbo 2.7.X在服务治理的实现方面变化较大,但是本质的核心还是在于监听机制,只是监听的目录结构发生了变化。
-
这篇文章基于Dubbo 2.7.4.1版本对监听和回调部分进行补充。
订阅过程分析
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
private static final ConsumerConfigurationListener CONSUMER_CONFIGURATION_LISTENER = new ConsumerConfigurationListener();
private ReferenceConfigurationListener serviceConfigurationListener;
public void subscribe(URL url) {
setConsumerUrl(url);
// Dubbo 2.7.X 新增的consumer侧和Service纬度的监听
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
// Dubbo 2.6.X原有的订阅过程
registry.subscribe(url, this);
}
}
-
Dubbo 2.7.X的订阅过程相比原来Dubbo 2.7.X新增CONSUMER_CONFIGURATION_LISTENER 和 serviceConfigurationListener。
-
CONSUMER_CONFIGURATION_LISTENER负责监听ConsumerConfiguration的配置。
-
serviceConfigurationListener负责监听ReferenceConfiguration的配置。
动态配置监听Listener
-
Dubbo 2.7.X新增了针对新配置节点的监听回调ReferenceConfigurationListener、ConsumerConfigurationListener、ServiceConfigurationListener、ProviderConfigurationListener等监听器。
-
ReferenceConfigurationListener负责reference的配置监听。
-
ConsumerConfigurationListener负责consumer的配置监听。
-
ServiceConfigurationListener负责service的配置监听。
-
ProviderConfigurationListener负责provider的配置监听。
private static class ReferenceConfigurationListener extends AbstractConfiguratorListener {
private RegistryDirectory directory;
private URL url;
ReferenceConfigurationListener(RegistryDirectory directory, URL url) {
this.directory = directory;
this.url = url;
// 不同的监听对象的getRuleKey的规则不同,后缀也不同。
// public String getColonSeparatedKey() {
// StringBuilder serviceNameBuilder = new StringBuilder();
// append(serviceNameBuilder, INTERFACE_KEY, true);
// append(serviceNameBuilder, VERSION_KEY, false);
// append(serviceNameBuilder, GROUP_KEY, false);
// return serviceNameBuilder.toString();
// }
this.initWith(DynamicConfiguration.getRuleKey(url) + CONFIGURATORS_SUFFIX);
}
@Override
protected void notifyOverrides() {
directory.refreshInvoker(Collections.emptyList());
}
}
- ReferenceConfigurationListenerd的initWith()的方法key为DynamicConfiguration.getRuleKey(url) + CONFIGURATORS_SUFFIX。
private static class ConsumerConfigurationListener extends AbstractConfiguratorListener {
List<RegistryDirectory> listeners = new ArrayList<>();
ConsumerConfigurationListener() {
this.initWith(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX);
}
void addNotifyListener(RegistryDirectory listener) {
this.listeners.add(listener);
}
@Override
protected void notifyOverrides() {
listeners.forEach(listener -> listener.refreshInvoker(Collections.emptyList()));
}
}
- ConsumerConfigurationListener的initWith()的方法key为ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX。
private class ServiceConfigurationListener extends AbstractConfiguratorListener {
private URL providerUrl;
private OverrideListener notifyListener;
public ServiceConfigurationListener(URL providerUrl, OverrideListener notifyListener) {
this.providerUrl = providerUrl;
this.notifyListener = notifyListener;
this.initWith(DynamicConfiguration.getRuleKey(providerUrl) + CONFIGURATORS_SUFFIX);
}
private <T> URL overrideUrl(URL providerUrl) {
return RegistryProtocol.getConfigedInvokerUrl(configurators, providerUrl);
}
@Override
protected void notifyOverrides() {
notifyListener.doOverrideIfNecessary();
}
}
- ServiceConfigurationListener的initWith()的方法key为DynamicConfiguration.getRuleKey(providerUrl) + CONFIGURATORS_SUFFIX。
private class ProviderConfigurationListener extends AbstractConfiguratorListener {
public ProviderConfigurationListener() {
this.initWith(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX);
}
private <T> URL overrideUrl(URL providerUrl) {
return RegistryProtocol.getConfigedInvokerUrl(configurators, providerUrl);
}
@Override
protected void notifyOverrides() {
overrideListeners.values().forEach(listener -> ((OverrideListener) listener).doOverrideIfNecessary());
}
}
- ProviderConfigurationListener的initWith()的方法key为ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX。
动态配置中心构建对象
public abstract class AbstractConfiguratorListener implements ConfigurationListener {
protected List<Configurator> configurators = Collections.emptyList();
protected final void initWith(String key) {
// 在注册中心为zookeeper的场景下返回ZookeeperDynamicConfiguration对象,全局唯一。
DynamicConfiguration dynamicConfiguration = DynamicConfiguration.getDynamicConfiguration();
// 向ZookeeperDynamicConfiguration中添加监听器,通过key进行区分
// ReferenceConfigurationListener
// ConsumerConfigurationListener
// ServiceConfigurationListener
// ProviderConfigurationListener
dynamicConfiguration.addListener(key, this);
String rawConfig = dynamicConfiguration.getRule(key, DynamicConfiguration.DEFAULT_GROUP);
if (!StringUtils.isEmpty(rawConfig)) {
genConfiguratorsFromRawRule(rawConfig);
}
}
@Override
public void process(ConfigChangeEvent event) {
if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
configurators.clear();
} else {
// 负责解析配置生成configurators对象
if (!genConfiguratorsFromRawRule(event.getValue())) {
return;
}
}
notifyOverrides();
}
protected abstract void notifyOverrides();
private boolean genConfiguratorsFromRawRule(String rawConfig) {
boolean parseSuccess = true;
try {
// parseConfigurators will recognize app/service config automatically.
configurators = Configurator.toConfigurators(ConfigParser.parseConfigurators(rawConfig))
.orElse(configurators);
} catch (Exception e) {
logger.error("Failed to parse raw dynamic config and it will not take effect, the raw config is: " +
rawConfig, e);
parseSuccess = false;
}
return parseSuccess;
}
}
public interface DynamicConfiguration extends Configuration {
String DEFAULT_GROUP = "dubbo";
default void addListener(String key, ConfigurationListener listener) {
addListener(key, DEFAULT_GROUP, listener);
}
}
- DynamicConfiguration是动态配置中心对象,在注册中心为zookeeper的场景下为ZookeeperDynamicConfiguration对象,全局唯一。
- dynamicConfiguration.addListener(key, this)的key作为监听回调对象的唯一标识,通过key来唯一区分回调Listener。
- addListener()最终会执行到ZookeeperDynamicConfiguration的addListener()方法。
- AbstractConfiguratorListener的process()方法在回调后倍执行并解析生成configurators对象。
ZookeeperDynamicConfiguration
public class ZookeeperDynamicConfiguration implements DynamicConfiguration {
private Executor executor;
// The final root path would be: /configRootPath/config
private String rootPath;
private final ZookeeperClient zkClient;
private CountDownLatch initializedLatch;
private CacheListener cacheListener;
private URL url;
ZookeeperDynamicConfiguration(URL url, ZookeeperTransporter zookeeperTransporter) {
this.url = url;
// 默认情况为/dubbo/config
rootPath = PATH_SEPARATOR + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config";
initializedLatch = new CountDownLatch(1);
this.cacheListener = new CacheListener(rootPath, initializedLatch);
this.executor = Executors.newFixedThreadPool(1, new NamedThreadFactory(this.getClass().getSimpleName(), true));
zkClient = zookeeperTransporter.connect(url);
zkClient.addDataListener(rootPath, cacheListener, executor);
try {
// Wait for connection
long timeout = url.getParameter("init.timeout", 5000);
boolean isCountDown = this.initializedLatch.await(timeout, TimeUnit.MILLISECONDS);
if (!isCountDown) {
throw new IllegalStateException("Failed to receive INITIALIZED event from zookeeper, pls. check if url "
+ url + " is correct");
}
} catch (InterruptedException e) {
}
}
public void addListener(String key, String group, ConfigurationListener listener) {
cacheListener.addListener(getPathKey(group, key), listener);
}
private String getPathKey(String group, String key) {
// /dubbo/config/group/key
return rootPath + PATH_SEPARATOR + group + PATH_SEPARATOR + key;
}
}
- ZookeeperDynamicConfiguration当中真正和zookeeper交互的监听器cacheListener。
- ZookeeperDynamicConfiguration的addListener执行的是cacheListener的addListener,负责将监听回调添加到cacheListener。
- ZookeeperDynamicConfiguration中的cacheListener的key为/dubbo/config/dubbo/keyPath,其中keyPath为不同listener传入的key字段。
public class CacheListener implements DataListener {
private static final int MIN_PATH_DEPTH = 5;
private Map<String, Set<ConfigurationListener>> keyListeners = new ConcurrentHashMap<>();
private CountDownLatch initializedLatch;
private String rootPath;
public CacheListener(String rootPath, CountDownLatch initializedLatch) {
this.rootPath = rootPath;
this.initializedLatch = initializedLatch;
}
// /dubbo/config/dubbo/org.apache.dubbo.demo.DemoService::.condition-router
// /dubbo/config/dubbo/dubbo-demo-api-consumer.configurators
// /dubbo/config/dubbo/dubbo-demo-api-consumer.condition-router
public void addListener(String key, ConfigurationListener configurationListener) {
Set<ConfigurationListener> listeners = this.keyListeners.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>());
listeners.add(configurationListener);
}
private String pathToKey(String path) {
// path 为 /dubbo/config/dubbo/dubbo-demo-api-consumer.configurators
if (StringUtils.isEmpty(path)) {
return path;
}
// groupKey 为 dubbo.dubbo-demo-api-consumer.configurators
// rootPath 为 /dubbo/config
String groupKey = path.replace(rootPath + PATH_SEPARATOR, "").replaceAll(PATH_SEPARATOR, DOT_SEPARATOR);
// dubbo-demo-api-consumer.configurators
return groupKey.substring(groupKey.indexOf(DOT_SEPARATOR) + 1);
}
@Override
public void dataChanged(String path, Object value, EventType eventType) {
if (eventType == null) {
return;
}
if (eventType == EventType.INITIALIZED) {
initializedLatch.countDown();
return;
}
if (path == null || (value == null && eventType != EventType.NodeDeleted)) {
return;
}
if (path.split("/").length >= MIN_PATH_DEPTH) {
String key = pathToKey(path);
ConfigChangeType changeType;
switch (eventType) {
case NodeCreated:
changeType = ConfigChangeType.ADDED;
break;
case NodeDeleted:
changeType = ConfigChangeType.DELETED;
break;
case NodeDataChanged:
changeType = ConfigChangeType.MODIFIED;
break;
default:
return;
}
ConfigChangeEvent configChangeEvent = new ConfigChangeEvent(key, (String) value, changeType);
// 根据路径找到对应的回调监听器keyListeners.get(path)
Set<ConfigurationListener> listeners = keyListeners.get(path);
if (CollectionUtils.isNotEmpty(listeners)) {
listeners.forEach(listener -> listener.process(configChangeEvent));
}
}
}
}
- CacheListener监听回调过程中会根据路径path去查找对应的回调ConfigurationListener的process()方法进行处理,即AbstractConfiguratorListener的process()方法。
- AbstractConfiguratorListener的process()方法会先统一执行configurators的生成,然后再执行具体的Listener的ReferenceConfigurationListener、ConsumerConfigurationListener、ServiceConfigurationListener、ProviderConfigurationListener的notifyOverrides()方法重新刷新对象。