【dubbo源码】19. 服务消费方:集群容错策略详解
2021-07-21 本文已影响0人
天还下着毛毛雨
前言
image上一篇降到movk本地伪装其实是对Cluster集群容错类实例的再次包装,调完mockInvoker的invoker方法后,会调到被包装的Cluster实例的invoker,对真正的远程调用执行各种集群容错的策略。
image集群容错策略的选择
- 配置方式 :指定集群容错策略为available`
@Reference(cluster="AvailableCluster")
private UserService userService;
- 默认策略 :failover
Cluster实例都是从SPI工厂里加载而来的,如果没有主动配置,默认得到的就是接口上@SPI的值
imageCluster$Adaptive源代码 :
public class Cluster$Adaptive implements com.alibaba.dubbo.rpc.cluster.Cluster {
public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
// 从url拿cluster的配置,没有配就failover
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("cluster", "failover");
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
return extension.join(arg0);
}
}
FailoverClusterInvoker
当首次调用失败后,还会重试指定次数
循环最大调用次数( 1+重试次数),每次都去发起远程调用,如果其中一次调用成功就return,结束循环,否则继续下一次调用如果超出重试次数还没有调用成功,就抛rpc异常
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
//获取重试次数
int len = getUrl().getMethodParameter(methodName,Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
//已经调用过了的服务列表
List<Invoker<T>> invoked = newArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
//如果掉完一次后,服务列表更新了,再次获取服务列表
if (i > 0) {
checkWhetherDestroyed();
copyinvokers = list(invocation);
// check again
checkInvokers(copyinvokers, invocation);
}
//根据负载均衡算法,选择一个服务调用
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers,invoked);
//记录已经调用过的invoker
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
//具体的服务调用逻辑
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " +invocation.getMethodName()
+ " in the service " + getInterface().getName()
+ " was successful by the provider " +invoker.getUrl().getAddress()
+ ", but there have been failed providers " +providers
+ " (" + providers.size() + "/" +copyinvokers.size()
+ ") from the registry " +directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " +Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
// 期间有一次调用成功就返回结果,结束循环
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
// 如果超出重试次数还没有调用成功,就抛rpc异常
throw new RpcException(le != null ? le.getCode() : 0, "Failed toinvoke the method "
+ invocation.getMethodName() + " in the service " +getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using thedubbo version "
+ Version.getVersion() + ". Last error is: "
+ (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
}
FailBackClusterInvoker
出错返回空结果,异步重试,起一个线程,等待配置的时间,进行重试
catch到异常
image imageFailfastClusterInvoker
catch到异常并直接抛出RpcException,不进行重试,以快速响应
imageFailsafeClusterInvoker
直接吞掉异常,不进行任何处理,返回空的结果,适用于对结果不敏感的业务
imageForkingClusterInvoker
同时调n台(默认为2台)主机,选择最快返回结果的那台主机返回的结果
负载均衡后选出与fork数相同的invoker
imageselecte出几个invoker就起对应数量的县城,去发起远程调用,并异步把结果放到LinkedBlockingQueue 阻塞队列中
image一旦从队列poll到第一个result,就return 结果
image