soul网关学习16-插件实现1-Divide2-负载均衡算法

2021-01-30  本文已影响0人  niuxin

接着上篇的文章来。

负载均衡算法

DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
    public static DivideUpstream selector(final List<DivideUpstream> upstreamList, final String algorithm, final String ip) {
        LoadBalance loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getJoin(algorithm);
        return loadBalance.select(upstreamList, ip);
    }

SPI扩展点机制

    public static <T> ExtensionLoader<T> getExtensionLoader(final Class<T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("extension clazz is null");
        }
        if (!clazz.isInterface()) {
            throw new IllegalArgumentException("extension clazz (" + clazz + ") is not interface!");
        }
        if (!clazz.isAnnotationPresent(SPI.class)) {
            throw new IllegalArgumentException("extension clazz (" + clazz + ") without @" + SPI.class + " Annotation");
        }
        ExtensionLoader<T> extensionLoader = (ExtensionLoader<T>) LOADERS.get(clazz);
        if (extensionLoader != null) {
            return extensionLoader;
        }
        LOADERS.putIfAbsent(clazz, new ExtensionLoader<>(clazz));
        return (ExtensionLoader<T>) LOADERS.get(clazz);
    }

负载均衡算法实现

random-加权随机

  1. 看代码实际上是一个加权的随机算法
    public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
        // 计算所有后端节点的权重值
        int totalWeight = calculateTotalWeight(upstreamList);
        // 判断是否所有的节点为相同权重的情况
        boolean sameWeight = isAllUpStreamSameWeight(upstreamList);
        if (totalWeight > 0 && !sameWeight) {
            // 不是,则要加权随机
            return random(totalWeight, upstreamList);
        }
        // If the weights are the same or the weights are 0 then random
        // 如果各节点权重都相同,则按照总的节点数N,生成一个1-N之间的随机数X,然后选取出该X节点
        return random(upstreamList);
    }
  1. 加权随机的实现稍稍复杂点,看下源码
    private DivideUpstream random(final int totalWeight, final List<DivideUpstream> upstreamList) {
        // If the weights are not the same and the weights are greater than 0, then random by the total number of weights
        // 先拿到随机数为区间,为1-总权重值N之间
        int offset = RANDOM.nextInt(totalWeight);
        // Determine which segment the random value falls on
        // 遍历所有的节点,每次循环都用区间=区间-当前权重值,然后再看区间是否小于0;
        // 若小于0,则证明刚好位于当前节点的区间,返回当前节点;若没有,则继续循环
        for (DivideUpstream divideUpstream : upstreamList) {
            offset -= getWeight(divideUpstream);
            if (offset < 0) {
                return divideUpstream;
            }
        }
        // 最后返回第一个兜底
        return upstreamList.get(0);
    }

