dubbo

dubbo源码愫读(8) dubbo的集群、路由、负载均衡策略分

2019-02-13  本文已影响49人  桥头放牛娃

dubbo是一个分布式服务框架,能避免单点故障和支持服务的横向扩容。一个服务通常会部署多个实例,同时一个服务能注册到多个注册中心。如何从多个服务 Provider 组成的集群中挑选出一个进行调用,就涉及到一个负载均衡的策略。

1、dubbo负载均衡实现说明

dubbo服务调用流程图:

dubbo服务调用流程图.png

从以上调用流程图可知,dubbo的负载均衡主要在客户端实现,并通过封装Cluster、Directory、LoadBalance相关接口实现。

1.1、Cluster、Directory、Router、LoadBalance关系

关系图.png

各组件关系说明:

1.2、客户端负载均衡源码分析

1.2.1、ReferenceConfig中负载均衡的封装

客户端在进行代理处理时,在如下地方对负载均衡相关进行封装:
包路径:dubbo-config->dubbo-config-api
类名:ReferenceConfig
方法名:createProxy()

if (urls.size() == 1) {
    invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
    List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
    URL registryURL = null;
    for (URL url : urls) {
        invokers.add(refprotocol.refer(interfaceClass, url));
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            registryURL = url; // use last registry url
        }
    }
    if (registryURL != null) { // registry url is available
        // use RegistryAwareCluster only when register's cluster is available
        URL u = registryURL.addParameter(Constants.CLUSTER_KEY, RegistryAwareCluster.NAME);
        // The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
        invoker = cluster.join(new StaticDirectory(u, invokers));
    } else { // not a registry url, must be direct invoke.
        invoker = cluster.join(new StaticDirectory(invokers));
    }
}

处理流程:

1.2.2、RegistryProtocol中负载均衡的封装

RegistryProtocol中对单个注册中心进行了负载均衡的封装:
包路径:dubbo-registry->dubbo-registry-api
类名:RegistryProtocol
方法名:doRefer()

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
        registry.register(getRegisteredConsumerUrl(subscribeUrl, url));
    }
    directory.buildRouterChain(subscribeUrl);
    directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
            PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));

    Invoker invoker = cluster.join(directory);
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    return invoker;
}

主要处理流程:

1.2.3、RegistryDirectory中负载均衡处理的封装

RegistryDirectory主要封装了订阅、信息变更通知处理、获取服务提供者信息等;
包路径:dubbo-registry->dubbo-registry-api
类名:RegistryDirectory

(1)、订阅感兴趣的信息

源码如下:

public void subscribe(URL url) {
    setConsumerUrl(url);
    consumerConfigurationListener.addNotifyListener(this);
    serviceConfigurationListener = new  ReferenceConfigurationListener(this, url);
    registry.subscribe(url, this);
}

本处主要对消费端的订阅进行了处理,消费端向注册中心订阅三个信息:配置信息、服务提供者、路由信息;当这三个信息有任何变更,本地就会接到通知,并进行处理;

(2)、配置信息、服务提供者、路由信息变更通知处理

源码如下:

public synchronized void notify(List<URL> urls) {
    List<URL> categoryUrls = urls.stream()
            .filter(this::isValidCategory)
            .filter(this::isNotCompatibleFor26x)
            .collect(Collectors.toList());

    /**
     * TODO Try to refactor the processing of these three type of urls using Collectors.groupBy()?
     */
    this.configurators = Configurator.toConfigurators(classifyUrls(categoryUrls, UrlUtils::isConfigurator))
            .orElse(configurators);

    toRouters(classifyUrls(categoryUrls, UrlUtils::isRoute)).ifPresent(this::addRouters);

    // providers
    refreshOverrideAndInvoker(classifyUrls(categoryUrls, UrlUtils::isProvider));
}

当有信息变更时,本方法就会被调用,会根据变更的配置或路由信息或服务提供者进行相应处理;若路由信息变更,则重新构建路由过滤链;若服务提供者变更,则重构刷新本地缓存的服务提供者列表;

(3)、获取服务提供者列表

源码如下:

public List<Invoker<T>> doList(Invocation invocation) {
    if (forbidden) {
        // 1. No service provider 2. Service providers are disabled
        throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " +
                getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +
                NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() +
                ", please check status of providers(disabled, not registered or in blacklist).");
    }

    if (multiGroup) {
        return this.invokers == null ? Collections.emptyList() : this.invokers;
    }

    List<Invoker<T>> invokers = null;
    try {
        // Get invokers from cache, only runtime routers will be executed.
        invokers = routerChain.route(getConsumerUrl(), invocation);
    } catch (Throwable t) {
        logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
    }


    // FIXME Is there any need of failing back to Constants.ANY_VALUE or the first available method invokers when invokers is null?
    /*Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
    if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
        String methodName = RpcUtils.getMethodName(invocation);
        invokers = localMethodInvokerMap.get(methodName);
        if (invokers == null) {
            invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
        }
        if (invokers == null) {
            Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
            if (iterator.hasNext()) {
                invokers = iterator.next();
            }
        }
    }*/
    return invokers == null ? Collections.emptyList() : invokers;
}

