[Soul 源码之旅] 1.11 Soul的负载均衡

2021-02-01  本文已影响0人  AndyWei123

我们上一节介绍 Soul 的 SPI 机制是就解释过,Soul 通过 SPI 机制进行加载负载均衡策略,这一节我们分别来介绍一下三种负载均衡机制。

1.11.1 准备

为了测试负载均衡机制,我们需要启动多个客户端进行测试,使用 idea 的同学可以通过配置 run 设置如下


parallel run

同时我们还需要修改如下配置端口:


port
Soul 中负载均衡的策略都实现了 LoadBlance 接口,具体如下:
image.png

我们使用 Divide 插件进行讲述:

1.11.2 AbstractLoadBalance

在 AbstractLoadBalance 主要的算法就是 select ,这里假如 upstream 为空,则返回 null ,假如只有一个就直接返回,假如有多个就调用 doSelect ,这里是不是和插件的 execute 和 doExecute 很相似。

    @Override
    public DivideUpstream select(final List<DivideUpstream> upstreamList, final String ip) {
        if (CollectionUtils.isEmpty(upstreamList)) {
            return null;
        }
        if (upstreamList.size() == 1) {
            return upstreamList.get(0);
        }
        return doSelect(upstreamList, ip);
    }

这里还有一个很重要的方法就是获取权重:这里并不是简单的返回权重,而是根据启动时间,预热时间,和设置的权重计算出真正的权重 主要公式是 :((float) uptime / ((float) warmup / (float) weight)); 它主要是放置刚启动的服务负载过大,需要一个预热过程,而且这个分支是只有启动时间小于预热时间才会进入,但是由于 soul 目前上传的 timestamp 一直是 0,这个过程不起作用,所以真正没有使用到。(这里实现和 dubbo 很像)

    protected int getWeight(final DivideUpstream upstream) {
        if (!upstream.isStatus()) {
            return 0;
        }
        // 获取权重
        int weight = getWeight(upstream.getTimestamp(), getWarmup(upstream.getWarmup(), Constants.DEFAULT_WARMUP), upstream.getWeight());
        return weight;
    }

    private int getWeight(final long timestamp, final int warmup, final int weight) {
        if (weight > 0 && timestamp > 0) {
            int uptime = (int) (System.currentTimeMillis() - timestamp);
            if (uptime > 0 && uptime < warmup) {
                return calculateWarmupWeight(uptime, warmup, weight);
            }
        }
        return weight;
    }

    private int getWarmup(final int warmup, final int defaultWarmup) {
        if (warmup > 0) {
            return warmup;
        }
        return defaultWarmup;
    }

    private int calculateWarmupWeight(final int uptime, final int warmup, final int weight) {
        // 运行时间 / 预热时间  防止刚启动 负载就大量增加
        //换算成数据公式: weight(设置的权重) * uptime(运行时间)/warmup(预热时间)
        //小计:预热时间越长,重新计算的权重就越接近设置的权重
        int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
        return ww < 1 ? 1 : (ww > weight ? weight : ww);
    }

1.11.3 RandomLoadBalance

我们先看它的 doSelect 方法,这里主要的逻辑是假如所有的 upstream 的权重都是一样的,那么就可以直接通过随机数选择其中一个,假如不是则根据权重进行随机选择,我们侧重看一下 random(totalWeight, upstreamList);

    @Override
    public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
        int totalWeight = calculateTotalWeight(upstreamList);
        boolean sameWeight = isAllUpStreamSameWeight(upstreamList);
        if (totalWeight > 0 && !sameWeight) {
            return random(totalWeight, upstreamList);
        }
        // If the weights are the same or the weights are 0 then random
        return random(upstreamList);
    }

这里引入了一个 offset 的概念,即落入某个区间,offset 每减一次权重,就是减去 totalweight 的权重中上一个区间的长度,那么假如 offset 小于0 这代表随机数落在这个区间上,不是则看下一个区间。

    private DivideUpstream random(final int totalWeight, final List<DivideUpstream> upstreamList) {
        // If the weights are not the same and the weights are greater than 0, then random by the total number of weights
        int offset = RANDOM.nextInt(totalWeight);
        // Determine which segment the random value falls on
        for (DivideUpstream divideUpstream : upstreamList) {
            offset -= getWeight(divideUpstream);
            if (offset < 0) {
                return divideUpstream;
            }
        }
        return upstreamList.get(0);
    }

