Sentinel之FlowSlot限流分析
FlowSlot主要是进行限流工作的。
结合从以前的插槽(NodeSelectorSlot,ClusterNodeBuilderSlot和StatisticSlot)收集的运行时统计信息,FlowSlot将使用预设规则来决定是否应阻止传入请求。
如果触发了定义好的任何规则,SphU.entry(resourceName)
会抛出FlowException
异常。
一种资源可以有多个流规则。FlowSlot遍历这些规则,直到触发其中一个规则或遍历所有规则。每个 FlowRule
主要由以下因素组成:grade, strategy,path。 我们可以结合这些因素来达到不同的效果。
grade由FlowRule
中的 grade
字段定义,此处,0用于线程隔离,而1用于请求计数整形(QPS)。线程计数和请求计数都是在实时运行时收集的。
此阶段通常用于保护资源不被占用。 如果资源需要很长时间才能完成,线程将开始占用。 响应时间越长,占用的线程越多。
除了计数器之外,thread pool
或semaphore
也可以用于实现此目的。
thread pool
:分配线程池来处理这些资源。 当池中没有更多空闲线程时,该请求将被拒绝而不会影响其他资源。
semaphore
:使用信号量控制此资源中线程的并发计数。
使用线程池的好处是,它可以在超时时正常退出。 但这也给我们带来了上下文切换和附加线程的成本。 如果传入的请求已经在单独的线程(例如Servlet HTTP请求)中提供服务,那么在使用线程池的情况下,它将几乎使线程计数加倍。
当QPS超过阈值时,Sentinel将采取措施控制传入请求,并由流规则中的controlBehavior
字段进行配置。
RuleConstant.CONTROL_BEHAVIOR_DEFAULT
:立即拒绝。这是默认行为,超出的请求将立即被拒绝,并抛出FlowException。
RuleConstant.CONTROL_BEHAVIOR_WARM_UP
:Warmup。 如果系统的负载在一段时间内很低,但是此时有大量的请求过来,系统可能无法一次处理这些所有请求。但是,如果我们稳定增加传入的请求,则系统可以预热并最终能够处理所有请求。 可以通过在流规则中设置字段 warmUpPeriodSec
来配置此预热时间。
RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER
:统一速率限制Uniform Rate Limiting
。此策略严格控制请求之间的间隔。换句话说,它允许请求以稳定,统一的速率通过。此策略是漏斗(leaky bucket)的实现。
它用于以稳定的速率处理请求,通常用于突发流量(例如消息处理)。 当超过系统容量的大量请求同时到达时,使用此策略的系统将处理请求及其固定速率,直到所有请求都已处理或超时为止。
以上是从官方源码中的注释文档翻译的,大家可以直接下载源码阅读,官方的注释挺好的!
下面先从限流规则的配置讲解。
FlowRule
public static void main(String[] args) {
// 配置规则.
initFlowRules();
while (true) {
// 1.5.0 版本开始可以直接利用 try-with-resources 特性
try (Entry entry = SphU.entry("HelloWorld")) {
// 被保护的逻辑
System.out.println("hello world");
} catch (BlockException ex) {
// 处理被流控的逻辑
System.out.println("blocked!");
}
}
}
private static void initFlowRules(){
List<FlowRule> rules = new ArrayList<>();
FlowRule rule = new FlowRule();
rule.setResource("HelloWorld");
//根据请求数
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
// Set limit QPS to 20.
rule.setCount(20);
rules.add(rule);
FlowRuleManager.loadRules(rules);
}
从上面我们可以知道,要进行流量控制,我们要自己设定一些流量规则。
我们可以根据请求数和线程数进行统计,这里使用了请求数Qps,定义了资源HelloWorld
每秒最多只能通过20个请求。
```java
//流量控制的阈值类型
private int grade = RuleConstant.FLOW_GRADE_QPS;
//流量控制的阈值数
private double count;
//基于调用链的流量控制策略
private int strategy = RuleConstant.STRATEGY_DIRECT;
//关联资源或入口资源,当流控模式为关联或者链路时配置的关联资源或者入口资源
private String refResource;
//流量控制后的采取行为
private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
//预热时间,如果controlBehavior设置为预热,可以配置其预热时间,默认10s
private int warmUpPeriodSec = 10;
//最大超时时间,如果controlBehavior设置为排队等待时,等待的最大超时时间,默认是500ms
private int maxQueueingTimeMs = 500;
//集群扩容相关配置
private ClusterFlowConfig clusterConfig;
```java
private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
@Override
public Collection<FlowRule> apply(String resource) {
// Flow rule map should not be null.
Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
return flowRules.get(resource);
}
};
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
上述逻辑主要是通过限流规则提供器获取与该资源相关的流控规则列表。
然后遍历流控规则列表,通过调用canPassCheck方法来判断是否满足该规则设置的条件,如果满足流控条件,则抛出FlowException,则只需满足一个就结束校验。
canPassCheck进行校验
public FlowRule() {
super();
setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}
public FlowRule(String resourceName) {
super();
setResource(resourceName);
setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
String limitApp = rule.getLimitApp();
//如果limitApp为null,就不进行验证,在构建FlowRule时,默认limitApp是default,即允许所有来源的请求进行检查
if (limitApp == null) {
return true;
}
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
如果是集群限流模式,则调用passClusterCheck,非集群限流模式则调用passLocalCheck方法。
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
passLocalCheck 单节点限流
在这段逻辑中,有三个重要的字段变量。
limitApp
:该条限流规则针对的调用方。
strategy
:该条限流规则的流控规则。
origin
:本次请求的调用方,从当前上下文环境中获取。
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
String limitApp = rule.getLimitApp();
int strategy = rule.getStrategy();
String origin = context.getOrigin();
//如果限流规则配置的针对的调用方与当前请求的调用方来源相同,并且调用方不是default、other
if (limitApp.equals(origin) && filterOrigin(origin)) {
// 如果基于调用关系的流量控制是根据调用方限流
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// Matches limit origin, return origin statistic node.
return context.getOriginNode();
}
//如果基于关系关系的流量控制是关联,则从集群环境中获取对应关联资源所代表的Node
//如果是根据调用链的,则判断当前调用上下文的名字与规则配置的是否相同,如果是则返回DefaultNode
return selectReferenceNode(rule, context, node);
//如果该流量控制规则针对的调用方配置为default,则对所有的调用源都有效
} else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
//直接获取本次调用上下文当前节点的ClusterNode
return node.getClusterNode();
}
return selectReferenceNode(rule, context, node);
//如果流量控制针对的调用方为other,则要判断当前资源受到其他限流规则的限制,则不执行,简单的说处理给定的这些调用方,剩余的调用方都会进行流量控制
} else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
&& FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
//如果流量控制模式是直接调用,则从Context中获取源调用方所代表的Node
if (strategy == RuleConstant.STRATEGY_DIRECT) {
return context.getOriginNode();
}
return selectReferenceNode(rule, context, node);
}
return null;
}
private static boolean filterOrigin(String origin) {
// Origin cannot be `default` or `other`.
return !RuleConstant.LIMIT_APP_DEFAULT.equals(origin) && !RuleConstant.LIMIT_APP_OTHER.equals(origin);
}
static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
String refResource = rule.getRefResource();
int strategy = rule.getStrategy();
if (StringUtil.isEmpty(refResource)) {
return null;
}
if (strategy == RuleConstant.STRATEGY_RELATE) {
return ClusterBuilderSlot.getClusterNode(refResource);
}
if (strategy == RuleConstant.STRATEGY_CHAIN) {
if (!refResource.equals(context.getName())) {
return null;
}
return node;
}
// No node.
return null;
}
TrafficShapingController canPass
从上述步骤中可以获取一个统计实时数据的Node,接下来就是根据数据与流量控制规则进行判断,是否通过。
![](https://img.haomeiwen.com/i3130295/05595ab129b64da0.png)
该篇中先介绍DefaultController
的原理,实现了直接拒绝
的策略。
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
//当前已经消耗的令牌数量,即当前时间窗口内已创建的线程数量或者已通过的请求个数
int curCount = avgUsedTokens(node);
//如果已消耗的令牌数加上当前请求所要消耗的请求的和小于等于阈值,则直接返回true,表示通过。
//如果超过阈值,则需要执行里面的逻辑
if (curCount + acquireCount > count) {
//如果prioritized为true,即存在优先级,并且是基于QPS进行限流,则进行里面的逻辑,否则,直接返回false
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
//获取当前线程的等待时间
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
//将需要的令牌数添加到borrowArray中未来一个时间窗口
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
//将抢占的未来的令牌数也添加到原来data中的当前时间窗口中
node.addOccupiedPass(acquireCount);
//等待到对用时间窗口到达
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
private int avgUsedTokens(Node node) {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
@Override
public double passQps() {
return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}
//acquireCount本次请求要消耗的令牌,threshold设置的令牌阈值
// IntervalProperty.INTERVAL 为一个时间间隔,默认1000ms,即1s
public long tryOccupyNext(long currentTime, int acquireCount, double threshold) {
//即一个时间间隔内产生的最大令牌数,后面的为1,实际上就是1s内定义的阈值令牌数
double maxCount = threshold \* IntervalProperty.INTERVAL / 1000;
//获得borrowArray中除去过期的所抢占的未来的令牌数 会调用currentWaiting方法
long currentBorrow = rollingCounterInSecond.waiting();
if (currentBorrow >= maxCount) {
//最大占用超时,以毫秒为单位,500ms
return OccupyTimeoutProperty.getOccupyTimeout();
}
// 1000ms / 2
// 一个时间窗口表示的时间长度
int windowLength = IntervalProperty.INTERVAL / SampleCountProperty.SAMPLE_COUNT;
// currentTime - currentTime % windowLength为该窗口的实际开始时间
// 再加上一个窗口的时间长度减去一个时间间隔
// windowLength 一般为500ms,INTERVAL为1000ms,所以总共要减去500ms
//即实际的开始时间向前退500ms,应该是比他早一个的时间窗口开始时间
long earliestTime = currentTime - currentTime % windowLength + windowLength - IntervalProperty.INTERVAL;
int idx = 0;
long currentPass = rollingCounterInSecond.pass();
while (earliestTime < currentTime) {
// 当前窗口所剩余的时间
long waitInMs = idx * windowLength + windowLength - currentTime % windowLength;
//设置的等待时间不能超过一个阈值500ms
if (waitInMs >= OccupyTimeoutProperty.getOccupyTimeout()) {
break;
}
//获取它的之前早的一个窗口的统计的请求数
long windowPass = rollingCounterInSecond.getWindowPass(earliestTime);
if (currentPass + currentBorrow + acquireCount - windowPass <= maxCount) {
return waitInMs;
}
//向后移动一个窗口
earliestTime += windowLength;
//当前一个窗口记录的通过量
currentPass -= windowPass;
idx++;
}
return OccupyTimeoutProperty.getOccupyTimeout();
}
@Override
public long currentWaiting() {
// A kind of BucketLeapArray that only reserves for future buckets
borrowArray.currentWindow();
long currentWaiting = 0;
//注意,在里面方法中判断时间窗口方法进行了重写
List<MetricBucket> list = borrowArray.values();
for (MetricBucket window : list) {
currentWaiting += window.pass();
}
return currentWaiting;
}
public List<T> values(long timeMillis) {
if (timeMillis < 0) {
return new ArrayList<T>();
}
int size = array.length();
List<T> result = new ArrayList<T>(size);
for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
//isWindowDeprecated进行了重写,因为是FutureBucketLeapArray调用的
if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
continue;
}
result.add(windowWrap.value());
}
return result;
}
//因为它抢占式未来时间的窗口,即记录当前时间以后的数据,所以当当前时间比窗口时间的开始时间大,说明不是记录未来时间窗口的,所以失效了
public boolean isWindowDeprecated(long time, WindowWrap<MetricBucket> windowWrap) {
// Tricky: will only calculate for future.
return time >= windowWrap.windowStart();
}
//而普通的时间窗口只需要判断当前时间减去时间窗口的开始时间不超过时间间隔(秒级的话,即1s)即可,
public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
return time - windowWrap.windowStart() > intervalInMs;
}
在这里,大家可能对OccupiableBucketLeapArray
这个类不是很了解,到现在我也没有理解很好,现在我从官网上直接摘抄一段,大体的意思可以体现出来。
引用:Sentinel 1.5.0 版本发布,引入 Reactive 支持
Sentinel1.5.0对底层的滑动窗口结构进行了升级,添加了“占用机制”,允许在当前QPS已经到达限流阈值时,同个资源高优先级的请求提前占用未来时间窗口的配额数,等待到对应时间窗口到达时直接通过,从而可以实现“最终通过”的效果而不是被立即拒绝;而同个资源低优先级的请求则不能占用未来的配额,阈值到达时就会被限流。
Sentinel1.5.0引入了FutureBucketLeapArray
,这是一种特殊的时间窗口,仅维持当前时间以后的格子, 从而可以用于统计未来被预先占用的配额数目。Sentinel将普通的滑动窗口与FutureBucketLeapArray
组合成可占用的滑动窗口OccupiableBucketLeapArray
,从而实现了“部分高优先级请求最终通过”的效果。我们可以调用SphU.entryWithPriority(resourceName
来标识本次调用为高优先级(prioritized = true)。
![](https://img.haomeiwen.com/i3130295/d3772f4e4c14dc2d.png)
参考文章:
Sentinel 1.5.0 版本发布,引入 Reactive 支持
Sentinel FlowSlot 限流实现原理(文末附流程图与总结)