Dubbo消费者订阅
一、与Spring的结合
ReferenceConfig被ReferenceBean继承然后通过ReferenceAnnotationBeanPostProcessor注册到Spring中IOC中。与ServiceConifg注册过程类似
二、订阅服务
首先入口在ReferenceConfig的get方法中方法触发init方法
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("Already destroyed!");
}
if (ref == null) {
init();
}
return ref;
}
然后来看看init最后获取消费者代理的代码
ref = createProxy(map);
ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), ref, interfaceClass.getMethods());
ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
ref就是这消费的代理,调用时通过这个代理去调用invoker然后通过dubbo协议到最底层通过TCP远程调用服务,获取结果。
看看createProxy怎么创建代理的
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
}
通过protocol的refer去获取invoker。这的protocol通过扩展工具获得的RegistryProtocol,看看他的refer方法做了什么。一直走到方法的最底层doRefer方法。
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
这里通过registry订阅服务,这个registry根据配置的的注册中心不一样,扩展工具获取的也不一样。如果你配置的zookeeper,那么获取的结果就是ZookeeperRegistry。
这里的注册逻辑与Service的注册逻辑差不多。
Invoker是通过RegistryDirectory获取的,Directoy是Invoker的集合包装,分为静态和动态的。静态的就直接构造了,动态的是通过监听器,监听注册中心服务的变化进行动态的更新invokers。看看实现:
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
}
NotifyListener监听器就是监听注册中心的变化,看看实现的监听逻辑在notify这个方法中的refreshInvoker中通过传入的url进行invoker的构造。重要逻辑在toInvoker方法中
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
if (url.hasParameter(Constants.DISABLED_KEY)) {
enabled = !url.getParameter(Constants.DISABLED_KEY, false);
} else {
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
最后发现这发现这里通过protocol的refer进行invoker的构造。咦,TM的这不是又回去了吗?第一次看我真是这么想的,这TM的不是递归了吗?到处找递归结束条件,最终无果!
最后发现这个protocol是在前面的RegistryProtocol传入的,这实例中有个成员变量也是Protocol类型的,这个Protocol又是通过扩展工具在构造RegistryProtocol的时候注入的。
public class RegistryProtocol implements Protocol {
private final static Logger logger = LoggerFactory.getLogger(RegistryProtocol.class);
private static RegistryProtocol INSTANCE;
private final Map<URL, NotifyListener> overrideListeners = new ConcurrentHashMap<URL, NotifyListener>();
//To solve the problem of RMI repeated exposure port conflicts, the services that have been exposed are no longer exposed.
//providerurl <--> exporter
private final Map<String, ExporterChangeableWrapper<?>> bounds = new ConcurrentHashMap<String, ExporterChangeableWrapper<?>>();
private Cluster cluster;
private Protocol protocol;
private RegistryFactory registryFactory;
private ProxyFactory proxyFactory;
}
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
然后突然想着TM这个protocol通过ExtensionLoader工具注入的时候会不是就是他自己,那这么想又TM的递归了!最后看懂了ExtensionLoader工具后发现他是URL总线模式的,这个时候在url中的参数已经变了,不会获取原来那个Protocol,而是DubboProtocol.
URL变化代码如下,如果想搞清楚就一定要看懂ExtensionLoader
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
Registry registry = registryFactory.getRegistry(url);
这段代码在RegistryProtocol中的refer方法中。
三、构造Invoker
DubboProtocol中构造Inoker代码,所以从这里我们可以证明Dubbo注册,订阅是通过如zookeeper这样的中间件,但是真正的通讯是通过dubbo协议自定义通讯来进行沟通的。
@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}