Sentinel之Slots插槽源码分析流控规则(五)

2019-01-13  本文已影响0人  橘子_好多灰

一、引子

前面介绍了SystemSlot(系统规则检查)和AuthoritySlot(授权规则检查),下面接着分析FlowSlot。

FlowSlot 会根据预设的规则,结合前面 NodeSelectorSlot、ClusterNodeBuilderSlot、StatistcSlot 统计出来的实时信息进行流量控制。

限流的直接表现是在执行 Entry nodeA = SphU.entry(资源名字) 的时候抛出 FlowException 异常。FlowException 是 BlockException 的子类,您可以捕捉 BlockException 来自定义被限流之后的处理逻辑。

同一个资源可以对应多条限流规则。FlowSlot 会对该资源的所有限流规则依次遍历,直到有规则触发限流或者所有规则遍历完毕。

一条限流规则主要由下面几个因素组成,我们可以组合这些元素来实现不同的限流效果:

resource:资源名,即限流规则的作用对象
count: 限流阈值
grade: 限流阈值类型,QPS 或线程数
strategy: 根据调用关系选择策略:直接、关联、链路
clusterMode:是否集群模式
controlBehavior:流控效果:快速失败、WarmUP、排队等候、WarmUP+排队等候
refResource:关联资源

在dashborad中,可以设置资源的流控规则:如图

流控规则

二 、基于QPS/并发数的流量控制

流量控制主要有两种统计类型,一种是统计线程数,另外一种则是统计 QPS。类型由 FlowRule.grade 字段来定义。其中,0 代表根据线程并发数量来限流,1 代表根据 QPS 来进行流量控制。其中线程数、QPS 值,都是由 StatisticSlot 实时统计获取的。

可以通过下面的命令查看实时统计信息:

curl http://localhost:8719/cnode?id=resourceName

输出内容格式如下:

idx id   thread  pass  blocked   success  total Rt   1m-pass   1m-block   1m-all   exeption
2   abc647 0     46     0           46     46   1       2763      0         2763     0

其中:

thread: 代表当前处理该资源的线程数;
pass: 代表一秒内到来到的请求;
blocked: 代表一秒内被流量控制的请求数量;
success: 代表一秒内成功处理完的请求;
total: 代表到一秒内到来的请求以及被阻止的请求总和;
RT: 代表一秒内该资源的平均响应时间;
1m-pass: 则是一分钟内到来的请求;
1m-block: 则是一分钟内被阻止的请求;
1m-all: 则是一分钟内到来的请求和被阻止的请求的总和;
exception: 则是一秒内业务本身异常的总和。

2.1并发线程数流量控制

线程数限流用于保护业务线程数不被耗尽。例如,当应用所依赖的下游应用由于某种原因导致服务不稳定、响应延迟增加,对于调用者来说,意味着吞吐量下降和更多的线程数占用,极端情况下甚至导致线程池耗尽。
为应对高线程占用的情况,业内有使用隔离的方案,比如通过不同业务逻辑使用不同线程池来隔离业务自身之间的资源争抢(线程池隔离),或者使用信号量来控制同时请求的个数(信号量隔离)。

这种隔离方案虽然能够控制线程数量,但无法控制请求排队时间。当请求过多时排队也是无益的,直接拒绝能够迅速降低系统压力。Sentinel线程数限流不负责创建和管理线程池,而是简单统计当前请求上下文的线程个数,如果超出阈值,新的请求会被立即拒绝。

2.2QPS流量控制

当 QPS 超过某个阈值的时候,则采取措施进行流量控制。流量控制的手段包括下面 4 种,对应 FlowRule 中的 controlBehavior 字段:

冷启动

这种方式主要用于处理间隔性突发的流量,例如消息队列。想象一下这样的场景,在某一秒有大量的请求到来,而接下来的几秒则处于空闲状态,我们希望系统能够在接下来的空闲期间逐渐处理这些请求,而不是在第一秒直接拒绝多余的请求。

三、基于调用关系的流量控制

调用关系包括调用方、被调用方;方法又可能会调用其它方法,形成一个调用链路的层次关系。Sentinel 通过 NodeSelectorSlot 建立不同资源间的调用的关系,通过ClusterBuilderSlot设置每个资源源节点,并且通过 StatisticSlot 记录每个资源的实时统计信息。

3.1 根据调用方限流

RuleConstant.STRATEGY_DIRECT = 0

