Dubbo消费者订阅

2019-01-04  本文已影响0人  你值得拥有更好的12138

一、与Spring的结合

ReferenceConfig被ReferenceBean继承然后通过ReferenceAnnotationBeanPostProcessor注册到Spring中IOC中。与ServiceConifg注册过程类似

二、订阅服务

首先入口在ReferenceConfig的get方法中方法触发init方法

  public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("Already destroyed!");
        }
        if (ref == null) {
            init();
        }
        return ref;
    }

然后来看看init最后获取消费者代理的代码

  ref = createProxy(map);
        ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), ref, interfaceClass.getMethods());
        ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);

ref就是这消费的代理,调用时通过这个代理去调用invoker然后通过dubbo协议到最底层通过TCP远程调用服务,获取结果。

看看createProxy怎么创建代理的

if (urls.size() == 1) {
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // use last registry url
                    }
                }
            }

通过protocol的refer去获取invoker。这的protocol通过扩展工具获得的RegistryProtocol,看看他的refer方法做了什么。一直走到方法的最底层doRefer方法。

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));

        Invoker invoker = cluster.join(directory);
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }

这里通过registry订阅服务,这个registry根据配置的的注册中心不一样,扩展工具获取的也不一样。如果你配置的zookeeper,那么获取的结果就是ZookeeperRegistry。
这里的注册逻辑与Service的注册逻辑差不多。

Invoker是通过RegistryDirectory获取的,Directoy是Invoker的集合包装,分为静态和动态的。静态的就直接构造了,动态的是通过监听器,监听注册中心服务的变化进行动态的更新invokers。看看实现:

public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
}

NotifyListener监听器就是监听注册中心的变化,看看实现的监听逻辑在notify这个方法中的refreshInvoker中通过传入的url进行invoker的构造。重要逻辑在toInvoker方法中

Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
            Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            if (invoker == null) { // Not in the cache, refer again
                try {
                    boolean enabled = true;
                    if (url.hasParameter(Constants.DISABLED_KEY)) {
                        enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                    } else {
                        enabled = url.getParameter(Constants.ENABLED_KEY, true);
                    }
                    if (enabled) {
                        invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
                }
                if (invoker != null) { // Put new invoker in cache
                    newUrlInvokerMap.put(key, invoker);
                }
            } else {
                newUrlInvokerMap.put(key, invoker);
            }

最后发现这发现这里通过protocol的refer进行invoker的构造。咦,TM的这不是又回去了吗?第一次看我真是这么想的,这TM的不是递归了吗?到处找递归结束条件,最终无果!
最后发现这个protocol是在前面的RegistryProtocol传入的,这实例中有个成员变量也是Protocol类型的,这个Protocol又是通过扩展工具在构造RegistryProtocol的时候注入的。

public class RegistryProtocol implements Protocol {

    private final static Logger logger = LoggerFactory.getLogger(RegistryProtocol.class);
    private static RegistryProtocol INSTANCE;
    private final Map<URL, NotifyListener> overrideListeners = new ConcurrentHashMap<URL, NotifyListener>();
    //To solve the problem of RMI repeated exposure port conflicts, the services that have been exposed are no longer exposed.
    //providerurl <--> exporter
    private final Map<String, ExporterChangeableWrapper<?>> bounds = new ConcurrentHashMap<String, ExporterChangeableWrapper<?>>();
    private Cluster cluster;
    private Protocol protocol;
    private RegistryFactory registryFactory;
    private ProxyFactory proxyFactory;
}
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);

然后突然想着TM这个protocol通过ExtensionLoader工具注入的时候会不是就是他自己,那这么想又TM的递归了!最后看懂了ExtensionLoader工具后发现他是URL总线模式的,这个时候在url中的参数已经变了,不会获取原来那个Protocol,而是DubboProtocol.
URL变化代码如下,如果想搞清楚就一定要看懂ExtensionLoader

 url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        Registry registry = registryFactory.getRegistry(url);

这段代码在RegistryProtocol中的refer方法中。

三、构造Invoker

DubboProtocol中构造Inoker代码,所以从这里我们可以证明Dubbo注册,订阅是通过如zookeeper这样的中间件,但是真正的通讯是通过dubbo协议自定义通讯来进行沟通的。

 @Override
    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);
        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }
上一篇下一篇

猜你喜欢

热点阅读