dubbo源码分析-Cluster
Cluster主要就是用来应对出错情况采取的策略,可以看下在dubbo官网中对Cluster的定位:
cluster-key.pngCluster是整个集群的抽象,Cluster 将 Directory 中的多个 Invoker 伪装成一个 Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个。list Directory、route Router、select LoadBalance最后选择一个具体Invoker,代理通过具体Invoker进行远程调用获取结果。
接下来看下Cluster类的继承关系:
Cluster-Struct.png@SPI(FailoverCluster.NAME)
public interface Cluster {
/**
* Merge the directory invokers to a virtual invoker.
* 合并多个directory形成一个invoker
* @param <T>
* @param directory
* @return cluster invoker
* @throws RpcException
*/
@Adaptive
<T> Invoker<T> join(Directory<T> directory) throws RpcException;
}
可以看到Cluster的具体实现类很多,每个不同的实现类对应不同的集群实现策略:
-
Failsafe Cluster:失败安全,出现异常时,直接忽略。失败安全就是当调用过程中出现异常时,FailsafeClusterInvoker 仅会打印异常,而不会抛出异常。适用于写入审计日志等操作
-
Failover Cluster:失败自动切换,当调用出现失败的时候,会自动切换集群中其他服务器,来获得invoker重试,通常用于读操作,但重试会带来更长延迟。一般都会设置重试次数。默认策略
-
Failfast Cluster:只会进行一次调用,失败后立即抛出异常。适用于幂等操作,比如新增记录。
-
Failback Cluster:失败自动恢复,在调用失败后,返回一个空结果给服务提供者。并通过定时任务对失败的调用记录并且重传,适合执行消息通知等操作。
-
Forking Cluster:会在线程池中运行多个线程,来调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。一般会设置最大并行数。
-
Available Cluster:调用第一个可用的服务器,仅仅应用于多注册中心。
-
Broadcast Cluster:广播调用所有提供者,逐个调用,在循环调用结束后,只要任意一台报错就报错。通常用于通知所有提供者更新缓存或日志等本地资源信息
-
Mergeable Cluster:分组聚合。
-
MockClusterWrapper:本地伪装。
下面以FailoverCluster为例,以服务调用为线进行源码分析:
public class FailoverCluster implements Cluster {
public final static String NAME = "failover";
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}
}
FailoverCluster对join的实现就是创建一个FailoverClusterInvoker,交给FailoverClusterInvoker来进行实现处理。下面开始以服务引用为线来进行分析Cluster,服务引用是给对应的接口创建了代理类,这点有点类似mybatis的Mapper接口的实现,只不过是在dubbo中使用的是javasisst实现的动态代理,说起代理类,那就少不了对InvocationHandler的叙述,他是代理的行为表现。
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}
在代理行为中,只是把对应的方法和参数封装成一次方法调用RpcInvocation,然后调用Invoker的invoker方法,默认的集群策略是FailoverCluster,所以这里的Invoker是FailoverClusterInvoker,所以一切又都回到了FailoverClusterInvoker,FailoverClusterInvoker继承自AbstractClusterInvoker,所以这里也是调用AbstractClusterInvoker的invoke方法:
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
//获取重试次数,默认2次,总共最多调用3次
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
//这里是为了及时感应invokers的变化
if (i > 0) {
checkWhetherDestroyed();
copyinvokers = list(invocation);
// check again
checkInvokers(copyinvokers, invocation);
}
//根据负载均衡策略获取可用Invoker
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
//调用具体Invoker的invoke方法
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + invocation.getMethodName()
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
//如果是用户异常直接抛出
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
+ invocation.getMethodName() + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
}
}
FailoverClusterInvoker是失败重试策略,默认重试3次,重试次数可以具体配置。根据负载均衡获取可用Invoker进行具体invoke调用。Cluster策略不难,也没什么东西,就是对集群出错策略的抽象处理,封装了对Directory、Router和Balance的处理,不同的策略对应不同的实现类,具体通过SPI进行加载,默认为FailoverCluster,可以通过如下配置改变集群容错策略:
<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" cluster="failover" retries="2" />
画一个时序图(待续)