ContextUtil.enter(resourceName, origin) 方法中的 origin 参数标明了调用方身份。这些信息会在 StatisticSlot 中被统计。

限流规则中的 limitApp 字段用于根据调用方进行流量控制。该字段的值有以下三种选项,分别对应不同的场景:

同一个资源名可以配置多条规则,规则的生效顺序为:{some_origin_name} > other > default

3.2 具有关系的资源流量控制:关联流量控制

RuleConstant.STRATEGY_RELATE = 1

当两个资源之间具有资源争抢或者依赖关系的时候,这两个资源便具有了关联。比如对数据库同一个字段的读操作和写操作存在争抢,读的速度过高会影响写得速度,写的速度过高会影响读的速度。如果放任读写操作争抢资源,则争抢本身带来的开销会降低整体的吞吐量。

可使用关联限流来避免具有关联关系的资源之间过度的争抢,举例来说,read_db 和 write_db 这两个资源分别代表数据库读写,我们可以给 read_db 设置限流规则来达到写优先的目的:设置 FlowRule.strategy 为 RuleConstant.STRATEGY_RELATE 同时设置 FlowRule.refResource 为 write_db。这样当写库操作过于频繁时,读数据的请求会被限流。

3.3 根据调用链路入口限流:链路限流

RuleConstant.STRATEGY_CHAIN = 2

NodeSelectorSlot 中记录了资源之间的调用链路,这些资源通过调用关系,相互之间构成一棵调用树。这棵树的根节点是一个名字为 machine-root 的虚拟节点,调用链的入口都是这个虚节点的子节点。

一棵典型的调用树如下图所示:

                  machine-root
                    /       \
                   /         \
             Entrance1     Entrance2
                /              \
               /                \
      DefaultNode(nodeA)   DefaultNode(nodeA)

上图中来自入口 Entrance1 和 Entrance2 的请求都调用到了资源 NodeA,Sentinel 允许只根据某个入口的统计信息对资源限流。

比如我们可以设置 FlowRule.strategy 为 RuleConstant.STRATEGY_CHAIN,同时设置 FlowRule.refResource 为 Entrance1 来表示只有从入口 Entrance1 的调用才会记录到 NodeA 的限流统计当中,而对来自 Entrance2 的调用漠不关心。

调用链的入口是通过 API 方法 ContextUtil.enter(name) 定义的。

四、源码分析

4.1 FlowSlot

首先看FlowSlot入口类:

  @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        checkFlow(resourceWrapper, context, node, count, prioritized);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        // Flow rule map cannot be null.
        Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();

        List<FlowRule> rules = flowRules.get(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    throw new FlowException(rule.getLimitApp());
                }
            }
        }
    }

    boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int count, boolean prioritized) {
        return FlowRuleChecker.passCheck(rule, context, node, count, prioritized);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }

大致内容:
1.通过FlowRuleManage获取所有的限流规则
2.获取该资源对应的限流,然后循环通过canPassCheck方法判断,若返回false则说明被限流了。

4.2 FlowRuleChecker类

 static boolean passCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                          boolean prioritized) {
        String limitApp = rule.getLimitApp();
        if (limitApp == null) {
            return true;
        }

        if (rule.isClusterMode()) {
            return passClusterCheck(rule, context, node, acquireCount, prioritized);
        }

        return passLocalCheck(rule, context, node, acquireCount, prioritized);
    }

 private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                          boolean prioritized) {
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
        if (selectedNode == null) {
            return true;
        }

        return rule.getRater().canPass(selectedNode, acquireCount);
    }

1、通过selectNodeByRequesterAndStrategy方法选择被限流的节点。
2、获取的rule的Controller调用具体的限流规则。

 static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
        // 获取限流的limitApp,限流策略(startegy),上线的origin
        String limitApp = rule.getLimitApp();
        int strategy = rule.getStrategy();
        String origin = context.getOrigin();

        //如果limitApp等于origin并且origin不是default和other;
        if (limitApp.equals(origin) && filterOrigin(origin)) {
            //如果策略是STRATEGY_DIRECT(调用方限流)
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                // Matches limit origin, return origin statistic node.
                return context.getOriginNode();
            }

            return selectReferenceNode(rule, context, node);
        } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                // Return the cluster node.
                return node.getClusterNode();
            }

            return selectReferenceNode(rule, context, node);
        } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
            && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                return context.getOriginNode();
            }

            return selectReferenceNode(rule, context, node);
        }

        return null;
    }

    static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
        String refResource = rule.getRefResource();
        int strategy = rule.getStrategy();

        if (StringUtil.isEmpty(refResource)) {
            return null;
        }

        if (strategy == RuleConstant.STRATEGY_RELATE) {
            return ClusterBuilderSlot.getClusterNode(refResource);
        }

        if (strategy == RuleConstant.STRATEGY_CHAIN) {
            if (!refResource.equals(context.getName())) {
                return null;
            }
            return node;
        }
        // No node.
        return null;
    }

