Dubbo源码解析

Dubbo之Directory源码分析

2018-06-27  本文已影响0人  土豆肉丝盖浇饭

Directory的作用

首先看下Directory的接口定义

public interface Directory<T> extends Node {
    Class<T> getInterface();

    List<Invoker<T>> list(Invocation invocation) throws RpcException;
}

每个Directory实例会对应一个接口服务,它的主要功能是为Cluster提供远程对等调用invoker目录服务,list方法用于获取远程服务提供者的对等调用Invokers

public interface Cluster {
    <T> Invoker<T> join(Directory<T> directory) throws RpcException;
}

Directory用于获取多个远程对等调用invoker,而Cluster用于将这些invoker伪装成一个invoker进行集群调用,Cluster源码会单独讲解

Directory的两种实现

Directory有两种实现,StaticDirectoryRegistryDirectory,分别对应静态和动态的Invoker目录服务
这两个实现都继承了模板类AbstractDirectory,让我们来看下AbstractDirectory封装了什么逻辑
可以在AbstractDirectory中可以看到这么一个变量

 private volatile List<Router> routers;

routers用于路由,用于过滤远程对等调用invoker,关于Router路由源码我会单独讲解

可以看到AbstractDirectory实现了Directory接口的list方法

public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        if (destroyed) {
            throw new RpcException("Directory already destroyed .url: " + getUrl());
        }
        List<Invoker<T>> invokers = doList(invocation);
        List<Router> localRouters = this.routers; // local reference
        if (localRouters != null && !localRouters.isEmpty()) {
            for (Router router : localRouters) {
                try {
                    if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                        invokers = router.route(invokers, getConsumerUrl(), invocation);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
                }
            }
        }
        return invokers;
    }

上面逻辑留了模板方法doList给子类实现

protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;

子类实现doList方法,只负责invokers的获取,在AbstractDirectory中增加的router的过滤逻辑。消费者拿到的invoker集合,是经过routers过滤的。

对于StaticDirectory和RegistryDirectory,我们只要关注如何获取远程对等调用invokers的逻辑即可

StaticDirectory

StaticDirectory没什么好讲的,doList方法直接返回设置的invokers

protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {

        return invokers;
    }

RegistryDirectory

RegistryDirectory实现了NotifyListener接口,会随着提供者的上下线动态刷新本地invoker缓存

对于invoker,在RegistryDirectory有两个缓存

private volatile Map<String, Invoker<T>> urlInvokerMap; 

private volatile Map<String, List<Invoker<T>>> methodInvokerMap;

urlInvokerMap缓存url对应的invoker
methodInvokerMap缓存方法对应的invokers,方法对应invoker可以存在多个,所以是 List<Invoker<T>>,在dolist方法中会用到
notify回调会刷新这两个缓存

我们先来看下RegistryDirectory的订阅操作

public void subscribe(URL url) {
        setConsumerUrl(url);
        registry.subscribe(url, this);
    }

这边的registry可以认为就是之前讲的ZookeeperRegistry,把RegistryDirectory自身作为订阅回调,一旦监控的路径发生变化,就会回调RegistryDirectory的notify方法

那么subscribe会订阅那些url?在RegistryProtocol中可以看到

//directory订阅url对应interface的provider,configurators,routers接口目录,回调接口NotifyListener由RegistryDirectory实现
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));

会监听这个接口的provider,configurators,routers目录

接下来看notify的实现

public synchronized void notify(List<URL> urls) {
        //这里的通知会一次性传递对应监听目录下所有的url
        List<URL> invokerUrls = new ArrayList<URL>();
        List<URL> routerUrls = new ArrayList<URL>();
        List<URL> configuratorUrls = new ArrayList<URL>();
        //对url进行分类
        for (URL url : urls) {
            String protocol = url.getProtocol();
            String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            if (Constants.ROUTERS_CATEGORY.equals(category)
                    || Constants.ROUTE_PROTOCOL.equals(protocol)) {
                routerUrls.add(url);
            } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                    || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
                configuratorUrls.add(url);
            } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                invokerUrls.add(url);
            } else {
                logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
            }
        }
        // configurators
        //刷新configuratorUrls
        if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
            this.configurators = toConfigurators(configuratorUrls);
        }
        // routers
        //刷新routers
        if (routerUrls != null && !routerUrls.isEmpty()) {
            List<Router> routers = toRouters(routerUrls);
            if (routers != null) { // null - do nothing
                setRouters(routers);
            }
        }
        List<Configurator> localConfigurators = this.configurators; // local reference
        // merge override parameters
        this.overrideDirectoryUrl = directoryUrl;
        //override这个overrideDirectoryUrl什么用?
        if (localConfigurators != null && !localConfigurators.isEmpty()) {
            for (Configurator configurator : localConfigurators) {
                this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
            }
        }
        // providers
        //刷新客户端对等invoker
        refreshInvoker(invokerUrls);
    }

