Dubbo集群容错——LoadBalance

2020-07-25  本文已影响0人  就这些吗

本系列主要参考官网文档、芋道源码的源码解读和《深入理解Apache Dubbo与实战》一书。Dubbo版本为2.6.1。

文章内容顺序:

  1. 什么是负载均衡

  2. LoadBalance的UML及代码分析

  • 2.1 LoadBalance的UML图
  • 2.2 LoadBalance的接口
  • 2.3AbstractLoadBalance
  • 2.4RandomLoadBalance
  • 2.5 RoundRobinLoadBalance
  • 2.6 LeastActiveLoadBalance
  • 2.7 ConsistentHashLoadBalance
    • 2.7.1ConsistentHashLoadBalance#doSelect

1. 什么是负载均衡

官网有关负载均衡的配置文档:负载均衡
Dubbo 需要对服务消费者的调用请求进行分配,避免少数服务提供者负载过大。服务提供者负载过大,会导致部分请求超时。因此将负载均衡到每个服务提供者上,是非常必要的。

2. LoadBalance的UML及代码分析

2.1 LoadBalance的UML图

LoadBalance 子类如下图:


image.png

2.2 LoadBalance的接口

从LoadBalance 的接口来开始看:

@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {

    /**
     * select one invoker in list.
     *
     * 从 Invoker 集合中,选择一个
     *
     * @param invokers   invokers.
     * @param url        refer url
     * @param invocation invocation.
     * @return selected invoker.
     */
    @Adaptive("loadbalance")
    <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

}

接口没啥好说的,只定义了一个select方法交由子类来实现,如果没有设置loadbalance属性,默认实现为RandomLoadBalance

2.3AbstractLoadBalance

我们先来看看他的抽象类AbstractLoadBalance做了什么。

public abstract class AbstractLoadBalance implements LoadBalance {

    static int calculateWarmupWeight(int uptime, int warmup, int weight) {
        // 计算权重
        int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
        // 权重范围为 [0, weight] 之间
        return ww < 1 ? 1 : (ww > weight ? weight : ww);
    }

    @Override
    public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        if (invokers == null || invokers.isEmpty()) {
            return null;
        }
        if (invokers.size() == 1) {
            return invokers.get(0);
        }
        return doSelect(invokers, url, invocation);
    }

    protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);

    protected int getWeight(Invoker<?> invoker, Invocation invocation) {
        // 获得 weight 配置,即服务权重。默认为 100
        int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
        if (weight > 0) {
            long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
            if (timestamp > 0L) {
                // 获得启动总时长
                int uptime = (int) (System.currentTimeMillis() - timestamp);
                // 获得预热需要总时长。默认为 10 * 60 * 1000 = 10 分钟
                int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
                // 处于预热中,计算当前的权重
                if (uptime > 0 && uptime < warmup) {
                    weight = calculateWarmupWeight(uptime, warmup, weight);
                }
            }
        }
        return weight;
    }

}

考虑到 JVM 自身会有预热的过程,所以服务提供者一启动就直接承担 100% 的流量,可能会出现很吃力的情况。因此权重的计算,默认自带了预热的过程

举个例子:
根据calculateWarmupWeight()方法实现可知,随着provider的启动时间越来越长,慢慢提升权重直到weight,且权重最小值为1,所以:

如果 provider 运行了 1 分钟,那么 weight 为 10,即只有最终需要承担的 10% 流量;
如果 provider 运行了 2 分钟,那么 weight 为 20,即只有最终需要承担的 20% 流量;
如果 provider 运行了 5 分钟,那么 weight 为 50,即只有最终需要承担的 50% 流量;
… …
如果 provider 运行了 10 分钟,那么 weight 为 100,即只有最终需要承担的 100% 流量;

2.4RandomLoadBalance

接下来是默认实现RandomLoadBalance中的方法

