dubbo源码分析-Cluster

2019-12-23  本文已影响0人  圣村的希望

     Cluster主要就是用来应对出错情况采取的策略,可以看下在dubbo官网中对Cluster的定位:

cluster-key.png

     Cluster是整个集群的抽象,Cluster 将 Directory 中的多个 Invoker 伪装成一个 Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个。list Directory、route Router、select LoadBalance最后选择一个具体Invoker,代理通过具体Invoker进行远程调用获取结果。

     接下来看下Cluster类的继承关系:

Cluster-Struct.png
@SPI(FailoverCluster.NAME)
public interface Cluster {
​
    /**
     * Merge the directory invokers to a virtual invoker.
     * 合并多个directory形成一个invoker
     * @param <T>
     * @param directory
     * @return cluster invoker
     * @throws RpcException
     */
    @Adaptive
    <T> Invoker<T> join(Directory<T> directory) throws RpcException;
​
}

     可以看到Cluster的具体实现类很多,每个不同的实现类对应不同的集群实现策略:

public class FailoverCluster implements Cluster {
​
    public final static String NAME = "failover";
​
    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }
}

     FailoverCluster对join的实现就是创建一个FailoverClusterInvoker,交给FailoverClusterInvoker来进行实现处理。下面开始以服务引用为线来进行分析Cluster,服务引用是给对应的接口创建了代理类,这点有点类似mybatis的Mapper接口的实现,只不过是在dubbo中使用的是javasisst实现的动态代理,说起代理类,那就少不了对InvocationHandler的叙述,他是代理的行为表现。

public class InvokerInvocationHandler implements InvocationHandler {
​
    private final Invoker<?> invoker;
​
    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }
​
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}

     在代理行为中,只是把对应的方法和参数封装成一次方法调用RpcInvocation,然后调用Invoker的invoker方法,默认的集群策略是FailoverCluster,所以这里的Invoker是FailoverClusterInvoker,所以一切又都回到了FailoverClusterInvoker,FailoverClusterInvoker继承自AbstractClusterInvoker,所以这里也是调用AbstractClusterInvoker的invoke方法:

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyinvokers = invokers;
        checkInvokers(copyinvokers, invocation);
        //获取重试次数,默认2次,总共最多调用3次
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            //这里是为了及时感应invokers的变化
            if (i > 0) {
                checkWhetherDestroyed();
                copyinvokers = list(invocation);
                // check again
                checkInvokers(copyinvokers, invocation);
            }
            //根据负载均衡策略获取可用Invoker
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                //调用具体Invoker的invoke方法
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + invocation.getMethodName()
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyinvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                //如果是用户异常直接抛出
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                + invocation.getMethodName() + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyinvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
    }
}

     FailoverClusterInvoker是失败重试策略,默认重试3次,重试次数可以具体配置。根据负载均衡获取可用Invoker进行具体invoke调用。Cluster策略不难,也没什么东西,就是对集群出错策略的抽象处理,封装了对Directory、Router和Balance的处理,不同的策略对应不同的实现类,具体通过SPI进行加载,默认为FailoverCluster,可以通过如下配置改变集群容错策略:

<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" cluster="failover" retries="2" />

画一个时序图(待续)

上一篇下一篇

猜你喜欢

热点阅读