方法的开始,会对url进行分类,一共provider,configurators,routers三种类型
先刷新RegistryDirectory中的configurators,routers,再使用provider urls增量刷新invoker缓存
如果provider urls不存在,那么根据上一次的缓存provider urls,再使用router增量刷新invoker缓存

private void refreshInvoker(List<URL> invokerUrls) {
        if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
                && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
            //传入的url protocol = empty,directory设置为禁用
            this.forbidden = true; // Forbid to access
            this.methodInvokerMap = null; // Set the method invoker map to null
            //摧毁客户端的对等调用invoker
            destroyAllInvokers(); // Close all invokers
        } else {
            this.forbidden = false; // Allow to access
            Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
            //invokerUrls为空,因为通知的url可能只改变了router或者configurator,提供者并没有变化,但是对应invoker配置还是需要被更改的
            if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
                //invokerUrls为空使用缓存的invokers urls,也就是上一次回调拿到invokers
                invokerUrls.addAll(this.cachedInvokerUrls);
            } else {
                //更新缓存
                this.cachedInvokerUrls = new HashSet<URL>();
                this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
            }
            //invokerUrls为空,中止
            if (invokerUrls.isEmpty()) {
                return;
            }
            //把url转换为invoker,已经存在的invoker不会重新创建
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
            //把newUrlInvokerMap转换为methodInvokerMap
            Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
            // state change
            // If the calculation is wrong, it is not processed.
            if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
                logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
                return;
            }
            //如果存在group配置,对method对应的invoker进行cluster伪装
            this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
            this.urlInvokerMap = newUrlInvokerMap;
            try {
                destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
            } catch (Exception e) {
                logger.warn("destroyUnusedInvokers error. ", e);
            }
        }
    }

在refreshInvoker方法中,会根据消费者url的protocol过滤掉不匹配的提供者url,然后对过滤后的提供者url生成远程对等调用invoker,如果invoker已经存在,那么不用再重复创建

接下来看下doList方法的实现

public List<Invoker<T>> doList(Invocation invocation) {
        if (forbidden) {
            // 1. No service provider 2. Service providers are disabled
            throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
                "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +  NetUtils.getLocalHost()
                        + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
        }
        List<Invoker<T>> invokers = null;
        //针对每个方法,有不同的invokers列表。可能存在路由配置
        Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
        if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
            //从invocaion获取方法名和参数,可能是$invoke泛化调用
            String methodName = RpcUtils.getMethodName(invocation);
            Object[] args = RpcUtils.getArguments(invocation);
            if (args != null && args.length > 0 && args[0] != null
                    && (args[0] instanceof String || args[0].getClass().isEnum())) {
                invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
            }
            if (invokers == null) {
                invokers = localMethodInvokerMap.get(methodName);
            }
            if (invokers == null) {
                invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
            }
            if (invokers == null) {
                Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
                if (iterator.hasNext()) {
                    invokers = iterator.next();
                }
            }
        }
        return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
    }

直接从methodInvokerMap中获取对应的invoker集合即可。优先通过方法名查找,如果找不到,通过*作为key查找,再找不到,返回methodInvokerMap第一个invoker集合。

讲解完子类需要实现的doList方法后,下面看下RegistryDirectory是如何被使用到的

RegistryDirectory的使用

RegistryDirectory封装了获取远程对等invokers的逻辑,主要使用在RegistryProtocol的doRefer方法

 private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        //这边的url为consumer url
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        //这里的protocol为spi注入的适配类
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            //注册consumer url
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
        //directory订阅url对应interface的provider,configurators,routers接口目录,回调接口NotifyListener由RegistryDirectory实现
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));

        //通过cluster封装获取invoker的逻辑,将对多个invoker的集群调用封装成一个invoker
        Invoker invoker = cluster.join(directory);
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }

RegistryDirectory通过与Cluster配合,将对多个invoker的集群调用封装成一个invoker,然后通过代理把invoker转换为代理对象bean,放入spring容器中去,就和正常使用本地bean一样。这就是RPC。

总结

Dubbo的目录服务,用于获取远程对等invoker,其实这种设计在业务场景中也能用到。
比如我们公司项目的司机池功能,对于每个订单都有一个司机池,并且这个司机池会随着司机状态变化而发生变化,也可以参考Directory的接口设计。

最后

希望大家关注下我的公众号


image
上一篇 下一篇

猜你喜欢

热点阅读