public class RandomLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "random";

    private final Random random = new Random();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // Number of invokers
        int totalWeight = 0; // The sum of weights
        boolean sameWeight = true; // Every invoker has the same weight?
        // 计算总权限
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation); // 获得权重
            totalWeight += weight; // Sum
            if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) {
                sameWeight = false;
            }
        }
        // 权重不相等,随机后,判断在哪个 Invoker 的权重区间中
        if (totalWeight > 0 && !sameWeight) {
            // 随机
            // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
            int offset = random.nextInt(totalWeight);
            // Return a invoker based on the random value.
            // 区间判断
            for (Invoker<T> invoker : invokers) {
                offset -= getWeight(invoker, invocation);
                if (offset < 0) {
                    return invoker;
                }
            }
        }
        // 权重相等,平均随机
        // If all invokers have the same weight value or totalWeight=0, return evenly.
        return invokers.get(random.nextInt(length));
    }

}

光看代码可能比较迷糊,直接来模拟一下就通透了。

FROM 飞哥的 《dubbo源码-负载均衡》

假定有3台dubbo provider:

  • 10.0.0.1:20884, weight=2
  • 10.0.0.1:20886, weight=3
  • 10.0.0.1:20888, weight=4

随机算法的实现:
totalWeight=9;

  • 假设offset=1(即random.nextInt(9)=1)
    1-2=-1<0?是,所以选中 10.0.0.1:20884, weight=2
  • 假设offset=4(即random.nextInt(9)=4)
    4-2=2<0?否,这时候offset=2, 2-3<0?是,所以选中 10.0.0.1:20886, weight=3
  • 假设offset=7(即random.nextInt(9)=7)
    7-2=5<0?否,这时候offset=5, 5-3=2<0?否,这时候offset=2, 2-4<0?是,所以选中 10.0.0.1:20888, weight=4

2.5 RoundRobinLoadBalance

轮循,按公约后的权重设置轮循比率。
存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。

public class RoundRobinLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "roundrobin";

    /**
     * 服务方法与计数器的映射
     *
     * KEY:serviceKey + "." + methodName
     */
    private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        int length = invokers.size(); // Number of invokers
        int maxWeight = 0; // The maximum weight
        int minWeight = Integer.MAX_VALUE; // The minimum weight
        final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
        int weightSum = 0;
        // 计算最小、最大权重,总的权重和。
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
            minWeight = Math.min(minWeight, weight); // Choose the minimum weight
            if (weight > 0) {
                invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
                weightSum += weight;
            }
        }
        // 获得 AtomicPositiveInteger 对象
        AtomicPositiveInteger sequence = sequences.get(key);
        if (sequence == null) {
            sequences.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences.get(key);
        }
        // 获得当前顺序号,并递增 + 1
        int currentSequence = sequence.getAndIncrement();
        // 权重不相等,顺序根据权重分配
        if (maxWeight > 0 && minWeight < maxWeight) {
            int mod = currentSequence % weightSum; // 剩余权重
            for (int i = 0; i < maxWeight; i++) { // 循环最大权重
                for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) { // 循环 Invoker 集合
                    final Invoker<T> k = each.getKey();
                    final IntegerWrapper v = each.getValue();
                    // 剩余权重归 0 ,当前 Invoker 还有剩余权重,返回该 Invoker 对象
                    if (mod == 0 && v.getValue() > 0) {
                        return k;
                    }
                    // 若 Invoker 还有权重值,扣除它( value )和剩余权重( mod )。
                    if (v.getValue() > 0) {
                        v.decrement();
                        mod--;
                    }
                }
            }
        }
        // 权重相等,平均顺序获得
        // Round robin
        return invokers.get(currentSequence % length);
    }

    private static final class IntegerWrapper {

        /**
         * 权重值
         */
        private int value;

        public IntegerWrapper(int value) {
            this.value = value;
        }

        public int getValue() {
            return value;
        }

        public void setValue(int value) {
            this.value = value;
        }

        /**
         * 扣除一
         */
        public void decrement() {
            this.value--;
        }

    }

}