1.获取限流的limitApp,限流策略(startegy),上线的origin。
2.如果limitApp等于origin并且origin不是default和other:

如果策略是STRATEGY_DIRECT(调用方限流),则限流节点是originNode;若是限流策略是STRATEGY_RELATE(关联限流),则限流节点是refResource的clusterNode;若是限流策略是STRATEGY_CHAIN(链路限流),并且refResource等于contextName,则限流节点就是node

3.如果limitApp等于default:

如果策略是STRATEGY_DIRECT(调用方限流),则限流节点是clusterNode;若是限流策略是STRATEGY_RELATE(关联限流),则限流节点是refResource的clusterNode;若是限流策略是STRATEGY_CHAIN(链路限流),并且refResource等于contextName,则限流节点就是node

4.如果limitApp等于other并且该资源的其他限流limitApp不与origin相同:

如果策略是STRATEGY_DIRECT(调用方限流),则限流节点是originNode;若是限流策略是STRATEGY_RELATE(关联限流),则限流节点是refResource的clusterNode;若是限流策略是STRATEGY_CHAIN(链路限流),并且refResource等于contextName,则限流节点就是node

4.3 流控规则

由rule.getRater()获取具体的流控规则,目前有四种流控规则;直接失败、WarmUP、排队等候、WarmUP+排队等候。

流控规则是在FlowRuleUtil类中设置的,根据具体的ControlBefavior进行设置,如下代码:


    private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
        if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
            switch (rule.getControlBehavior()) {
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
                    return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
                        ColdFactorProperty.coldFactor);
                case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
                    return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
                    return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
                        rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
                case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
                default:
                    // Default mode or unknown mode: default traffic shaping controller (fast-reject).
            }
        }
        return new DefaultController(rule.getCount(), rule.getGrade());
    }

可以发现对应关系如下:

流控类型 Controller
快速失败 DefaultController
WarmUp WarmUpController
匀速排队 RateLimiterController
WarmUp + 匀速排队 WarmUpRateLimiterController
快速失败
快速失败

1.通过avgUsedTokens方法先获取当前的请求线程数或者qps,然后加上当前请求的个数acquireCount,如果大于count则说明超过了限流控制的阈值,则返回false。

Warp Up

Sentinel的WarmUp是基于Guava的算法,但是不像Guava的场景,这是基于一个漏桶,主要使用基于时间间隔,
Sentinel更专注于控制计数每秒的请求而没有计算它的间隔。

Sentinel的WarmUp算法实现基于基于Guava的算法。然而,Guava的实现重点调整请求的时间间隔,换句话说,一个漏水的水桶。哨兵更多关注控制计数每秒的请求没有计算它的间隔,它更像是一个“令牌桶。

剩下的令牌桶是用来测量系统效用。假设一个系统可以处理b每秒的请求。每秒钟b标记将被添加到桶,直到桶满了。系统处理一个请求时,它需要一个令牌桶。剩有令牌桶,降低系统的利用率;令牌桶中的令牌时超过一定阈值,我们称之为“饱和”状态。

基于Guava的理论,这是一个线性方程我们可以写这个形式y = m x + y;(b.k.y(x))或每秒(q)),我们预计每秒给定一个饱和期(eg:3分钟),m是变化的速度从我们冷(最小)率稳定(最大),x(或q)是被占领的令牌。

下面通过数学知识理解:

           ^ throttling
           |
  3*stable +                  /
  interval |                 /.
   (cold)  |                / .
           |               /  .   <-- "warmup period" is the area of the trapezoid between
  2*stable +              /   .       warningToken and maxToken(预热区为这个梯形区域)
  interval |             /    .
           |            /     .
           |           /   B  .
    stable +----------/  WARM . }
  interval |          .   UP  . } <-- 这块矩形 (宽从0至maxPermits, 高为stableInterval
           |          . PERIOD. }     定义为冷却区域,同时我们希望冷却区==预热区
           |    A      .      . }     cooldownPeriod == warmupPeriod
           |---------------------------------> storedPermits
              (warningToken) (maxToken)