roundRobin-加权轮询

  1. 加权轮询的算法会有些难懂,我们先看下关键方法doSelect
    public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
        //TODO question 如果这批upstream第一个节点经常变更,可能会导致前面节点生成的WeightedRoundRobin数据无法释放
        String key = upstreamList.get(0).getUpstreamUrl();
        // 取出此批后端服务的权重对象 key -> 后端server,为ip:port   value -> 后端server对应的权重对象
        ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
        if (map == null) {
            methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<>(16));
            map = methodWeightMap.get(key);
        }
        // 总权重值
        int totalWeight = 0;
        // 当前请求权重的最大值,每次请求都会置为MIN_VALUE
        long maxCurrent = Long.MIN_VALUE;
        // 当前系统时间
        long now = System.currentTimeMillis();
        // 选中节点
        DivideUpstream selectedInvoker = null;
        // 选中节点的权重
        WeightedRoundRobin selectedWRR = null;
        //TODO question 轮询这里为什么没有break,应该找到选中节点后就跳出循环,不需要继续循环下去
        for (DivideUpstream upstream : upstreamList) {
            // rkey -> 后端server,ip:port
            String rKey = upstream.getUpstreamUrl();
            // 后端server对应的权重对象
            WeightedRoundRobin weightedRoundRobin = map.get(rKey);
            // 当前后端节点的权重值
            int weight = getWeight(upstream);
            // 如果当前后端server权重对象不存在,则需生成当前后端server的权重对象,并添加到内存
            if (weightedRoundRobin == null) {
                weightedRoundRobin = new WeightedRoundRobin();
                weightedRoundRobin.setWeight(weight);
                map.putIfAbsent(rKey, weightedRoundRobin);
            }
            // 节点权重值发生了变化,则需要重新设置
            if (weight != weightedRoundRobin.getWeight()) {
                //weight changed
                // 权重值发生变化后,将内存中后端server权重更新为最新权重值
                // 同时其内部current值也将更新为0,回到初始状态
                weightedRoundRobin.setWeight(weight);
            }
            // 内部序列从0开始增,每次进来都+自己的权重值
            long cur = weightedRoundRobin.increaseCurrent();
            // 重新设置更新时间
            weightedRoundRobin.setLastUpdate(now);
            // 如果节点当前轮询权重大于此次调用的最大权重值,则满足项,可以作为选中节点
            // 这里选中节点后,并不会停止循环,会遍历所有的节点,将最后一个满足条件的节点作为选中节点
            // 这里是为了找到当前权重最大的那个后端server,将其作为选中节点
            if (cur > maxCurrent) {
                maxCurrent = cur;
                selectedInvoker = upstream;
                selectedWRR = weightedRoundRobin;
            }
            // 总权重
            totalWeight += weight;
        }
        // 更新标志锁位未更新  后端节点数不等于原有节点数  采用compareAndSet取锁,获取到锁之后才更新
        if (!updateLock.get() && upstreamList.size() != map.size() && updateLock.compareAndSet(false, true)) {
            try {
                // copy -> modify -> update reference
                // 新的map
                ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
                // 如果从轮询开始,到结束,中间间隔了1min钟,则需要移除该后端server的权重对象数据
                // 这里是为了移除掉那些下线的节点,因为只要有被轮询,其LastUpdate=now;而超过1min钟,都没有轮询到的,则就是已下线的节点
                newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > recyclePeriod);
                // 将新的后端节点的权重数据替换旧有的
                methodWeightMap.put(key, newMap);
            } finally {
                // 最终将锁的标志为设置为false
                updateLock.set(false);
            }
        }
        // 选中节点的处理
        if (selectedInvoker != null) {
            // 选中节点的内部计数的当前权重current要减去总的权重值,降低其当前权重,也就意味着其下次被选中的优先级被降低
            selectedWRR.sel(totalWeight);
            return selectedInvoker;
        }
        // should not happen here
        return upstreamList.get(0);
    }
  1. 因不懂其思想,在debug源码过程中,将关键变量变化的过程用表格记录了下来,方便跟踪逻辑
    RoundRobinComputeProcess
  2. 从上图可以看出:当权重为50:50时,4次调用其最终选取出来的结果为1:1;当权重为10:20时,6次调用其最终选中出来的结果为1:2,前3次调用最终选取出来的结果也为1:2。
  3. //TODO很是神奇,不知晓其中原理
  4. 实现逻辑看起来比较复杂,简单说来就是 对于每个 DivideUpstream 都维护一个权重对象,每次遍历所有权重对象获取到权重最大的那个 DivideUpstream,同时减去被选中的 DivideUpstream 的权重(减去总权重,优先级降为最低)

  5. 参考:【高性能网关soul学习】11. 插件之DevidePlugin

hash-IP哈希

    public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
        // 并发跳表map key->后端server的hash值 value->后端节点
        final ConcurrentSkipListMap<Long, DivideUpstream> treeMap = new ConcurrentSkipListMap<>();
        for (DivideUpstream address : upstreamList) {
            // 每个实际后端节点会生成5个虚拟节点,用于更大范围映射,类似于一致性hash
            for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
                // 后端server的hash
                long addressHash = hash("SOUL-" + address.getUpstreamUrl() + "-HASH-" + i);
                treeMap.put(addressHash, address);
            }
        }
        // 远程客户端ip的hash
        long hash = hash(String.valueOf(ip));
        // 如果 请求地址的Hash值 右边存在节点,返回右边第一个
        // treeMap.tailMap(hash) 会返回所有(key >= hash)的映射集合,同时是有序的。
        SortedMap<Long, DivideUpstream> lastRing = treeMap.tailMap(hash);
        // 如果找到了,则从找到的entry拿出后端节点
        if (!lastRing.isEmpty()) {
            // 这里也就是取 url hash 值右边的第一个节点
            return lastRing.get(lastRing.firstKey());
        }
        // 没找到,则直接去第一个节点兜底
        return treeMap.firstEntry().getValue();
    }

总结

  1. Divide插件这块的内容还挺多的,后面还需要将负载均衡这块的逻辑重点看一看,搞清楚roundRobinhash算法的实现,同时也去看看其他框架中这两种算法的实现
  2. 已分析Divide插件中重点块:后端节点的探活机制负载均衡算法
  3. 后续还有http服务的请求转发websocket服务的支持
  4. 关于负载均衡分析的好文章

To be continued...

上一篇 下一篇

猜你喜欢

热点阅读