当权重不相等时

mod = 0:满足条件,此时直接返回服务器 A

mod = 1:需要进行一次递减操作才能满足条件,此时返回服务器 B

mod = 2:需要进行两次递减操作才能满足条件,此时返回服务器 C

mod = 3:需要进行三次递减操作才能满足条件,经过递减后,服务器权重为 [1, 4, 0],此时返回服务器 A

mod = 4:需要进行四次递减操作才能满足条件,经过递减后,服务器权重为 [0, 4, 0],此时返回服务器 B

mod = 5:需要进行五次递减操作才能满足条件,经过递减后,服务器权重为 [0, 3, 0],此时返回服务器 B

mod = 6:需要进行六次递减操作才能满足条件,经过递减后,服务器权重为 [0, 2, 0],此时返回服务器 B

mod = 7:需要进行七次递减操作才能满足条件,经过递减后,服务器权重为 [0, 1, 0],此时返回服务器 B

经过8次调用后,我们得到的负载均衡结果为 [A, B, C, A, B, B, B, B],次数比 A:B:C = 2:5:1,等于权重比。当 sequence = 8 时,mod = 0,此时重头再来。从上面的模拟过程可以看出,当 mod >= 3 后,服务器 C 就不会被选中了,因为它的权重被减为0了。当 mod >= 4 后,服务器 A 的权重被减为0,此后 A 就不会再被选中。

这里存在一个问题,轮询次数与mod相关,假如 mod 很大,比如 10000,50000,甚至更大时,doSelect 方法需要进行很多次计算才能将 mod 减为0,在后面的版本中,进行了修复,详情可以看下面链接中关于RoundRobinLoadBalance的分析。Dubbo官网的负载均衡分析

当权重相等时

举个例子:
FROM 飞哥的 《dubbo源码-负载均衡》

假定有3台权重都一样的dubbo provider:

  • 10.0.0.1:20884, weight=100
  • 10.0.0.1:20886, weight=100
  • 10.0.0.1:20888, weight=100

轮询算法的实现:
其调用方法某个方法(key)的 sequence 从 0 开始:

  • sequence=0时,选择invokers.get(0%3)=10.0.0.1:20884
  • sequence=1时,选择invokers.get(1%3)=10.0.0.1:20886
  • sequence=2时,选择invokers.get(2%3)=10.0.0.1:20888
  • sequence=3时,选择invokers.get(3%3)=10.0.0.1:20884
  • sequence=4时,选择invokers.get(4%3)=10.0.0.1:20886
  • sequence=5时,选择invokers.get(5%3)=10.0.0.1:20888

2.6 LeastActiveLoadBalance

最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。
使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。

public class LeastActiveLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "leastactive";

    private final Random random = new Random();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // 总个数
        int leastActive = -1; // 最小的活跃数
        int leastCount = 0; // 相同最小活跃数的个数
        int[] leastIndexes = new int[length]; // 相同最小活跃数的下标
        int totalWeight = 0; // 总权重
        int firstWeight = 0; // 第一个权重,用于于计算是否相同
        boolean sameWeight = true; // 是否所有权重相同
        // 计算获得相同最小活跃数的数组和个数
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // 活跃数
            int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // 权重
            if (leastActive == -1 || active < leastActive) { // 发现更小的活跃数,重新开始
                leastActive = active; // 记录最小活跃数
                leastCount = 1; // 重新统计相同最小活跃数的个数
                leastIndexes[0] = i; // 重新记录最小活跃数下标
                totalWeight = weight; // 重新累计总权重
                firstWeight = weight; // 记录第一个权重
                sameWeight = true; // 还原权重相同标识
            } else if (active == leastActive) { // 累计相同最小的活跃数
                leastIndexes[leastCount++] = i; // 累计相同最小活跃数下标
                totalWeight += weight; // 累计总权重
                // 判断所有权重是否一样
                if (sameWeight && weight != firstWeight) {
                    sameWeight = false;
                }
            }
        }
        // assert(leastCount > 0)
        if (leastCount == 1) {
            // 如果只有一个最小则直接返回
            return invokers.get(leastIndexes[0]);
        }
        if (!sameWeight && totalWeight > 0) {
            // 如果权重不相同且权重大于0则按总权重数随机
            int offsetWeight = random.nextInt(totalWeight);
            // 并确定随机值落在哪个片断上
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexes[i];
                offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
                if (offsetWeight <= 0) {
                    return invokers.get(leastIndex);
                }
            }
        }
        // 如果权重相同或权重为0则均等随机
        return invokers.get(leastIndexes[random.nextInt(leastCount)]);
    }

}

