Alibaba Sentinel DegradeRule 降级规
2020-09-10 本文已影响0人
xiaolyuh
降级方式
image.pngAlibaba Sentinel 支持多种降级方式:
- 根据响应时间:判断单位时间内平均响应时间是否达到阈值;
- 根据异常比例:判断单位时间内,异常数量和异常比例是否达到阈值;
- 根据异常数量:判断单位时间内异常数量是否达到阈值;
一旦触发熔断,熔断开关将会打开,这时将拒绝所有请求,拒绝时间为设置的降级时间间隔。通过源码我们可以发现,Sentinel直接使用的是ScheduledExecutorService
开启的一个延迟任务来实现降级时间间隔。
如:响应时间达到阈值,并且熔断时间间隔配置为5S,这时熔断开关会打开,并且在5S内拒绝所有请求,当5S后熔断开关再次关闭,这时会放行请求,如果放行请求后又触发了熔断,那么又需要等5S钟。
熔断降级规则说明
熔断降级规则(DegradeRule)包含下面几个重要的属性:
Field | 说明 | 默认值 |
---|---|---|
resource | 资源名,即规则的作用对象 | |
grade | 熔断策略,支持慢调用比例/异常比例/异常数策略 | 慢调用比例 |
count | 慢调用比例模式下为慢调用临界 RT(超出该值计为慢调用);异常比例/异常数模式下为对应的阈值 | |
timeWindow | 熔断时长,单位为 s | |
minRequestAmount | 熔断触发的最小请求数,请求数小于该值时即使异常比率超出阈值也不会熔断(1.7.0 引入) | 5 |
statIntervalMs | 统计时长(单位为 ms),如 60*1000 代表分钟级(1.8.0 引入) | 1000 ms |
slowRatioThreshold | 慢调用比例阈值,仅慢调用比例模式有效(1.8.0 引入) |
最新参数请看官网 。
源码
public class DegradeRule extends AbstractRule {
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static ScheduledExecutorService pool = Executors.newScheduledThreadPool(
Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("sentinel-degrade-reset-task", true));
public DegradeRule() {}
public DegradeRule(String resourceName) {
setResource(resourceName);
}
/**
* RT threshold or exception ratio threshold count.
*/
private double count;
/**
* Degrade recover timeout (in seconds) when degradation occurs.
*/
private int timeWindow;
/**
* Degrade strategy (0: average RT, 1: exception ratio, 2: exception count).
*/
private int grade = RuleConstant.DEGRADE_GRADE_RT;
/**
* Minimum number of consecutive slow requests that can trigger RT circuit breaking.
*
* @since 1.7.0
*/
private int rtSlowRequestAmount = RuleConstant.DEGRADE_DEFAULT_SLOW_REQUEST_AMOUNT;
/**
* Minimum number of requests (in an active statistic time span) that can trigger circuit breaking.
*
* @since 1.7.0
*/
private int minRequestAmount = RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT;
...
// Internal implementation (will be deprecated and moved outside).
private AtomicLong passCount = new AtomicLong(0);
private final AtomicBoolean cut = new AtomicBoolean(false);
@Override
public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
// 判断熔断开关的状态
if (cut.get()) {
return false;
}
ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
if (clusterNode == null) {
return true;
}
if (grade == RuleConstant.DEGRADE_GRADE_RT) { // 基于RT的熔断模式
// 获取平均RT
double rt = clusterNode.avgRt();
// 判断平均RT是否达到阈值,如果没有就放行
if (rt < this.count) {
passCount.set(0);
return true;
}
// 如果RT达到阈值,还需要判断单位时间内请求量是否达到阈值(默认是5)
if (passCount.incrementAndGet() < rtSlowRequestAmount) {
return true;
}
} else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) { // 基于异常比例的熔断策略
double exception = clusterNode.exceptionQps();
double success = clusterNode.successQps();
double total = clusterNode.totalQps();
// If total amount is less than minRequestAmount, the request will pass.
// 判断单位时间内请求数量是否发到熔断阈值(默认5)
if (total < minRequestAmount) {
return true;
}
// In the same aligned statistic time window,
// "success" (aka. completed count) = exception count + non-exception count (realSuccess)
double realSuccess = success - exception;
// 判断单位时间内异常请求数量是否达到阈值
if (realSuccess <= 0 && exception < minRequestAmount) {
return true;
}
// 判断异常比例是否达到阈值
if (exception / success < count) {
return true;
}
} else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) { // 基于异常数量的熔断策略
double exception = clusterNode.totalException();
// 判断异常数量是否达到阈值
if (exception < count) {
return true;
}
}
if (cut.compareAndSet(false, true)) {
ResetTask resetTask = new ResetTask(this);
pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
}
return false;
}
private static final class ResetTask implements Runnable {
private DegradeRule rule;
ResetTask(DegradeRule rule) {
this.rule = rule;
}
@Override
public void run() {
rule.passCount.set(0);
rule.cut.set(false);
}
}
}
基于RT的熔断流程图:
image.png总结
- 有一个值得借鉴的地方,使用
ScheduledExecutorService
来实现延迟任务的执行。 - 基于RT的熔断,框架是基于所有请求的平均响应时间来实现的,这种方式不会产生上下文切换。还有一种简单的方式,这种方式采用
FutureTask
机制,但是会产生上下文切换,如:
ExecutorService executorService = Executors.newFixedThreadPool(10);
FutureTask<String> futureTask = new FutureTask<String>(() -> {
// 业务逻辑
return "处理结果";
});
executorService.submit(futureTask);
futureTask.get(5, TimeUnit.SECONDS);