Dubbo源码分析(十三) Directory实现

2018-11-13  本文已影响0人  skyguard

下面我们来说一下Dubbo的Directory实现。之前我们说过了,Cluster是调用了Directory实现的集群容错机制。先来看一下Directory接口

 /**
 * get service type.
 *
 * 获得服务类型,例如:com.alibaba.dubbo.demo.DemoService
 *
 * @return service type.
 */
Class<T> getInterface();

/**
 * list invokers.
 *
 * 获得所有服务 Invoker 集合
 *
 * @return invokers
 */
List<Invoker<T>> list(Invocation invocation) throws RpcException;

再来看一下AbstractDirectory的list方法

public List<Invoker<T>> list(Invocation invocation) throws RpcException {
    if (destroyed) {
        throw new RpcException("Directory already destroyed .url: " + getUrl());
    }
    // 获得所有 Invoker 集合
    List<Invoker<T>> invokers = doList(invocation);
    // 根据路由规则,筛选 Invoker 集合
    List<Router> localRouters = this.routers; // local reference 本地引用,避免并发问题
    if (localRouters != null && !localRouters.isEmpty()) {
        for (Router router : localRouters) {
            try {
                if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                    invokers = router.route(invokers, getConsumerUrl(), invocation);
                }
            } catch (Throwable t) {
                logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
            }
        }
    }
    return invokers;
}

再看一下RegistryDirectory的notify方法

public synchronized void notify(List<URL> urls) {
    // 根据 URL 的分类或协议,分组成三个集合 。
    List<URL> invokerUrls = new ArrayList<URL>(); // 服务提供者 URL 集合
    List<URL> routerUrls = new ArrayList<URL>();
    List<URL> configuratorUrls = new ArrayList<URL>();
    for (URL url : urls) {
        String protocol = url.getProtocol();
        String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
        if (Constants.ROUTERS_CATEGORY.equals(category) || Constants.ROUTE_PROTOCOL.equals(protocol)) {
            routerUrls.add(url);
        } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
            configuratorUrls.add(url);
        } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
            invokerUrls.add(url);
        } else {
            logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
        }
    }
    // 处理配置规则 URL 集合
    // configurators
    if (!configuratorUrls.isEmpty()) {
        this.configurators = toConfigurators(configuratorUrls);
    }
    // 处理路由规则 URL 集合
    // routers
    if (!routerUrls.isEmpty()) {
        List<Router> routers = toRouters(routerUrls);
        if (routers != null) { // null - do nothing
            setRouters(routers);
        }
    }
    // 合并配置规则,到 `directoryUrl` 中,形成 `overrideDirectoryUrl` 变量。
    List<Configurator> localConfigurators = this.configurators; // local reference
    // merge override parameters
    this.overrideDirectoryUrl = directoryUrl;
    if (localConfigurators != null && !localConfigurators.isEmpty()) {
        for (Configurator configurator : localConfigurators) {
            this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
        }
    }
    // 处理服务提供者 URL 集合
    // providers
    refreshInvoker(invokerUrls);
}

再到refreshInvoker方法

private void refreshInvoker(List<URL> invokerUrls) {
    if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
            && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        // 设置禁止访问
        this.forbidden = true; // Forbid to access
        // methodInvokerMap 置空
        this.methodInvokerMap = null; // Set the method invoker map to null
        // 销毁所有 Invoker 集合
        destroyAllInvokers(); // Close all invokers
    } else {
        // 设置允许访问
        this.forbidden = false; // Allow to access
        // 引用老的 urlInvokerMap
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
        // 传入的 invokerUrls 为空,说明是路由规则或配置规则发生改变,此时 invokerUrls 是空的,直接使用 cachedInvokerUrls 。
        if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
            invokerUrls.addAll(this.cachedInvokerUrls);
        // 传入的 invokerUrls 非空,更新 cachedInvokerUrls 。
        } else {
            this.cachedInvokerUrls = new HashSet<URL>();
            this.cachedInvokerUrls.addAll(invokerUrls); //Cached invoker urls, convenient for comparison //缓存invokerUrls列表,便于交叉对比
        }
        // 忽略,若无 invokerUrls
        if (invokerUrls.isEmpty()) {
            return;
        }
        // 将传入的 invokerUrls ,转成新的 urlInvokerMap
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
        // 转换出新的 methodInvokerMap
        Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
        // state change
        // If the calculation is wrong, it is not processed. 如果计算错误,则不进行处理.
        if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
            logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
            return;
        }
        // 若服务引用多 group ,则按照 method + group 聚合 Invoker 集合
        this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
        this.urlInvokerMap = newUrlInvokerMap;
        // 销毁不再使用的 Invoker 集合
        try {
            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
        } catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }
    }
}

Dubbo的目录服务简单来说就是消费者将自己能够调用的服务提供者的信息缓存到本地Directory中,当服务提供者有所变化时会通知到注册中心,消费者会监听注册中心相关服务的消息,当收到相关服务提供者变动的消息时会更新本地服务目录Directory。
Dubbo的Directory实现就介绍到这里了。

上一篇 下一篇

猜你喜欢

热点阅读