此负载均衡会先从通过 RpcStatusactive 属性,来计算每个Invoker的活跃数,
这个active就是当前正在调用中的服务的方法的次数,注意这个是并行执行请求数。
初始情况下,所有服务提供者活跃数均为0。每收到一个请求,活跃数加1,完成请求后则将活跃数减1。在服务运行一段时间后,性能好的服务提供者处理请求的速度更快,因此活跃数下降的也越快,此时这样的服务提供者能够优先获取到新的服务请求、这就是最小活跃数负载均衡算法的基本思想。

还是举个例子:
FROM 飞哥的 《dubbo源码-负载均衡》

最小活跃数算法实现:
假定有3台dubbo provider:

  • 10.0.0.1:20884, weight=2,active=2
  • 10.0.0.1:20886, weight=3,active=4
  • 10.0.0.1:20888, weight=4,active=3

active=2最小,且只有一个2,所以选择10.0.0.1:20884

假定有3台dubbo provider:

  • 10.0.0.1:20884, weight=2,active=2
  • 10.0.0.1:20886, weight=3,active=2
  • 10.0.0.1:20888, weight=4,active=3
    active=2最小,且有2个,所以从[10.0.0.1:20884,10.0.0.1:20886 ]中选择;

接下来的算法与随机算法类似:

  • 假设offset=1(即random.nextInt(5)=1)
    1-2=-1<0?是,所以选中 10.0.0.1:20884, weight=2
  • 假设offset=4(即random.nextInt(5)=4)
    4-2=2<0?否,这时候offset=2, 2-3<0?是,所以选中 10.0.0.1:20886, weight=3

2.7 ConsistentHashLoadBalance

一致性 Hash,相同参数的请求总是发到同一提供者。
当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
可以类比Redis的一致性哈希算法。

2.7.1ConsistentHashLoadBalance#doSelect

   protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        // 基于 invokers 集合,根据对象内存地址来计算定义哈希值
        int identityHashCode = System.identityHashCode(invokers);
        // 获得 ConsistentHashSelector 对象。若为空,或者定义哈希值变更(说明 invokers 集合发生变化),进行创建新的 ConsistentHashSelector 对象
        ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
        if (selector == null || selector.identityHashCode != identityHashCode) {
            selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
            selector = (ConsistentHashSelector<T>) selectors.get(key);
        }
        return selector.select(invocation);
    }

一致性哈希算法的思想如下:
将节点hash后会不均匀地分布在环上,这样大量key在寻找节点时,会存在key命中各个节点的概率差别较大,无法实现有效的负载均衡。
如有三个节点Node1,Node2,Node3,分布在环上时三个节点挨的很近,落在环上的key寻找节点时,大量key顺时针总是分配给Node2,而其它两个节点被找到的概率都会很小。
上面代码中调用的selector是 ConsistentHashLoadBalance 的内部类,一致性哈希选择器,基于 的Ketama算法实现。
可以参考这篇文章:《Ketama一致性Hash算法(含Java代码)》

上一篇下一篇

猜你喜欢

热点阅读