当负载均衡获取可用的服务提供者列表时会调用此方法,此方法主要根据注册中心提供的服务提供者列表,并利用路由规则对提供者列表进行过滤。

1.2.4、FailoverCluster中负载均衡处理的封装

包路径:dubbo-cluster

dubbo中默认的Cluster实现为FailoverCluster,其主要是通过join()方法将Directory进行封装的,而实际的处理是通过FailoverClusterInvoker实现的,客户端调用服务时就是通过此invoker.invoke()进行实际调用处理的;

源码如下:

public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    return new FailoverClusterInvoker<T>(directory);
}

FailoverClusterInvoker.invoke()实现:

public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();

    // binding attachments into invocation.
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }

    List<Invoker<T>> invokers = list(invocation);
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}

主要处理流程:

FailoverClusterInvoker.doInvoke()实现:

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyInvokers = invokers;
    checkInvokers(copyInvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        if (i > 0) {
            checkWhetherDestroyed();
            copyInvokers = list(invocation);
            // check again
            checkInvokers(copyInvokers, invocation);
        }
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("Although retry the method " + methodName
                        + " in the service " + getInterface().getName()
                        + " was successful by the provider " + invoker.getUrl().getAddress()
                        + ", but there have been failed providers " + providers
                        + " (" + providers.size() + "/" + copyInvokers.size()
                        + ") from the registry " + directory.getUrl().getAddress()
                        + " on the consumer " + NetUtils.getLocalHost()
                        + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                        + le.getMessage(), le);
            }
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException(le.getCode(), "Failed to invoke the method "
            + methodName + " in the service " + getInterface().getName()
            + ". Tried " + len + " times of the providers " + providers
            + " (" + providers.size() + "/" + copyInvokers.size()
            + ") from the registry " + directory.getUrl().getAddress()
            + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
            + Version.getVersion() + ". Last error is: "
            + le.getMessage(), le.getCause() != null ? le.getCause() : le);
}

主要处理流程为:调用select()选取服务提供者并调用;select()中主要调用LoadBalance.selct()进行选择;

2、Directory实现分析

Directory代表多个Invoker,可以把它看成List,但与List不同的是,它的值可能是动态变化的,比如注册中心推送变更。Cluster将Directory中的多个Invoker伪装成一个Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个。

Directory接口类继承图:

Directory类继承图.png

RegistryDirectory:

RegistryDirectory实现了NotifyListener接口,因此他本身也是一个监听器,可以在服务变更时接受通知,消费方要调用远程服务,会向注册中心订阅这个服务的所有的服务提供方,订阅的时候会调用notify方法,进行invoker实例的重新生成,也就是服务的重新引用。在服务提供方有变动时,也会调用notify方法,有关notify方法在Dubbo中订阅和通知解析那篇文章中已经解释,不做重复。subscribe方法也不做重复解释。

StaticDirectory:

静态目录服务,当有多个注册中心时会使用此实现。

3、Cluster实现分析

Dubbo中的Cluster可以将多个服务提供方伪装成一个提供方,具体也就是将Directory中的多个Invoker伪装成一个Invoker,在伪装的过程中包含了容错的处理,负载均衡的处理和路由的处理。

Cluster主要实现的类继承图:

Cluster类继承图.png

AbstractClusterInvoker主要实现类继承图:

AbstractClusterInvoker类继承图.png

集群的容错模式:

failover(默认):

failfast:

failsafe:

failback:

forking:

broadcast:

4、LoadBalance实现分析

LoadBalance类继承图.png

random:

roundrobin:

leastactive:

consistenthash:

5、Router实现分析

dubbo的路由干的事,就是一个请求过来,dubbo依据配置的路由规则,计算出哪些提供者可以提供这次的请求服务。所以,它的优先级是在集群容错策略和负载均衡策略之前的。即先有路由规则遴选出符合条件的服务提供者然后,再在这些服务提供者之中应用负载均衡,集群容错策略。

Router接口继承图:

Router类继承图.png

ScriptRouter:
脚本路由规则 支持 JDK 脚本引擎的所有脚本,比如:javascript, jruby, groovy 等,通过 type=javascript 参数设置脚本类型,缺省为 javascript。

ConditionRouter:
条件路由主要就是根据dubbo管理控制台配置的路由规则来过滤相关的invoker,当我们对路由规则点击启用的时候,就会触发RegistryDirectory类的notify方法,其会重构本地路由调用链,而当从Directory中获取服务提供者的list时,会利用此路由规则将提供者列表进行过滤;

上一篇 下一篇

猜你喜欢

热点阅读