Dubbo集群容错——LoadBalance
本系列主要参考官网文档、芋道源码的源码解读和《深入理解Apache Dubbo与实战》一书。Dubbo版本为2.6.1。
文章内容顺序:
什么是负载均衡
LoadBalance的UML及代码分析
- 2.1 LoadBalance的UML图
- 2.2 LoadBalance的接口
- 2.3AbstractLoadBalance
- 2.4RandomLoadBalance
- 2.5 RoundRobinLoadBalance
- 2.6 LeastActiveLoadBalance
- 2.7 ConsistentHashLoadBalance
- 2.7.1ConsistentHashLoadBalance#doSelect
1. 什么是负载均衡
官网有关负载均衡的配置文档:负载均衡
Dubbo 需要对服务消费者的调用请求进行分配,避免少数服务提供者负载过大。服务提供者负载过大,会导致部分请求超时。因此将负载均衡到每个服务提供者上,是非常必要的。
2. LoadBalance的UML及代码分析
2.1 LoadBalance的UML图
LoadBalance 子类如下图:
image.png
2.2 LoadBalance的接口
从LoadBalance 的接口来开始看:
@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {
/**
* select one invoker in list.
*
* 从 Invoker 集合中,选择一个
*
* @param invokers invokers.
* @param url refer url
* @param invocation invocation.
* @return selected invoker.
*/
@Adaptive("loadbalance")
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
接口没啥好说的,只定义了一个select方
法交由子类来实现,如果没有设置loadbalance
属性,默认实现为RandomLoadBalance
。
2.3AbstractLoadBalance
我们先来看看他的抽象类AbstractLoadBalance做了什么。
public abstract class AbstractLoadBalance implements LoadBalance {
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
// 计算权重
int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
// 权重范围为 [0, weight] 之间
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}
@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (invokers == null || invokers.isEmpty()) {
return null;
}
if (invokers.size() == 1) {
return invokers.get(0);
}
return doSelect(invokers, url, invocation);
}
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
// 获得 weight 配置,即服务权重。默认为 100
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
if (weight > 0) {
long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
// 获得启动总时长
int uptime = (int) (System.currentTimeMillis() - timestamp);
// 获得预热需要总时长。默认为 10 * 60 * 1000 = 10 分钟
int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
// 处于预热中,计算当前的权重
if (uptime > 0 && uptime < warmup) {
weight = calculateWarmupWeight(uptime, warmup, weight);
}
}
}
return weight;
}
}
考虑到 JVM 自身会有预热的过程,所以服务提供者一启动就直接承担 100% 的流量,可能会出现很吃力的情况。因此权重的计算,默认自带了预热的过程
举个例子:
根据calculateWarmupWeight()方法实现可知,随着provider的启动时间越来越长,慢慢提升权重直到weight,且权重最小值为1,所以:如果 provider 运行了 1 分钟,那么 weight 为 10,即只有最终需要承担的 10% 流量;
如果 provider 运行了 2 分钟,那么 weight 为 20,即只有最终需要承担的 20% 流量;
如果 provider 运行了 5 分钟,那么 weight 为 50,即只有最终需要承担的 50% 流量;
… …
如果 provider 运行了 10 分钟,那么 weight 为 100,即只有最终需要承担的 100% 流量;
2.4RandomLoadBalance
接下来是默认实现RandomLoadBalance中的方法
public class RandomLoadBalance extends AbstractLoadBalance {
public static final String NAME = "random";
private final Random random = new Random();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // Number of invokers
int totalWeight = 0; // The sum of weights
boolean sameWeight = true; // Every invoker has the same weight?
// 计算总权限
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation); // 获得权重
totalWeight += weight; // Sum
if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false;
}
}
// 权重不相等,随机后,判断在哪个 Invoker 的权重区间中
if (totalWeight > 0 && !sameWeight) {
// 随机
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
int offset = random.nextInt(totalWeight);
// Return a invoker based on the random value.
// 区间判断
for (Invoker<T> invoker : invokers) {
offset -= getWeight(invoker, invocation);
if (offset < 0) {
return invoker;
}
}
}
// 权重相等,平均随机
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(random.nextInt(length));
}
}
光看代码可能比较迷糊,直接来模拟一下就通透了。
FROM 飞哥的 《dubbo源码-负载均衡》
假定有3台dubbo provider:
- 10.0.0.1:20884, weight=2
- 10.0.0.1:20886, weight=3
- 10.0.0.1:20888, weight=4
随机算法的实现:
totalWeight=9;
- 假设offset=1(即random.nextInt(9)=1)
1-2=-1<0?是,所以选中 10.0.0.1:20884, weight=2- 假设offset=4(即random.nextInt(9)=4)
4-2=2<0?否,这时候offset=2, 2-3<0?是,所以选中 10.0.0.1:20886, weight=3- 假设offset=7(即random.nextInt(9)=7)
7-2=5<0?否,这时候offset=5, 5-3=2<0?否,这时候offset=2, 2-4<0?是,所以选中 10.0.0.1:20888, weight=4
2.5 RoundRobinLoadBalance
轮循,按公约后的权重设置轮循比率。
存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。
public class RoundRobinLoadBalance extends AbstractLoadBalance {
public static final String NAME = "roundrobin";
/**
* 服务方法与计数器的映射
*
* KEY:serviceKey + "." + methodName
*/
private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
int length = invokers.size(); // Number of invokers
int maxWeight = 0; // The maximum weight
int minWeight = Integer.MAX_VALUE; // The minimum weight
final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
int weightSum = 0;
// 计算最小、最大权重,总的权重和。
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
minWeight = Math.min(minWeight, weight); // Choose the minimum weight
if (weight > 0) {
invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
weightSum += weight;
}
}
// 获得 AtomicPositiveInteger 对象
AtomicPositiveInteger sequence = sequences.get(key);
if (sequence == null) {
sequences.putIfAbsent(key, new AtomicPositiveInteger());
sequence = sequences.get(key);
}
// 获得当前顺序号,并递增 + 1
int currentSequence = sequence.getAndIncrement();
// 权重不相等,顺序根据权重分配
if (maxWeight > 0 && minWeight < maxWeight) {
int mod = currentSequence % weightSum; // 剩余权重
for (int i = 0; i < maxWeight; i++) { // 循环最大权重
for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) { // 循环 Invoker 集合
final Invoker<T> k = each.getKey();
final IntegerWrapper v = each.getValue();
// 剩余权重归 0 ,当前 Invoker 还有剩余权重,返回该 Invoker 对象
if (mod == 0 && v.getValue() > 0) {
return k;
}
// 若 Invoker 还有权重值,扣除它( value )和剩余权重( mod )。
if (v.getValue() > 0) {
v.decrement();
mod--;
}
}
}
}
// 权重相等,平均顺序获得
// Round robin
return invokers.get(currentSequence % length);
}
private static final class IntegerWrapper {
/**
* 权重值
*/
private int value;
public IntegerWrapper(int value) {
this.value = value;
}
public int getValue() {
return value;
}
public void setValue(int value) {
this.value = value;
}
/**
* 扣除一
*/
public void decrement() {
this.value--;
}
}
}
当权重不相等时
mod = 0:满足条件,此时直接返回服务器 A
mod = 1:需要进行一次递减操作才能满足条件,此时返回服务器 B
mod = 2:需要进行两次递减操作才能满足条件,此时返回服务器 C
mod = 3:需要进行三次递减操作才能满足条件,经过递减后,服务器权重为 [1, 4, 0],此时返回服务器 A
mod = 4:需要进行四次递减操作才能满足条件,经过递减后,服务器权重为 [0, 4, 0],此时返回服务器 B
mod = 5:需要进行五次递减操作才能满足条件,经过递减后,服务器权重为 [0, 3, 0],此时返回服务器 B
mod = 6:需要进行六次递减操作才能满足条件,经过递减后,服务器权重为 [0, 2, 0],此时返回服务器 B
mod = 7:需要进行七次递减操作才能满足条件,经过递减后,服务器权重为 [0, 1, 0],此时返回服务器 B
经过8次调用后,我们得到的负载均衡结果为 [A, B, C, A, B, B, B, B],次数比 A:B:C = 2:5:1,等于权重比。当 sequence = 8 时,mod = 0,此时重头再来。从上面的模拟过程可以看出,当 mod >= 3 后,服务器 C 就不会被选中了,因为它的权重被减为0了。当 mod >= 4 后,服务器 A 的权重被减为0,此后 A 就不会再被选中。
这里存在一个问题,轮询次数与mod相关,假如 mod 很大,比如 10000,50000,甚至更大时,doSelect 方法需要进行很多次计算才能将 mod 减为0,在后面的版本中,进行了修复,详情可以看下面链接中关于RoundRobinLoadBalance的分析。Dubbo官网的负载均衡分析
当权重相等时
举个例子:
FROM 飞哥的 《dubbo源码-负载均衡》假定有3台权重都一样的dubbo provider:
- 10.0.0.1:20884, weight=100
- 10.0.0.1:20886, weight=100
- 10.0.0.1:20888, weight=100
轮询算法的实现:
其调用方法某个方法(key)的 sequence 从 0 开始:
- sequence=0时,选择invokers.get(0%3)=10.0.0.1:20884
- sequence=1时,选择invokers.get(1%3)=10.0.0.1:20886
- sequence=2时,选择invokers.get(2%3)=10.0.0.1:20888
- sequence=3时,选择invokers.get(3%3)=10.0.0.1:20884
- sequence=4时,选择invokers.get(4%3)=10.0.0.1:20886
- sequence=5时,选择invokers.get(5%3)=10.0.0.1:20888
2.6 LeastActiveLoadBalance
最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。
使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。
public class LeastActiveLoadBalance extends AbstractLoadBalance {
public static final String NAME = "leastactive";
private final Random random = new Random();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // 总个数
int leastActive = -1; // 最小的活跃数
int leastCount = 0; // 相同最小活跃数的个数
int[] leastIndexes = new int[length]; // 相同最小活跃数的下标
int totalWeight = 0; // 总权重
int firstWeight = 0; // 第一个权重,用于于计算是否相同
boolean sameWeight = true; // 是否所有权重相同
// 计算获得相同最小活跃数的数组和个数
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // 活跃数
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // 权重
if (leastActive == -1 || active < leastActive) { // 发现更小的活跃数,重新开始
leastActive = active; // 记录最小活跃数
leastCount = 1; // 重新统计相同最小活跃数的个数
leastIndexes[0] = i; // 重新记录最小活跃数下标
totalWeight = weight; // 重新累计总权重
firstWeight = weight; // 记录第一个权重
sameWeight = true; // 还原权重相同标识
} else if (active == leastActive) { // 累计相同最小的活跃数
leastIndexes[leastCount++] = i; // 累计相同最小活跃数下标
totalWeight += weight; // 累计总权重
// 判断所有权重是否一样
if (sameWeight && weight != firstWeight) {
sameWeight = false;
}
}
}
// assert(leastCount > 0)
if (leastCount == 1) {
// 如果只有一个最小则直接返回
return invokers.get(leastIndexes[0]);
}
if (!sameWeight && totalWeight > 0) {
// 如果权重不相同且权重大于0则按总权重数随机
int offsetWeight = random.nextInt(totalWeight);
// 并确定随机值落在哪个片断上
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
if (offsetWeight <= 0) {
return invokers.get(leastIndex);
}
}
}
// 如果权重相同或权重为0则均等随机
return invokers.get(leastIndexes[random.nextInt(leastCount)]);
}
}
此负载均衡会先从通过
RpcStatus
的active
属性,来计算每个Invoker
的活跃数,
这个active
就是当前正在调用中的服务的方法的次数,注意这个是并行执行请求数。
初始情况下,所有服务提供者活跃数均为0。每收到一个请求,活跃数加1,完成请求后则将活跃数减1。在服务运行一段时间后,性能好的服务提供者处理请求的速度更快,因此活跃数下降的也越快,此时这样的服务提供者能够优先获取到新的服务请求、这就是最小活跃数负载均衡算法的基本思想。
还是举个例子:
FROM 飞哥的 《dubbo源码-负载均衡》最小活跃数算法实现:
假定有3台dubbo provider:
- 10.0.0.1:20884, weight=2,active=2
- 10.0.0.1:20886, weight=3,active=4
- 10.0.0.1:20888, weight=4,active=3
active=2最小,且只有一个2,所以选择10.0.0.1:20884
假定有3台dubbo provider:
- 10.0.0.1:20884, weight=2,active=2
- 10.0.0.1:20886, weight=3,active=2
- 10.0.0.1:20888, weight=4,active=3
active=2最小,且有2个,所以从[10.0.0.1:20884,10.0.0.1:20886 ]中选择;接下来的算法与随机算法类似:
- 假设offset=1(即random.nextInt(5)=1)
1-2=-1<0?是,所以选中 10.0.0.1:20884, weight=2- 假设offset=4(即random.nextInt(5)=4)
4-2=2<0?否,这时候offset=2, 2-3<0?是,所以选中 10.0.0.1:20886, weight=3
2.7 ConsistentHashLoadBalance
一致性 Hash,相同参数的请求总是发到同一提供者。
当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
可以类比Redis的一致性哈希算法。
2.7.1ConsistentHashLoadBalance#doSelect
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
// 基于 invokers 集合,根据对象内存地址来计算定义哈希值
int identityHashCode = System.identityHashCode(invokers);
// 获得 ConsistentHashSelector 对象。若为空,或者定义哈希值变更(说明 invokers 集合发生变化),进行创建新的 ConsistentHashSelector 对象
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
if (selector == null || selector.identityHashCode != identityHashCode) {
selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
return selector.select(invocation);
}
一致性哈希算法的思想如下:
将节点hash后会不均匀地分布在环上,这样大量key在寻找节点时,会存在key命中各个节点的概率差别较大,无法实现有效的负载均衡。
如有三个节点Node1,Node2,Node3,分布在环上时三个节点挨的很近,落在环上的key寻找节点时,大量key顺时针总是分配给Node2,而其它两个节点被找到的概率都会很小。
上面代码中调用的selector是ConsistentHashLoadBalance
的内部类,一致性哈希选择器,基于 的Ketama
算法实现。
可以参考这篇文章:《Ketama一致性Hash算法(含Java代码)》