Dubbo之Directory源码分析
Directory的作用
首先看下Directory的接口定义
public interface Directory<T> extends Node {
Class<T> getInterface();
List<Invoker<T>> list(Invocation invocation) throws RpcException;
}
每个Directory实例会对应一个接口服务,它的主要功能是为Cluster提供远程对等调用invoker目录服务,list方法用于获取远程服务提供者的对等调用Invokers
public interface Cluster {
<T> Invoker<T> join(Directory<T> directory) throws RpcException;
}
Directory用于获取多个远程对等调用invoker,而Cluster用于将这些invoker伪装成一个invoker进行集群调用,Cluster源码会单独讲解
Directory的两种实现
Directory有两种实现,StaticDirectory和RegistryDirectory,分别对应静态和动态的Invoker目录服务
这两个实现都继承了模板类AbstractDirectory,让我们来看下AbstractDirectory封装了什么逻辑
可以在AbstractDirectory中可以看到这么一个变量
private volatile List<Router> routers;
routers用于路由,用于过滤远程对等调用invoker,关于Router路由源码我会单独讲解
可以看到AbstractDirectory实现了Directory接口的list方法
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed) {
throw new RpcException("Directory already destroyed .url: " + getUrl());
}
List<Invoker<T>> invokers = doList(invocation);
List<Router> localRouters = this.routers; // local reference
if (localRouters != null && !localRouters.isEmpty()) {
for (Router router : localRouters) {
try {
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
}
}
}
return invokers;
}
上面逻辑留了模板方法doList给子类实现
protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;
子类实现doList方法,只负责invokers的获取,在AbstractDirectory中增加的router的过滤逻辑。消费者拿到的invoker集合,是经过routers过滤的。
对于StaticDirectory和RegistryDirectory,我们只要关注如何获取远程对等调用invokers的逻辑即可
StaticDirectory
StaticDirectory没什么好讲的,doList方法直接返回设置的invokers
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
return invokers;
}
RegistryDirectory
RegistryDirectory实现了NotifyListener接口,会随着提供者的上下线动态刷新本地invoker缓存
对于invoker,在RegistryDirectory有两个缓存
private volatile Map<String, Invoker<T>> urlInvokerMap;
private volatile Map<String, List<Invoker<T>>> methodInvokerMap;
urlInvokerMap缓存url对应的invoker
methodInvokerMap缓存方法对应的invokers,方法对应invoker可以存在多个,所以是 List<Invoker<T>>,在dolist方法中会用到
notify回调会刷新这两个缓存
我们先来看下RegistryDirectory的订阅操作
public void subscribe(URL url) {
setConsumerUrl(url);
registry.subscribe(url, this);
}
这边的registry可以认为就是之前讲的ZookeeperRegistry,把RegistryDirectory自身作为订阅回调,一旦监控的路径发生变化,就会回调RegistryDirectory的notify方法
那么subscribe会订阅那些url?在RegistryProtocol中可以看到
//directory订阅url对应interface的provider,configurators,routers接口目录,回调接口NotifyListener由RegistryDirectory实现
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
会监听这个接口的provider,configurators,routers目录
接下来看notify的实现
public synchronized void notify(List<URL> urls) {
//这里的通知会一次性传递对应监听目录下所有的url
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
//对url进行分类
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
}
}
// configurators
//刷新configuratorUrls
if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
this.configurators = toConfigurators(configuratorUrls);
}
// routers
//刷新routers
if (routerUrls != null && !routerUrls.isEmpty()) {
List<Router> routers = toRouters(routerUrls);
if (routers != null) { // null - do nothing
setRouters(routers);
}
}
List<Configurator> localConfigurators = this.configurators; // local reference
// merge override parameters
this.overrideDirectoryUrl = directoryUrl;
//override这个overrideDirectoryUrl什么用?
if (localConfigurators != null && !localConfigurators.isEmpty()) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
// providers
//刷新客户端对等invoker
refreshInvoker(invokerUrls);
}
方法的开始,会对url进行分类,一共provider,configurators,routers三种类型
先刷新RegistryDirectory中的configurators,routers,再使用provider urls增量刷新invoker缓存
如果provider urls不存在,那么根据上一次的缓存provider urls,再使用router增量刷新invoker缓存
private void refreshInvoker(List<URL> invokerUrls) {
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
//传入的url protocol = empty,directory设置为禁用
this.forbidden = true; // Forbid to access
this.methodInvokerMap = null; // Set the method invoker map to null
//摧毁客户端的对等调用invoker
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
//invokerUrls为空,因为通知的url可能只改变了router或者configurator,提供者并没有变化,但是对应invoker配置还是需要被更改的
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
//invokerUrls为空使用缓存的invokers urls,也就是上一次回调拿到invokers
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
//更新缓存
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
//invokerUrls为空,中止
if (invokerUrls.isEmpty()) {
return;
}
//把url转换为invoker,已经存在的invoker不会重新创建
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
//把newUrlInvokerMap转换为methodInvokerMap
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
// state change
// If the calculation is wrong, it is not processed.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
//如果存在group配置,对method对应的invoker进行cluster伪装
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
在refreshInvoker方法中,会根据消费者url的protocol过滤掉不匹配的提供者url,然后对过滤后的提供者url生成远程对等调用invoker,如果invoker已经存在,那么不用再重复创建
接下来看下doList方法的实现
public List<Invoker<T>> doList(Invocation invocation) {
if (forbidden) {
// 1. No service provider 2. Service providers are disabled
throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
"No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
}
List<Invoker<T>> invokers = null;
//针对每个方法,有不同的invokers列表。可能存在路由配置
Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
//从invocaion获取方法名和参数,可能是$invoke泛化调用
String methodName = RpcUtils.getMethodName(invocation);
Object[] args = RpcUtils.getArguments(invocation);
if (args != null && args.length > 0 && args[0] != null
&& (args[0] instanceof String || args[0].getClass().isEnum())) {
invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
}
if (invokers == null) {
invokers = localMethodInvokerMap.get(methodName);
}
if (invokers == null) {
invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
}
if (invokers == null) {
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
invokers = iterator.next();
}
}
}
return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}
直接从methodInvokerMap中获取对应的invoker集合即可。优先通过方法名查找,如果找不到,通过*作为key查找,再找不到,返回methodInvokerMap第一个invoker集合。
讲解完子类需要实现的doList方法后,下面看下RegistryDirectory是如何被使用到的
RegistryDirectory的使用
RegistryDirectory封装了获取远程对等invokers的逻辑,主要使用在RegistryProtocol的doRefer方法
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
//这边的url为consumer url
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
//这里的protocol为spi注入的适配类
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
//注册consumer url
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
//directory订阅url对应interface的provider,configurators,routers接口目录,回调接口NotifyListener由RegistryDirectory实现
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
//通过cluster封装获取invoker的逻辑,将对多个invoker的集群调用封装成一个invoker
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
RegistryDirectory通过与Cluster配合,将对多个invoker的集群调用封装成一个invoker,然后通过代理把invoker转换为代理对象bean,放入spring容器中去,就和正常使用本地bean一样。这就是RPC。
总结
Dubbo的目录服务,用于获取远程对等invoker,其实这种设计在业务场景中也能用到。
比如我们公司项目的司机池功能,对于每个订单都有一个司机池,并且这个司机池会随着司机状态变化而发生变化,也可以参考Directory的接口设计。
最后
希望大家关注下我的公众号
image