[Soul 源码之旅] 1.11 Soul的负载均衡
我们上一节介绍 Soul 的 SPI 机制是就解释过,Soul 通过 SPI 机制进行加载负载均衡策略,这一节我们分别来介绍一下三种负载均衡机制。
1.11.1 准备
为了测试负载均衡机制,我们需要启动多个客户端进行测试,使用 idea 的同学可以通过配置 run 设置如下

同时我们还需要修改如下配置端口:

Soul 中负载均衡的策略都实现了 LoadBlance 接口,具体如下:

我们使用 Divide 插件进行讲述:
1.11.2 AbstractLoadBalance
在 AbstractLoadBalance 主要的算法就是 select ,这里假如 upstream 为空,则返回 null ,假如只有一个就直接返回,假如有多个就调用 doSelect ,这里是不是和插件的 execute 和 doExecute 很相似。
@Override
public DivideUpstream select(final List<DivideUpstream> upstreamList, final String ip) {
if (CollectionUtils.isEmpty(upstreamList)) {
return null;
}
if (upstreamList.size() == 1) {
return upstreamList.get(0);
}
return doSelect(upstreamList, ip);
}
这里还有一个很重要的方法就是获取权重:这里并不是简单的返回权重,而是根据启动时间,预热时间,和设置的权重计算出真正的权重 主要公式是 :((float) uptime / ((float) warmup / (float) weight)); 它主要是放置刚启动的服务负载过大,需要一个预热过程,而且这个分支是只有启动时间小于预热时间才会进入,但是由于 soul 目前上传的 timestamp 一直是 0,这个过程不起作用,所以真正没有使用到。(这里实现和 dubbo 很像)
protected int getWeight(final DivideUpstream upstream) {
if (!upstream.isStatus()) {
return 0;
}
// 获取权重
int weight = getWeight(upstream.getTimestamp(), getWarmup(upstream.getWarmup(), Constants.DEFAULT_WARMUP), upstream.getWeight());
return weight;
}
private int getWeight(final long timestamp, final int warmup, final int weight) {
if (weight > 0 && timestamp > 0) {
int uptime = (int) (System.currentTimeMillis() - timestamp);
if (uptime > 0 && uptime < warmup) {
return calculateWarmupWeight(uptime, warmup, weight);
}
}
return weight;
}
private int getWarmup(final int warmup, final int defaultWarmup) {
if (warmup > 0) {
return warmup;
}
return defaultWarmup;
}
private int calculateWarmupWeight(final int uptime, final int warmup, final int weight) {
// 运行时间 / 预热时间 防止刚启动 负载就大量增加
//换算成数据公式: weight(设置的权重) * uptime(运行时间)/warmup(预热时间)
//小计:预热时间越长,重新计算的权重就越接近设置的权重
int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}
1.11.3 RandomLoadBalance
我们先看它的 doSelect 方法,这里主要的逻辑是假如所有的 upstream 的权重都是一样的,那么就可以直接通过随机数选择其中一个,假如不是则根据权重进行随机选择,我们侧重看一下 random(totalWeight, upstreamList);
@Override
public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
int totalWeight = calculateTotalWeight(upstreamList);
boolean sameWeight = isAllUpStreamSameWeight(upstreamList);
if (totalWeight > 0 && !sameWeight) {
return random(totalWeight, upstreamList);
}
// If the weights are the same or the weights are 0 then random
return random(upstreamList);
}
这里引入了一个 offset 的概念,即落入某个区间,offset 每减一次权重,就是减去 totalweight 的权重中上一个区间的长度,那么假如 offset 小于0 这代表随机数落在这个区间上,不是则看下一个区间。
private DivideUpstream random(final int totalWeight, final List<DivideUpstream> upstreamList) {
// If the weights are not the same and the weights are greater than 0, then random by the total number of weights
int offset = RANDOM.nextInt(totalWeight);
// Determine which segment the random value falls on
for (DivideUpstream divideUpstream : upstreamList) {
offset -= getWeight(divideUpstream);
if (offset < 0) {
return divideUpstream;
}
}
return upstreamList.get(0);
}
1.11.4 HashLoadBalance
我们还是从 doSelect 方法出发,soul 会为所有的 upstream 根据下面的公式 "SOUL-" + address.getUpstreamUrl() + "-HASH-" + i 得到一个String 然后生产对应的 hash 值,这里每个 upstream 会生成 5个,然后放置在 ConcurrentSkipListMap 中,然后根据 ip 生成对应的 hash ,然后从 ConcurrentSkipListMap 找到第一个大于 这个ip 对应的 hash 值,假如没有则返回第一个,这个功能可以在多集群中实现会话保持的能力,因为每个ip 它对应的服务器每次都是确定的。
@Override
public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
final ConcurrentSkipListMap<Long, DivideUpstream> treeMap = new ConcurrentSkipListMap<>();
for (DivideUpstream address : upstreamList) {
for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
long addressHash = hash("SOUL-" + address.getUpstreamUrl() + "-HASH-" + i);
treeMap.put(addressHash, address);
}
}
long hash = hash(String.valueOf(ip));
SortedMap<Long, DivideUpstream> lastRing = treeMap.tailMap(hash);
if (!lastRing.isEmpty()) {
return lastRing.get(lastRing.firstKey());
}
return treeMap.firstEntry().getValue();
}
RoundRobinLoadBalance
RoundRobinLoadBalance 是这三种算法中实现最复杂的,我们先来看一下它的 doSelect 方法:
@Override
public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
String key = upstreamList.get(0).getUpstreamUrl();
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (map == null) {
methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<>(16));
map = methodWeightMap.get(key);
}
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
DivideUpstream selectedInvoker = null;
WeightedRoundRobin selectedWRR = null;
for (DivideUpstream upstream : upstreamList) {
String rKey = upstream.getUpstreamUrl();
WeightedRoundRobin weightedRoundRobin = map.get(rKey);
int weight = getWeight(upstream);
if (weightedRoundRobin == null) {
weightedRoundRobin = new WeightedRoundRobin();
weightedRoundRobin.setWeight(weight);
map.putIfAbsent(rKey, weightedRoundRobin);
}
if (weight != weightedRoundRobin.getWeight()) {
//weight changed
weightedRoundRobin.setWeight(weight);
}
long cur = weightedRoundRobin.increaseCurrent();
weightedRoundRobin.setLastUpdate(now);
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = upstream;
selectedWRR = weightedRoundRobin;
}
totalWeight += weight;
}
if (!updateLock.get() && upstreamList.size() != map.size() && updateLock.compareAndSet(false, true)) {
try {
// copy -> modify -> update reference
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > recyclePeriod);
methodWeightMap.put(key, newMap);
} finally {
updateLock.set(false);
}
}
if (selectedInvoker != null) {
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
// should not happen here
return upstreamList.get(0);
}
这里引入了一个数据结构 WeightedRoundRobin ,它主要有三个属性 weight 该upstream 对应的权重,current 当前的权重,lastUpdate 最后更新时间。我们结合实例来解析该原理,如下是两个 upstream weight 为 50% 的轮询过程。这里先使用 for 循环遍历所有的 upstream ,然后 current值设置为 upstream 对应的 WeightedRoundRobin 加上它本身的权重,如下第一次从 0 到 50 ,然后从中选出最大的 current 值。当选中为这才调用的 upstream ,那么它的 current 会被减少 totaolWeight ,即就是途中 50 - 100 为 -50 ,然后下一次轮询又加上当前的权重50 就变成了0, 而另外一个 就是 50 + 50 变成了 100 。



1.11.5 总结
如上三种也是我们常用的三种轮询算法,我们从中可以看出 dubbo 的影子,三种轮询算法实现 包括 wramup 机制实现基本一致。