dubbo源码2-注册中心
一 注册中心
dubbo注册中心主要是保存和管理服务的provider信息和consumer信息。
dubbo提供的注册中心有zookeeper,multicast,redis等。
下面主要介绍zookeeper注册中心,类继承关系如下。
image.png
1.1 AbstractRegistry
1.1.1 注册,订阅数据缓存
- 在register(),unregister()方法调用时,缓存相应数据到ConcurrentHashSet<URL>中
- 在subscribe(),unsubscribe()方法调用时,缓存相应数据到ConcurrentMap<URL, Set<NotifyListener>>中
- 缓存数据用于连接抖动断开重连时恢复zk上的相关配置。
1.1.2 订阅结果处理
- 通知notify()
变更的url和订阅的url只有isMatch()方法匹配才会调用回调listener函数。主要是group和version。其中consumer端的配置支持*为匹配任意的provider的配置。
public static boolean isMatch(URL consumerUrl, URL providerUrl) {
String consumerInterface = consumerUrl.getServiceInterface();
String providerInterface = providerUrl.getServiceInterface();
if (!(Constants.ANY_VALUE.equals(consumerInterface) || StringUtils.isEquals(consumerInterface, providerInterface)))
return false;
if (!isMatchCategory(providerUrl.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY),
consumerUrl.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY))) {
return false;
}
if (!providerUrl.getParameter(Constants.ENABLED_KEY, true)
&& !Constants.ANY_VALUE.equals(consumerUrl.getParameter(Constants.ENABLED_KEY))) {
return false;
}
String consumerGroup = consumerUrl.getParameter(Constants.GROUP_KEY);
String consumerVersion = consumerUrl.getParameter(Constants.VERSION_KEY);
String consumerClassifier = consumerUrl.getParameter(Constants.CLASSIFIER_KEY, Constants.ANY_VALUE);
String providerGroup = providerUrl.getParameter(Constants.GROUP_KEY);
String providerVersion = providerUrl.getParameter(Constants.VERSION_KEY);
String providerClassifier = providerUrl.getParameter(Constants.CLASSIFIER_KEY, Constants.ANY_VALUE);
return (Constants.ANY_VALUE.equals(consumerGroup) || StringUtils.isEquals(consumerGroup, providerGroup) || StringUtils.isContains(consumerGroup, providerGroup))
&& (Constants.ANY_VALUE.equals(consumerVersion) || StringUtils.isEquals(consumerVersion, providerVersion))
&& (consumerClassifier == null || Constants.ANY_VALUE.equals(consumerClassifier) || StringUtils.isEquals(consumerClassifier, providerClassifier));
}
- 内存缓存
变更的url信息,按照category属性分类。
category分为:providers(服务提供者),consumers(服务使用者),routers(服务路由),configurators(动态配置)。
缓存到ConcurrentMap<URL, Map<String/分类/, List<URL>>>中 - 文件缓存,
key为save.file的值表示是否同步执行本地文件缓存
key为file的值表示本地文件路径,为空则不做本地文件缓存
通过FileLock锁避免对缓存文件的并发修改
File lockfile = new File(file.getAbsolutePath() + ".lock");
if (!lockfile.exists()) {
lockfile.createNewFile();
}
RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
try {
FileChannel channel = raf.getChannel();
try {
FileLock lock = channel.tryLock();
if (lock == null) {
throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
}
// Save
try {
if (!file.exists()) {
file.createNewFile();
}
FileOutputStream outputFile = new FileOutputStream(file);
try {
properties.store(outputFile, "Dubbo Registry Cache");
} finally {
outputFile.close();
}
} finally {
lock.release();
}
} finally {
channel.close();
}
} finally {
raf.close();
}
- 查询
可通过lookup()方法查询目标节点的变更数据记录
1.2 FailbackRegistry
在注册,注销,订阅,解订阅的调用接口中,缓存失败的相关信息。
异步线程5s间隔周期重试缓存的失败操作
1.3 ZookeeperRegistry
- 屏蔽不同zkclient的库,提供统一的接口
- 连接zookeeper,并监听连接状态,调用AbstractRegistry.recover()在重连成功后恢复zookeeper上的注册和订阅等信息。
-
订阅 doSubscribe()
image.png
二 注册流程
在<dubbo:reference/>和<dubbo:service/>的配置解析时,根据URL协议信息调用Protocol接口的实现类方法。默认使用dubbo协议。
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension()
调用流程如下,接口信息的注册是在RegistryProtocol类中完成。
Protocol调用流程.png
2.1 URL组装
- AbstractInterfaceConfig.loadRegistries()根据registry和application配置信息,对每个注册中心地址创建一个URL。
- 在registry URL的refer参数存储<dubbo:reference/>接口配置
- 在registry URL的export参数存储<dubbo:service/>接口配置
2.2 <dubbo:reference/>之refer()
2.2.1 refer
- 工厂类获取zookeeperRegistry
- 接口是注册中心接口,则返回接口代理
- group配置为匹配多个的,则使用MergeableCluster,支持合并多个provider返回结果。
- cluster通过依赖注入,默认为failoverCluster,支持失败重试。详情后续在cluster层源码解析时介绍。
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
|| "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
return doRefer(cluster, registry, type, url);
}
2.2.2 doRefer
- 构建RegistryDirectory,用于存储接口的路由,动态配置,provider信息。
- 注册url为接口consumers的子节点
- 订阅接口providers,routers,overriders节点的子节点变更,在RegistryDirectory中做相应处理。
- 返回clusterInvoker,配置解析层在invoker基础上创建代理层,交给spring管理
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
2.2.3 RegistryDirectory
接口的几个category子节点信息变更时,触发notify回调,按照url的category配置分别处理。
-
URL overrideDirectoryUrl存储动态配置信息。
- 合并实际的接口url配置即为实际使用的配置。
-
List<Router> routers 存储接口路由信息
1.添加默认路由routers.add(new MockInvokersSelector());
用于筛选mock协议的provider。
2.更新methodInvokerMap时,根据路由配置,过滤不匹配的provider
3.若router配置runtime为true,则每次consumer调用接口时,获取provider列表后,都需要调用该router过滤掉不匹配的provider。 -
refreshInvoker()更新缓存的provider信息
1.Map<String/url/, Invoker<T>> urlInvokerMap 以url为key存储的provider信息
2.Map<String, List<Invoker<T>>> methodInvokerMap urlInvokerMap 以方法名为key存储的provider信息。实际接口调用时,使用此处缓存的数据。
3.删除下线的provider,对新增的provider url,调用协议层refer接口处理,主要是设置创建netty client,和provider建立连接。
public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
}
}
// configurators
if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
this.configurators = toConfigurators(configuratorUrls);
}
// routers
if (routerUrls != null && !routerUrls.isEmpty()) {
List<Router> routers = toRouters(routerUrls);
if (routers != null) { // null - do nothing
setRouters(routers);
}
}
List<Configurator> localConfigurators = this.configurators; // local reference
// merge override parameters
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && !localConfigurators.isEmpty()) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
// providers
refreshInvoker(invokerUrls);
}
2.3 <dubbo:service/>之export()
- doLocalExport()
1 ConcurrentHashMap bounds缓存service接口信息,通过同步锁避免并发问题。
2 调用dubbo协议接口,主要是创建本地的netty server。后续协议层解析时介绍详细内容。
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
-
注册接口
接口注册.png
动态配置变更时,原接口url合并动态配置,调用doChangeLocalExport()更新对应的协议层的配置信息。