sentinel-熔断篇
2019-08-04 本文已影响0人
Audience0
1.规则定义:
DegradeRule rule = new DegradeRule();
rule.setResource(resourceName);
//RuleConstant.DEGRADE_GRADE_RT
//平均相应时间,当1s内持续进入5个请求,对应时刻平均时间(秒级)均超过阈值,那么接下来的timewindow 秒之内,对这个方法熔断
//DEGRADE_GRADE_EXCEPTION_RATIO
//当资源每秒请求超过5次,且每秒异常总数占通过量的比值超过阈值之后,资源在接下来的timewindow秒进入降级,异常比率[0.0,1.0]
//DEGRADE_GRADE_EXCEPTION_COUNT
//当资源近1分钟的异常数目超过阈值之后会进行熔断,因为是1分钟级别,若timewindow小于60s,则结束熔断妆后仍可能进入熔断
rule.setGrade(grade);
rule.setCount(count);
rule.setTimeWindow(timeWindow);
rule.setLimitApp(limitName);
2.规则加载 DegradeRuleManager.loadRules
//DegradeRuleManager的内部类
private static class RulePropertyListener implements PropertyListener<List<DegradeRule>> {
@Override
public void configUpdate(List<DegradeRule> conf) {
//校验 熔断规则
Map<String, Set<DegradeRule>> rules = loadDegradeConf(conf);
if (rules != null) {
//清空原规则Map
degradeRules.clear();
//将所有规则放入Map
degradeRules.putAll(rules);
}
RecordLog.info("[DegradeRuleManager] Degrade rules received: " + degradeRules);
}
private Map<String, Set<DegradeRule>> loadDegradeConf(List<DegradeRule> list) {
Map<String, Set<DegradeRule>> newRuleMap = new ConcurrentHashMap<>();
if (list == null || list.isEmpty()) {
return newRuleMap;
}
for (DegradeRule rule : list) {
//校验熔断规则
if (!isValidRule(rule)) {
RecordLog.warn(
"[DegradeRuleManager] Ignoring invalid degrade rule when loading new rules: " + rule);
continue;
}
if (StringUtil.isBlank(rule.getLimitApp())) {
//赋值limitApp 初始值 default
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}
String identity = rule.getResource();
Set<DegradeRule> ruleSet = newRuleMap.get(identity);
if (ruleSet == null) {
ruleSet = new HashSet<>();
newRuleMap.put(identity, ruleSet);
}
ruleSet.add(rule);
}
return newRuleMap;
}
}
//校验规则
public static boolean isValidRule(DegradeRule rule) {
boolean baseValid = rule != null && !StringUtil.isBlank(rule.getResource())
&& rule.getCount() >= 0 && rule.getTimeWindow() > 0;
if (!baseValid) {
return false;
}
// Warn for RT mode that exceeds the {@code TIME_DROP_VALVE}.
int maxAllowedRt = Constants.TIME_DROP_VALVE;
//超出此阈值的都会算作 4900 ms,若需要变更此上限可以通过启动配置项 -Dcsp.sentinel.statistic.max.rt=xxx 来配置。
//SentinelConfig 这个类进行一些这种默认值的初始化以及读取JVM这些数据配置的值
if (rule.getGrade() == RuleConstant.DEGRADE_GRADE_RT && rule.getCount() > maxAllowedRt) {
RecordLog.warn(String.format("[DegradeRuleManager] WARN: setting large RT threshold (%.1f ms) in RT mode"
+ " will not take effect since it exceeds the max allowed value (%d ms)", rule.getCount(),
maxAllowedRt));
}
// Check exception ratio mode.
if (rule.getGrade() == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO && rule.getCount() > 1) {
return false;
}
return true;
}
}
3.熔断校验DegradeSlot DegradeRuleManager DegradeRule
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {
//校验是否熔断
DegradeRuleManager.checkDegrade(resourceWrapper, context, node, count);
//下一个节点
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
public final class DegradeRuleManager {
private static final Map<String, Set<DegradeRule>> degradeRules = new ConcurrentHashMap<>();
//校验熔断
public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count)
throws BlockException {
//获取规则
Set<DegradeRule> rules = degradeRules.get(resource.getName());
if (rules == null) {
return;
}
for (DegradeRule rule : rules) {
//遍历规则,如果有一个规则命中,则抛出熔断异常
if (!rule.passCheck(context, node, count)) {
throw new DegradeException(rule.getLimitApp(), rule);
}
}
}
}
public class DegradeRule extends AbstractRule {
private static final int RT_MAX_EXCEED_N = 5;
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static ScheduledExecutorService pool = Executors.newScheduledThreadPool(
Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("sentinel-degrade-reset-task", true));
@Override
public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
//熔断后cut会被设置成true,timewindow之后由定时任务改为false
if (cut.get()) {
return false;
}
//获取ClusterBuilderSlot赋值的ClusterNode节点
ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
if (clusterNode == null) {
return true;
}
//RT熔断
if (grade == RuleConstant.DEGRADE_GRADE_RT) {
//获取平均RT(一秒)
double rt = clusterNode.avgRt();
if (rt < this.count) {
//平均RT小于配置值count时,则通过校验
//如果校验通过则,设置passCount为0,以重新计算当前秒校验失败的此数
passCount.set(0);
return true;
}
// 如果校验失败的此数少于5次,则算做校验通过 RT_MAX_EXCEED_N:默认值5
if (passCount.incrementAndGet() < RT_MAX_EXCEED_N) {
return true;
}
//按异常比率熔断
} else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
//当前秒的异常数
double exception = clusterNode.exceptionQps();
//当前秒的成功数
double success = clusterNode.successQps();
//当前秒的总数(校验成功(pass)+校验失败的(block))MetricEvent
double total = clusterNode.totalQps();
// 总数小于5次 不降级
if (total < RT_MAX_EXCEED_N) {
return true;
}
double realSuccess = success - exception;
if (realSuccess <= 0 && exception < RT_MAX_EXCEED_N) {
return true;
}
if (exception / success < count) {
return true;
}
//按异常次数降级
} else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
//获取当前一分钟的异常总次数
//因为此处是一分钟, 所以,如果timeWindow的时间结束后,仍然处于当前这一分钟范围,则应该会直接降级,因为当前分钟异常次数超过了规则配置次数
double exception = clusterNode.totalException();
//总次数小于配置count 则校验通过
if (exception < count) {
return true;
}
}
//已经校验失败,应该降级
//将cut 设置为true,为true时直接回降级
//unsafe.compareAndSwapInt(this, valueOffset, e, u)
//expect表示期望的值,即遇到这个值,则将value改为update值,在这里就是遇到false便改成true,更新成功返回true
//如果遇到的值不是expect值则不做任何处理,并返回个false
if (cut.compareAndSet(false, true)) {
//创建定时任务
ResetTask resetTask = new ResetTask(this);
//timeWindow时间后,执行resetTask这个定时任务
pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
}
return false;
}
//定时任务
private static final class ResetTask implements Runnable {
private DegradeRule rule;
ResetTask(DegradeRule rule) {
this.rule = rule;
}
@Override
public void run() {
//将校验失败次数设置为0,用来设定5次失败内不降级
rule.getPassCount().set(0);
//将cut设置为false,表示进行正常校验流程
rule.cut.set(false);
}
}
}