1.11.4 HashLoadBalance

我们还是从 doSelect 方法出发,soul 会为所有的 upstream 根据下面的公式 "SOUL-" + address.getUpstreamUrl() + "-HASH-" + i 得到一个String 然后生产对应的 hash 值,这里每个 upstream 会生成 5个,然后放置在 ConcurrentSkipListMap 中,然后根据 ip 生成对应的 hash ,然后从 ConcurrentSkipListMap 找到第一个大于 这个ip 对应的 hash 值,假如没有则返回第一个,这个功能可以在多集群中实现会话保持的能力,因为每个ip 它对应的服务器每次都是确定的。


    @Override
    public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
        final ConcurrentSkipListMap<Long, DivideUpstream> treeMap = new ConcurrentSkipListMap<>();
        for (DivideUpstream address : upstreamList) {
            for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
                long addressHash = hash("SOUL-" + address.getUpstreamUrl() + "-HASH-" + i);
                treeMap.put(addressHash, address);
            }
        }
        long hash = hash(String.valueOf(ip));
        SortedMap<Long, DivideUpstream> lastRing = treeMap.tailMap(hash);
        if (!lastRing.isEmpty()) {
            return lastRing.get(lastRing.firstKey());
        }
        return treeMap.firstEntry().getValue();
    }

RoundRobinLoadBalance

RoundRobinLoadBalance 是这三种算法中实现最复杂的,我们先来看一下它的 doSelect 方法:

    @Override
    public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
        String key = upstreamList.get(0).getUpstreamUrl();
        ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
        if (map == null) {
            methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<>(16));
            map = methodWeightMap.get(key);
        }
        int totalWeight = 0;
        long maxCurrent = Long.MIN_VALUE;
        long now = System.currentTimeMillis();
        DivideUpstream selectedInvoker = null;
        WeightedRoundRobin selectedWRR = null;
        for (DivideUpstream upstream : upstreamList) {
            String rKey = upstream.getUpstreamUrl();
            WeightedRoundRobin weightedRoundRobin = map.get(rKey);
            int weight = getWeight(upstream);
            if (weightedRoundRobin == null) {
                weightedRoundRobin = new WeightedRoundRobin();
                weightedRoundRobin.setWeight(weight);
                map.putIfAbsent(rKey, weightedRoundRobin);
            }
            if (weight != weightedRoundRobin.getWeight()) {
                //weight changed
                weightedRoundRobin.setWeight(weight);
            }
            long cur = weightedRoundRobin.increaseCurrent();
            weightedRoundRobin.setLastUpdate(now);
            if (cur > maxCurrent) {
                maxCurrent = cur;
                selectedInvoker = upstream;
                selectedWRR = weightedRoundRobin;
            }
            totalWeight += weight;
        }
        if (!updateLock.get() && upstreamList.size() != map.size() && updateLock.compareAndSet(false, true)) {
            try {
                // copy -> modify -> update reference
                ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
                newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > recyclePeriod);
                methodWeightMap.put(key, newMap);
            } finally {
                updateLock.set(false);
            }
        }
        if (selectedInvoker != null) {
            selectedWRR.sel(totalWeight);
            return selectedInvoker;
        }
        // should not happen here
        return upstreamList.get(0);
    }

这里引入了一个数据结构 WeightedRoundRobin ,它主要有三个属性 weight 该upstream 对应的权重,current 当前的权重,lastUpdate 最后更新时间。我们结合实例来解析该原理,如下是两个 upstream weight 为 50% 的轮询过程。这里先使用 for 循环遍历所有的 upstream ,然后 current值设置为 upstream 对应的 WeightedRoundRobin 加上它本身的权重,如下第一次从 0 到 50 ,然后从中选出最大的 current 值。当选中为这才调用的 upstream ,那么它的 current 会被减少 totaolWeight ,即就是途中 50 - 100 为 -50 ,然后下一次轮询又加上当前的权重50 就变成了0, 而另外一个 就是 50 + 50 变成了 100 。


第一次
第二次
第三次

1.11.5 总结

如上三种也是我们常用的三种轮询算法,我们从中可以看出 dubbo 的影子,三种轮询算法实现 包括 wramup 机制实现基本一致。

上一篇下一篇

猜你喜欢

热点阅读