现在我们看WarmUpController构造方法:

  public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
        construct(count, warmUpPeriodInSec, coldFactor);
    }

    public WarmUpController(double count, int warmUpPeriodInSec) {
        construct(count, warmUpPeriodInSec, 3);
    }

    private void construct(double count, int warmUpPeriodInSec, int coldFactor) {

        if (coldFactor <= 1) {
            throw new IllegalArgumentException("Cold factor should be larger than 1");
        }

        this.count = count;

        //冷冻因子,默认为3
        this.coldFactor = coldFactor;

        // thresholdPermits = 0.5 * warmupPeriod / stableInterval.
        // warningToken = 100;
        warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
        // / maxPermits = thresholdPermits + 2 * warmupPeriod /
        // (stableInterval + coldInterval)
        // maxToken = 200
        maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));

        // slope
        // slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits
        // - thresholdPermits);
        slope = (coldFactor - 1.0) / count / (maxToken - warningToken);

    }
  1. warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
    warningToken为告警令牌数,warmUpPeriodInSec及时B区域的面积,它是A的面积的coldFactor-1倍;stableInterval为1/count;
  2. maxToken是根据梯形的面积公式计算出来的;
  3. slope是根据斜率计算公式计算的
 @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        long passQps = node.passQps();

        long previousQps = node.previousPassQps();
        syncToken(previousQps);

        // 开始计算它的斜率
        // 如果进入了警戒线,开始调整他的qps
        long restToken = storedTokens.get();
        if (restToken >= warningToken) {
            long aboveToken = restToken - warningToken;
            // 消耗的速度要比warning快,但是要比慢
            // current interval = restToken*slope+1/count
            double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            if (passQps + acquireCount <= warningQps) {
                return true;
            }
        } else {
            if (passQps + acquireCount <= count) {
                return true;
            }
        }

        return false;
    }
  1. 如果restToken进入了警戒线,开始调整他的qps,根据斜率计算出warningQps;若passQps + acquireCount小于warningQps则请求通过。
  2. 如果没有进入警戒线,若passQps + acquireCount <= count则请求通过。
  3. 若1和2不满足,则请求不通过。
匀速排队
 @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        long currentTime = TimeUtil.currentTimeMillis();
        // Calculate the interval between every two requests.
        long costTime = Math.round(1.0 * (acquireCount) / count * 1000);

        // Expected pass time of this request.
        long expectedTime = costTime + latestPassedTime.get();

        if (expectedTime <= currentTime) {
            // Contention may exist here, but it's okay.
            latestPassedTime.set(currentTime);
            return true;
        } else {
            // Calculate the time to wait.
            long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
            if (waitTime >= maxQueueingTimeMs) {
                return false;
            } else {
                long oldTime = latestPassedTime.addAndGet(costTime);
                try {
                    waitTime = oldTime - TimeUtil.currentTimeMillis();
                    if (waitTime >= maxQueueingTimeMs) {
                        latestPassedTime.addAndGet(-costTime);
                        return false;
                    }
                    Thread.sleep(waitTime);
                    return true;
                } catch (InterruptedException e) {
                }
            }
        }
        return false;
    }

1.根据qps计算两次请求的时间间隔并且获取当前请求的期待时间。
2.如果期待时间小于当前时间,则请求通过;否则先获取waitTime,若waitTime大于maxQueueingTimeMs队列排队时间,则请求阻止。
3.通过latestPassedTime.addAndGet(costTime)加上costTime;若此时waitTime还大于maxQueueingTimeMs队列排队时间,latestPassedTime时间恢复加costTime之前的值,并请求阻止;否则线程睡眠waitTime时间,并请求通过。

WarmUp + 匀速排队

这种选择就是WarmUp与匀速排队组合,具体可见源码。

五、我的总结

1、介绍了Sentinel的限流规则以及限流原理。
2、FlowSlot是整个插槽链中最复杂的一块,主要根据了前面 NodeSelectorSlot、ClusterNodeBuilderSlot、StatistcSlot 统计出来的实时信息进行流量控制。
3、阈值类型有两种(限流阈值类型,QPS 或线程数),流控模式有三种(直接、关联、链路),流控效果有四种(快速失败、WarmUP、排队等候、WarmUP+排队等候)。
4、WarmUP限流是根据Guava的令牌桶算法演变而来的。


以上内容,若有不当之处,请指正

上一篇下一篇

猜你喜欢

热点阅读