【dubbo源码】19. 服务消费方:集群容错策略详解

2021-07-21  本文已影响0人  天还下着毛毛雨

前言

image

上一篇降到movk本地伪装其实是对Cluster集群容错类实例的再次包装,调完mockInvoker的invoker方法后,会调到被包装的Cluster实例的invoker,对真正的远程调用执行各种集群容错的策略。

image

集群容错策略的选择

@Reference(cluster="AvailableCluster")
private UserService userService;

Cluster实例都是从SPI工厂里加载而来的,如果没有主动配置,默认得到的就是接口上@SPI的值

image
Cluster$Adaptive源代码 :
public class Cluster$Adaptive implements com.alibaba.dubbo.rpc.cluster.Cluster {
 public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
     if (arg0 == null)
         throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
     if (arg0.getUrl() == null)
         throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
     // 从url拿cluster的配置,没有配就failover
     com.alibaba.dubbo.common.URL url = arg0.getUrl();
     String extName = url.getParameter("cluster", "failover");
     if (extName == null)
         throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
     com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
     return extension.join(arg0);
 }
}

FailoverClusterInvoker

当首次调用失败后,还会重试指定次数

循环最大调用次数( 1+重试次数),每次都去发起远程调用,如果其中一次调用成功就return,结束循环,否则继续下一次调用如果超出重试次数还没有调用成功,就抛rpc异常

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyinvokers = invokers;
    checkInvokers(copyinvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    //获取重试次数
    int len = getUrl().getMethodParameter(methodName,Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    //已经调用过了的服务列表
    List<Invoker<T>> invoked = newArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        //如果掉完一次后,服务列表更新了,再次获取服务列表
        if (i > 0) {
            checkWhetherDestroyed();
            copyinvokers = list(invocation);
            // check again
            checkInvokers(copyinvokers, invocation);
        }
        //根据负载均衡算法,选择一个服务调用
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers,invoked);
        //记录已经调用过的invoker
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            //具体的服务调用逻辑
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("Although retry the method " +invocation.getMethodName()
                        + " in the service " + getInterface().getName()
                        + " was successful by the provider " +invoker.getUrl().getAddress()
                        + ", but there have been failed providers " +providers
                        + " (" + providers.size() + "/" +copyinvokers.size()
                        + ") from the registry " +directory.getUrl().getAddress()
                        + " on the consumer " + NetUtils.getLocalHost()
                        + " using the dubbo version " +Version.getVersion() + ". Last error is: "
                        + le.getMessage(), le);
            }
            // 期间有一次调用成功就返回结果,结束循环
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    // 如果超出重试次数还没有调用成功,就抛rpc异常
    throw new RpcException(le != null ? le.getCode() : 0, "Failed toinvoke the method "
            + invocation.getMethodName() + " in the service " +getInterface().getName()
            + ". Tried " + len + " times of the providers " + providers
            + " (" + providers.size() + "/" + copyinvokers.size()
            + ") from the registry " + directory.getUrl().getAddress()
            + " on the consumer " + NetUtils.getLocalHost() + " using thedubbo version "
            + Version.getVersion() + ". Last error is: "
            + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
}

FailBackClusterInvoker

出错返回空结果,异步重试,起一个线程,等待配置的时间,进行重试

catch到异常

image image

FailfastClusterInvoker

catch到异常并直接抛出RpcException,不进行重试,以快速响应

image

FailsafeClusterInvoker

直接吞掉异常,不进行任何处理,返回空的结果,适用于对结果不敏感的业务

image

ForkingClusterInvoker

同时调n台(默认为2台)主机,选择最快返回结果的那台主机返回的结果

负载均衡后选出与fork数相同的invoker

image

selecte出几个invoker就起对应数量的县城,去发起远程调用,并异步把结果放到LinkedBlockingQueue 阻塞队列中

image

一旦从队列poll到第一个result,就return 结果

image

BroadcastClusterInvoker

服务列表里所有主机都会被调,原子广播
image

AvailableClusterInvoker

没有负载均衡没有重试,调服务之前会先查看tcp存在有效的长连接,确保后端服务正常,如果有,再调用具体的业务方法。用于对数据安全要求比较高的业务
image image
上一篇下一篇

猜你喜欢

热点阅读