
Dubbo Source Code Analysis, Part 4: Cluster Design

2018-08-30  致虑

Detailed Analysis

1. Cluster

Cluster disguises the multiple Invokers held by a Directory as a single Invoker, transparently to the upper layers; the disguise embeds the fault-tolerance logic, so when one invocation fails, another invoker can be retried.

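In dubbo 2.6.x the Cluster SPI itself is tiny: join is the single entry point that merges a Directory's invokers into one virtual Invoker, with FailoverCluster as the default extension; MockClusterWrapper then wraps the result, which is why debugging shows the chain MockClusterInvoker --> FailoverClusterInvoker:

```java
@SPI(FailoverCluster.NAME)
public interface Cluster {

    /**
     * Merge the directory invokers into one virtual invoker.
     */
    @Adaptive
    <T> Invoker<T> join(Directory<T> directory) throws RpcException;
}
```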

This code does little more than construct and wrap the consumer-side invocation information and return it; what really does the work is the returned Invoker: MockClusterInvoker --> FailoverClusterInvoker. Why? Because this step directly decides which ClusterInvoker is used when the remote call is finally issued, i.e. its doInvoke method:
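
FailoverClusterInvoker is that default: on failure it silently retries another provider, making at most retries + 1 attempts. A condensed sketch of its doInvoke (the real method also re-lists the invokers before each retry and builds a much longer error message):

```java
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    checkInvokers(invokers, invocation);
    // retries + 1 = total number of attempts
    int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    RpcException lastException = null;
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(invokers.size());
    for (int i = 0; i < len; i++) {
        // pass the already-invoked list so select() prefers providers not yet tried
        Invoker<T> invoker = select(loadbalance, invocation, invokers, invoked);
        invoked.add(invoker);
        try {
            return invoker.invoke(invocation);
        } catch (RpcException e) {
            if (e.isBiz()) {
                throw e; // business exceptions are never retried
            }
            lastException = e;
        } catch (Throwable e) {
            lastException = new RpcException(e.getMessage(), e);
        }
    }
    throw lastException;
}
```

The other strategies follow the same doInvoke template. FailfastClusterInvoker, below, issues exactly one call and rethrows immediately on failure: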

public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {

    public FailfastClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        try {
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
                throw (RpcException) e;
            }
            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
        }
    }
}

The code logic is plain to see: fail-fast selects a single invoker, calls it once, and converts any non-business exception into an RpcException without retrying.
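
Which strategy a consumer gets is controlled by the cluster attribute on the reference (or service) configuration, for example:

```xml
<dubbo:reference id="barService" interface="com.foo.BarService" cluster="failfast" />
```

FailsafeClusterInvoker, next, swallows failures entirely: it logs the exception and returns an empty RpcResult, which suits fire-and-forget work such as writing audit logs: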

public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);

    public FailsafeClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
            return new RpcResult(); // ignore and return an empty result
        }
    }
}
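
FailbackClusterInvoker also returns an empty result on failure, but first records the failed invocation and retries it on a background timer every 5 seconds (RETRY_FAILED_PERIOD); typically used for message-notification style operations:
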
public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {

       private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);
   
       private static final long RETRY_FAILED_PERIOD = 5 * 1000;
   
       /**
        * Use {@link NamedInternalThreadFactory} to produce {@link com.alibaba.dubbo.common.threadlocal.InternalThread}
        * which with the use of {@link com.alibaba.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
        */
       private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2,
               new NamedInternalThreadFactory("failback-cluster-timer", true));
   
       private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>();
       private volatile ScheduledFuture<?> retryFuture;
   
       public FailbackClusterInvoker(Directory<T> directory) {
           super(directory);
       }
   
       private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
           if (retryFuture == null) {
               synchronized (this) {
                   if (retryFuture == null) {
                       retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
   
                           @Override
                           public void run() {
                               // collect retry statistics
                               try {
                                   retryFailed();
                               } catch (Throwable t) { // Defensive fault tolerance
                                   logger.error("Unexpected error occur at collect statistic", t);
                               }
                           }
                       }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
                   }
               }
           }
           failed.put(invocation, router);
       }
   
       void retryFailed() {
           if (failed.size() == 0) {
               return;
           }
           for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(failed).entrySet()) {
               Invocation invocation = entry.getKey();
               Invoker<?> invoker = entry.getValue();
               try {
                   invoker.invoke(invocation);
                   failed.remove(invocation);
               } catch (Throwable e) {
                   logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
               }
           }
       }
   
       @Override
       protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
           try {
               checkInvokers(invokers, invocation);
               Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
               return invoker.invoke(invocation);
           } catch (Throwable e) {
               logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + e.getMessage() + ", ", e);
               addFailed(invocation, this);
               return new RpcResult(); // ignore
           }
       }
}
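
ForkingClusterInvoker invokes forks providers in parallel and returns the first successful result, so the caller sees an error only when every selected invoker fails; it trades extra resources for lower latency, typically on read operations with strict real-time demands:
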
public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {
       /**
        * Use {@link NamedInternalThreadFactory} to produce {@link com.alibaba.dubbo.common.threadlocal.InternalThread}
        * which with the use of {@link com.alibaba.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
        */
       private final ExecutorService executor = Executors.newCachedThreadPool(new NamedInternalThreadFactory("forking-cluster-timer", true));
   
       public ForkingClusterInvoker(Directory<T> directory) {
           super(directory);
       }
   
       @Override
       @SuppressWarnings({"unchecked", "rawtypes"})
       public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
           checkInvokers(invokers, invocation);
           final List<Invoker<T>> selected;
           final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
           final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
           if (forks <= 0 || forks >= invokers.size()) {
               selected = invokers;
           } else {
               selected = new ArrayList<Invoker<T>>();
               for (int i = 0; i < forks; i++) {
                    // passing 'selected' as the exclusion list nudges select()
                    // toward invokers that have not been picked yet
                    Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                    if (!selected.contains(invoker)) { // avoid adding the same invoker twice
                       selected.add(invoker);
                   }
               }
           }
           RpcContext.getContext().setInvokers((List) selected);
           final AtomicInteger count = new AtomicInteger();
           final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
           for (final Invoker<T> invoker : selected) {
               executor.execute(new Runnable() {
                   @Override
                   public void run() {
                       try {
                           Result result = invoker.invoke(invocation);
                           ref.offer(result);
                       } catch (Throwable e) {
                           int value = count.incrementAndGet();
                           if (value >= selected.size()) {
                               ref.offer(e);
                           }
                       }
                   }
               });
           }
           try {
               Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
               if (ret instanceof Throwable) {
                   Throwable e = (Throwable) ret;
                   throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
               }
               return (Result) ret;
           } catch (InterruptedException e) {
               throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
           }
       }
}
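
BroadcastClusterInvoker calls every provider one by one and, after the loop, rethrows the last exception if any call failed; typically used to tell all providers to refresh local state such as caches:
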
public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {

       private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);
   
       public BroadcastClusterInvoker(Directory<T> directory) {
           super(directory);
       }
   
       @Override
       @SuppressWarnings({"unchecked", "rawtypes"})
       public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
           checkInvokers(invokers, invocation);
           RpcContext.getContext().setInvokers((List) invokers);
           RpcException exception = null;
           Result result = null;
           for (Invoker<T> invoker : invokers) {
               try {
                   result = invoker.invoke(invocation);
               } catch (RpcException e) {
                   exception = e;
                   logger.warn(e.getMessage(), e);
               } catch (Throwable e) {
                   exception = new RpcException(e.getMessage(), e);
                   logger.warn(e.getMessage(), e);
               }
           }
           if (exception != null) {
               throw exception;
           }
           return result;
       }
}

2. LoadBalance

/**
 * Load balancing: dubbo's four built-in strategies.
 * LoadBalance. (SPI, Singleton, ThreadSafe)
 * <p>
 * <a href="http://en.wikipedia.org/wiki/Load_balancing_(computing)">Load-Balancing</a>
 *
 * @see com.alibaba.dubbo.rpc.cluster.Cluster#join(Directory)
 */
@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {

       /**
        * select one invoker in list.
        *
        * @param invokers   invokers.
        * @param url        refer url
        * @param invocation invocation.
        * @return selected invoker.
        */
       @Adaptive("loadbalance")
       <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
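
Like the cluster strategy, the balancing strategy can be chosen per reference, service, or method via the loadbalance attribute, e.g.:

```xml
<dubbo:reference interface="com.foo.BarService" loadbalance="roundrobin" />
```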

The default is RandomLoadBalance, so let's follow the consumer flow and walk through this load-balancing strategy in detail.

@Override
   public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {

       if (invokers == null || invokers.isEmpty()) return null;

       if (invokers.size() == 1) return invokers.get(0);

        // delegate the choice to the concrete subclass; here it is RandomLoadBalance
       return doSelect(invokers, url, invocation);
   }

Another template-method hook; the concrete selection falls through to the subclass:

/**
 * Random load balance (the default strategy).
 *
 * Random selection with probabilities proportional to the configured weights.
 * Collisions are likely on any single snapshot, but the larger the call volume, the more
 * even the distribution becomes; weighted probabilities also stay fairly even, which makes
 * it easy to adjust provider weights dynamically.
 *
 * 1. Count the invokers and iterate over them, accumulating their weights.
 * 2. From the second invoker on, compare each weight with the previous one; if any pair
 *    differs, mark sameWeight = false.
 * 3. If totalWeight > 0 and sameWeight == false, draw a random number within the total
 *    weight, then subtract weights one by one to locate the target node.
 * 4. If sameWeight == true, pick uniformly at random.
 *
 * e.g. with four cluster nodes A, B, C, D weighted 1, 2, 3, 4, the probability of hitting
 * A is 1/(1+2+3+4) = 10%, and B, C, D follow with 20%, 30%, 40%.
 */
public class RandomLoadBalance extends AbstractLoadBalance {

   public static final String NAME = "random";

   private final Random random = new Random();

   @Override
   protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // number of invokers
        int totalWeight = 0; // the sum of weights
        boolean sameWeight = true; // does every invoker have the same weight?
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            totalWeight += weight; // accumulate the total weight
            if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) {
                sameWeight = false; // any mismatch means the weights are not all equal
           }
       }

        // e.g. totalWeight = 10 (1+2+3+4): draw a random int below 10, say 2, then subtract
        // weights in order: 2 - 1 (A's weight) = 1, then 1 - 2 (B's weight) = -1; once the
        // value drops below 0, that node (here B) is the one invoked.
        if (totalWeight > 0 && !sameWeight) {
            // not every weight is equal and at least one is > 0: select randomly against totalWeight
            int offset = random.nextInt(totalWeight);
            // return the invoker whose weight segment the random value falls into
           for (int i = 0; i < length; i++) {
               offset -= getWeight(invokers.get(i), invocation);
               if (offset < 0) {
                   return invokers.get(i);
               }
           }
       }
        // all weights equal (or totalWeight = 0): pick evenly at random
       return invokers.get(random.nextInt(length));
   }
}
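
To convince yourself of the distribution, the subtract-until-negative loop can be simulated outside dubbo; this is a hypothetical standalone snippet using the javadoc's 1/2/3/4 weights:

```java
import java.util.Random;

public class WeightedRandomDemo {
    public static void main(String[] args) {
        int[] weights = {1, 2, 3, 4}; // nodes A, B, C, D
        int totalWeight = 1 + 2 + 3 + 4;
        int[] hits = new int[weights.length];
        Random random = new Random();
        int trials = 100000;
        for (int n = 0; n < trials; n++) {
            int offset = random.nextInt(totalWeight);
            for (int i = 0; i < weights.length; i++) {
                offset -= weights[i];
                if (offset < 0) { // same rule as RandomLoadBalance
                    hits[i]++;
                    break;
                }
            }
        }
        for (int i = 0; i < hits.length; i++) {
            // prints roughly 10.0 / 20.0 / 30.0 / 40.0
            System.out.printf("node %c: %.1f%%%n", (char) ('A' + i), hits[i] * 100.0 / trials);
        }
    }
}
```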

The algorithm of this strategy is spelled out in the comments above, so no more detail here. The other three load-balancing strategies work in broadly the same way; listed briefly:

Round robin, with rotation ratios set by the weights after common-divisor reduction.

   @Override
   protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
       String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        int length = invokers.size(); // number of invokers
        int maxWeight = 0; // the maximum weight
        int minWeight = Integer.MAX_VALUE; // the minimum weight
       final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
       int weightSum = 0;
       for (int i = 0; i < length; i++) {
           int weight = getWeight(invokers.get(i), invocation);
            maxWeight = Math.max(maxWeight, weight); // track the maximum weight
            minWeight = Math.min(minWeight, weight); // track the minimum weight
           if (weight > 0) {
               invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
               weightSum += weight;
           }
       }
       AtomicPositiveInteger sequence = sequences.get(key);
       if (sequence == null) {
           sequences.putIfAbsent(key, new AtomicPositiveInteger());
           sequence = sequences.get(key);
       }
       int currentSequence = sequence.getAndIncrement();
        if (maxWeight > 0 && minWeight < maxWeight) { // weights are not all equal
           int mod = currentSequence % weightSum;
           for (int i = 0; i < maxWeight; i++) {
               for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
                   final Invoker<T> k = each.getKey();
                   final IntegerWrapper v = each.getValue();
                   if (mod == 0 && v.getValue() > 0) {
                       return k;
                   }
                   if (v.getValue() > 0) {
                       v.decrement();
                       mod--;
                   }
               }
           }
       }
        // plain round robin: index by sequence modulo length
       return invokers.get(currentSequence % length);
   }
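
Walking the decrement loop with weights A=1, B=2 (weightSum = 3): sequence 0 hits A immediately (mod = 0), sequence 1 decrements A once and lands on B, and sequence 2 decrements A and B once each before landing on B again, so B receives exactly twice A's traffic.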

Least active calls: pick randomly among invokers sharing the smallest active count, where the active count is the difference between the counters taken before and after each call.
Slower providers receive fewer requests, because the slower a provider is, the larger that before/after gap grows.

A concrete example:
A node's counter is incremented when it accepts a request and decremented when the request completes, and likewise for every other node. A node still busy with in-flight requests therefore carries a higher count than an idle one, so new requests go to the node with the lowest count. In short: the slower the service, the higher its counter climbs, so handing subsequent requests to low-count services is the friendlier choice.

   @Override
   protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // total number of invokers
        int leastActive = -1; // the least active value across all invokers
        int leastCount = 0; // how many invokers share that least active value
        int[] leastIndexs = new int[length]; // indices of the invokers sharing the least active value, gathered here for the later pass
        int totalWeight = 0; // the sum of their weights
        int firstWeight = 0; // the first weight seen, used to check whether all weights are equal
        boolean sameWeight = true; // does every invoker have the same weight?
       for (int i = 0; i < length; i++) {
           Invoker<T> invoker = invokers.get(i);
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // active count
           int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // Weight
            if (leastActive == -1 || active < leastActive) { // restart whenever a smaller active value turns up
                leastActive = active; // record the new least active value
                leastCount = 1; // reset the count of least-active invokers
                leastIndexs[0] = i; // reset the index array
                totalWeight = weight; // reset the accumulated weight
                firstWeight = weight; // record this invoker's weight as the reference
                sameWeight = true; // reset the same-weight flag
            } else if (active == leastActive) { // same least active value: accumulate it
                leastIndexs[leastCount++] = i; // record this invoker's index
                totalWeight += weight; // add its weight to the running total
                // are all the least-active invokers equally weighted?
               if (sameWeight && i > 0
                       && weight != firstWeight) {
                   sameWeight = false;
               }
           }
       }
       // assert(leastCount > 0)
       if (leastCount == 1) {
            // exactly one invoker has the least active value: return it directly
           return invokers.get(leastIndexs[0]);
       }
       if (!sameWeight && totalWeight > 0) {
            // weights differ and the total is positive: select randomly against the total weight
           int offsetWeight = random.nextInt(totalWeight);
            // walk the weight segments to find where the random value lands
           for (int i = 0; i < leastCount; i++) {
               int leastIndex = leastIndexs[i];
               offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
               if (offsetWeight <= 0)
                   return invokers.get(leastIndex);
           }
       }
        // equal weights (or zero total): pick evenly among the least-active invokers
       return invokers.get(leastIndexs[random.nextInt(leastCount)]);
   }
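
Note that the load balancer only reads the active count; on the consumer side it is ActiveLimitFilter (activated when an actives limit is configured) that maintains it by bracketing each call with RpcStatus counters, roughly:

```java
// Condensed from ActiveLimitFilter: the counter goes up before the call and
// down after it, which is exactly the "difference between the counters
// before and after each call" described above.
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
long begin = System.currentTimeMillis();
RpcStatus.beginCount(url, methodName);                // active + 1
try {
    Result result = invoker.invoke(invocation);
    RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);  // active - 1
    return result;
} catch (RuntimeException t) {
    RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false); // active - 1
    throw t;
}
```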

Consistent hash: requests with the same parameters always go to the same provider. When a provider goes down, the requests that used to hit it are spread over the remaining providers via virtual nodes, without causing drastic reshuffling.

   protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
       String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
       int identityHashCode = System.identityHashCode(invokers);
       ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
       if (selector == null || selector.identityHashCode != identityHashCode) {
           selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
           selector = (ConsistentHashSelector<T>) selectors.get(key);
       }
       return selector.select(invocation);
   }
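
The interesting work lives in ConsistentHashSelector: each invoker is projected onto a hash ring many times over (virtual nodes, 160 per invoker by default), and a request is served by the first node clockwise from the hash of its arguments. A condensed sketch, where replicaNumber, md5, hash and key stand in for the selector's fields and helpers:

```java
// Condensed sketch of ConsistentHashSelector; the real class also reads the
// hash.nodes / hash.arguments parameters from the URL and md5-hashes the
// request key built from the selected arguments.
TreeMap<Long, Invoker<T>> ring = new TreeMap<Long, Invoker<T>>();
for (Invoker<T> invoker : invokers) {
    String address = invoker.getUrl().getAddress();
    for (int i = 0; i < replicaNumber / 4; i++) {
        byte[] digest = md5(address + i);
        for (int h = 0; h < 4; h++) { // one md5 digest yields 4 ring positions
            ring.put(hash(digest, h), invoker);
        }
    }
}

// selection: hash the request key, take the first ring entry clockwise,
// wrapping around to the first entry when we run off the end
Map.Entry<Long, Invoker<T>> entry = ring.ceilingEntry(hash(md5(key), 0));
if (entry == null) {
    entry = ring.firstEntry();
}
return entry.getValue();
```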

The underlying algorithm:
http://en.wikipedia.org/wiki/Consistent_hashing


3. Router
/**
* Router. (SPI, Prototype, ThreadSafe)
* <p>
* <a href="http://en.wikipedia.org/wiki/Routing">Routing</a>
*
* @see com.alibaba.dubbo.rpc.cluster.Cluster#join(Directory)
* @see com.alibaba.dubbo.rpc.cluster.Directory#list(Invocation)
*/
public interface Router extends Comparable<Router> {

       /**
        * get the router url.
        *
        * @return url
        */
       URL getUrl();
   
       /**
        * route.
        *
        * @param invokers
        * @param url        refer url
        * @param invocation
        * @return routed invokers
        * @throws RpcException
        */
       <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
The core method, route, has appeared. Debugging along the same path as before:

OK, the routing core has appeared. The list step (AbstractDirectory.list, stepped through above) does two things:
- 1. RegistryDirectory#doList(invocation) filters out all the available invokers matching the invocation's parameters;
- 2. The routing rules then filter the invokers the directory produced; MockInvokersSelector, for example, strips out all the mock invokers (see the sketch below).
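
For reference, MockInvokersSelector's route logic is roughly this: without a need-mock attachment it keeps only the normal (non-mock-protocol) invokers, and with one it keeps only the mock invokers:

```java
// Roughly MockInvokersSelector.route (dubbo 2.6.x)
public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers, URL url, final Invocation invocation) throws RpcException {
    if (invocation.getAttachments() == null) {
        return getNormalInvokers(invokers);
    }
    String needMock = invocation.getAttachments().get(Constants.INVOCATION_NEED_MOCK);
    if (needMock == null) {
        return getNormalInvokers(invokers);
    }
    if (Boolean.TRUE.toString().equalsIgnoreCase(needMock)) {
        return getMockedInvokers(invokers);
    }
    return invokers;
}
```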


Returning the filtered invokers completes the routing step. That is the overall routing flow; next, a look at the routing strategies:

Condition routing: filters invokers against the routing rules configured in the dubbo admin console; a rule change triggers RegistryDirectory's notify method in real time, rebuilding the local invokers.

```
@Override
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
    if (invokers == null || invokers.isEmpty()) {
        return invokers;
    }
    try {
        if (!matchWhen(url, invocation)) {
            return invokers;
        }
        List<Invoker<T>> result = new ArrayList<Invoker<T>>();
        if (thenCondition == null) {
            logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey());
            return result;
        }
        for (Invoker<T> invoker : invokers) {
            if (matchThen(invoker.getUrl(), url)) {
                result.add(invoker);
            }
        }
        if (!result.isEmpty()) {
            return result;
        } else if (force) {
            logger.warn("The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(Constants.RULE_KEY));
            return result;
        }
    } catch (Throwable t) {
        logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t);
    }
    return invokers;
}
```
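
A condition rule has the shape whenCondition => thenCondition: consumers matching the left side may only reach providers matching the right side. The classic example from the dubbo documentation:

```
host = 10.20.153.10 => host = 10.20.153.11
```

An empty left side applies the rule to all consumers; an empty right side blacklists the matching consumers from every provider, which is exactly the thenCondition == null branch logged above.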

OK, that is routing covered.


4. Directory

/**
 * Directory. (SPI, Prototype, ThreadSafe)
 * <p>
 * <a href="http://en.wikipedia.org/wiki/Directory_service">Directory Service</a>
 *
 * A Directory stands for multiple Invokers; think of it as a List<Invoker>, except that,
 * unlike a plain List, its contents may change dynamically, e.g. when the registry pushes an update.
 *
 * @see com.alibaba.dubbo.rpc.cluster.Cluster#join(Directory)
 */
public interface Directory<T> extends Node {

       /**
        * get service type.
        *
        * @return service type.
        */
       Class<T> getInterface();
   
       /**
        * list invokers.
        *
        * @return invokers
        */
       List<Invoker<T>> list(Invocation invocation) throws RpcException;
}

The core logic of the list method here already appeared in the Router analysis, so it is not repeated.
A Directory can keep its invoker list in step with the registry because the relevant Listener, once notified, updates caches such as methodInvokerMap and urlInvokerMap; list then always reads the latest invokers (see the condensed sketch below, and the earlier flow, for the details).
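
A condensed sketch of RegistryDirectory.doList: it reads the current methodInvokerMap snapshot, keyed by method name, with an optional first-argument sub-key and a "*" catch-all:

```java
// Condensed sketch of RegistryDirectory.doList (dubbo 2.6.x); notify()
// rebuilds methodInvokerMap on every registry push, so reads here always
// see the latest snapshot.
Map<String, List<Invoker<T>>> localMap = this.methodInvokerMap;
List<Invoker<T>> invokers = null;
if (localMap != null && !localMap.isEmpty()) {
    String methodName = RpcUtils.getMethodName(invocation);
    Object[] args = RpcUtils.getArguments(invocation);
    if (args != null && args.length > 0 && args[0] instanceof String) {
        invokers = localMap.get(methodName + "." + args[0]); // route on the first argument
    }
    if (invokers == null) {
        invokers = localMap.get(methodName);                 // route on the method name
    }
    if (invokers == null) {
        invokers = localMap.get(Constants.ANY_VALUE);        // "*" catch-all
    }
}
return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
```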

StaticDirectory, by contrast, receives its invokers through the constructor, so its invoker list never changes dynamically; it sees relatively little use:

public StaticDirectory(List<Invoker<T>> invokers) {
    this(null, invokers, null);
}

public StaticDirectory(List<Invoker<T>> invokers, List<Router> routers) {
    this(null, invokers, routers);
}

public StaticDirectory(URL url, List<Invoker<T>> invokers) {
    this(url, invokers, null);
}

public StaticDirectory(URL url, List<Invoker<T>> invokers, List<Router> routers) {
    super(url == null && invokers != null && !invokers.isEmpty() ? invokers.get(0).getUrl() : url, routers);
    if (invokers == null || invokers.isEmpty())
        throw new IllegalArgumentException("invokers == null");
    this.invokers = invokers;
}

RegistryDirectory is the one that maintains the invoker list dynamically from the change notifications pushed by the registry.

That wraps up the cluster module.
