sentinel

sentinel-熔断篇

2019-08-04  本文已影响0人  Audience0

1.规则定义:

DegradeRule rule = new DegradeRule();
        rule.setResource(resourceName);
        //RuleConstant.DEGRADE_GRADE_RT
        //平均相应时间,当1s内持续进入5个请求,对应时刻平均时间(秒级)均超过阈值,那么接下来的timewindow 秒之内,对这个方法熔断
        //DEGRADE_GRADE_EXCEPTION_RATIO
        //当资源每秒请求超过5次,且每秒异常总数占通过量的比值超过阈值之后,资源在接下来的timewindow秒进入降级,异常比率[0.0,1.0]
        //DEGRADE_GRADE_EXCEPTION_COUNT
        //当资源近1分钟的异常数目超过阈值之后会进行熔断,因为是1分钟级别,若timewindow小于60s,则结束熔断妆后仍可能进入熔断
        rule.setGrade(grade);
        rule.setCount(count);
        rule.setTimeWindow(timeWindow);
        rule.setLimitApp(limitName);

2.规则加载 DegradeRuleManager.loadRules

    //DegradeRuleManager的内部类
    private static class RulePropertyListener implements PropertyListener<List<DegradeRule>> {

        @Override
        public void configUpdate(List<DegradeRule> conf) {
            //校验 熔断规则
            Map<String, Set<DegradeRule>> rules = loadDegradeConf(conf);
            if (rules != null) {
                //清空原规则Map
                degradeRules.clear();
                //将所有规则放入Map
                degradeRules.putAll(rules);
            }
            RecordLog.info("[DegradeRuleManager] Degrade rules received: " + degradeRules);
        }

        private Map<String, Set<DegradeRule>> loadDegradeConf(List<DegradeRule> list) {
            Map<String, Set<DegradeRule>> newRuleMap = new ConcurrentHashMap<>();

            if (list == null || list.isEmpty()) {
                return newRuleMap;
            }

            for (DegradeRule rule : list) {
                //校验熔断规则
                if (!isValidRule(rule)) {
                    RecordLog.warn(
                        "[DegradeRuleManager] Ignoring invalid degrade rule when loading new rules: " + rule);
                    continue;
                }

                if (StringUtil.isBlank(rule.getLimitApp())) {
                    //赋值limitApp 初始值 default
                    rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
                }

                String identity = rule.getResource();
                Set<DegradeRule> ruleSet = newRuleMap.get(identity);
                if (ruleSet == null) {
                    ruleSet = new HashSet<>();
                    newRuleMap.put(identity, ruleSet);
                }
                ruleSet.add(rule);
            }

            return newRuleMap;
        }
    }
    //校验规则
    public static boolean isValidRule(DegradeRule rule) {
        boolean baseValid = rule != null && !StringUtil.isBlank(rule.getResource())
            && rule.getCount() >= 0 && rule.getTimeWindow() > 0;
        if (!baseValid) {
            return false;
        }
        // Warn for RT mode that exceeds the {@code TIME_DROP_VALVE}.
        int maxAllowedRt = Constants.TIME_DROP_VALVE;
        //超出此阈值的都会算作 4900 ms,若需要变更此上限可以通过启动配置项 -Dcsp.sentinel.statistic.max.rt=xxx 来配置。
        //SentinelConfig 这个类进行一些这种默认值的初始化以及读取JVM这些数据配置的值
        if (rule.getGrade() == RuleConstant.DEGRADE_GRADE_RT && rule.getCount() > maxAllowedRt) {
            RecordLog.warn(String.format("[DegradeRuleManager] WARN: setting large RT threshold (%.1f ms) in RT mode"
                    + " will not take effect since it exceeds the max allowed value (%d ms)", rule.getCount(),
                maxAllowedRt));
        }
        // Check exception ratio mode.
        if (rule.getGrade() == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO && rule.getCount() > 1) {
            return false;
        }
        return true;
    }
}

3.熔断校验DegradeSlot DegradeRuleManager DegradeRule

public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
        throws Throwable {
        //校验是否熔断
        DegradeRuleManager.checkDegrade(resourceWrapper, context, node, count);
        //下一个节点
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}
public final class DegradeRuleManager {
    private static final Map<String, Set<DegradeRule>> degradeRules = new ConcurrentHashMap<>();

    //校验熔断
    public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count)
        throws BlockException {
        //获取规则
        Set<DegradeRule> rules = degradeRules.get(resource.getName());
        if (rules == null) {
            return;
        }

        for (DegradeRule rule : rules) {
            //遍历规则,如果有一个规则命中,则抛出熔断异常
            if (!rule.passCheck(context, node, count)) {
                throw new DegradeException(rule.getLimitApp(), rule);
            }
        }
    }
}
public class DegradeRule extends AbstractRule {
  
    private static final int RT_MAX_EXCEED_N = 5;

    @SuppressWarnings("PMD.ThreadPoolCreationRule")
    private static ScheduledExecutorService pool = Executors.newScheduledThreadPool(
        Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("sentinel-degrade-reset-task", true));

    @Override
    public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
        //熔断后cut会被设置成true,timewindow之后由定时任务改为false
        if (cut.get()) {
            return false;
        }
        //获取ClusterBuilderSlot赋值的ClusterNode节点
        ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
        if (clusterNode == null) {
            return true;
        }
        //RT熔断
        if (grade == RuleConstant.DEGRADE_GRADE_RT) {
            //获取平均RT(一秒)
            double rt = clusterNode.avgRt();
            if (rt < this.count) {
                //平均RT小于配置值count时,则通过校验
                //如果校验通过则,设置passCount为0,以重新计算当前秒校验失败的此数
                passCount.set(0);
                return true;
            }

            // 如果校验失败的此数少于5次,则算做校验通过  RT_MAX_EXCEED_N:默认值5
            if (passCount.incrementAndGet() < RT_MAX_EXCEED_N) {
                return true;
            }
            //按异常比率熔断
        } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
            //当前秒的异常数
            double exception = clusterNode.exceptionQps();
            //当前秒的成功数
            double success = clusterNode.successQps();
            //当前秒的总数(校验成功(pass)+校验失败的(block))MetricEvent
            double total = clusterNode.totalQps();
            // 总数小于5次 不降级
            if (total < RT_MAX_EXCEED_N) {
                return true;
            }
            double realSuccess = success - exception;
            if (realSuccess <= 0 && exception < RT_MAX_EXCEED_N) {
                return true;
            }

            if (exception / success < count) {
                return true;
            }
            //按异常次数降级
        } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
            //获取当前一分钟的异常总次数
            //因为此处是一分钟, 所以,如果timeWindow的时间结束后,仍然处于当前这一分钟范围,则应该会直接降级,因为当前分钟异常次数超过了规则配置次数
            double exception = clusterNode.totalException();
            //总次数小于配置count 则校验通过
            if (exception < count) {
                return true;
            }
        }
        //已经校验失败,应该降级
        //将cut 设置为true,为true时直接回降级

         //unsafe.compareAndSwapInt(this, valueOffset, e, u)
        //expect表示期望的值,即遇到这个值,则将value改为update值,在这里就是遇到false便改成true,更新成功返回true
        //如果遇到的值不是expect值则不做任何处理,并返回个false
        if (cut.compareAndSet(false, true)) {
            //创建定时任务
            ResetTask resetTask = new ResetTask(this);
            //timeWindow时间后,执行resetTask这个定时任务
            pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
        }

        return false;
    }

    //定时任务
    private static final class ResetTask implements Runnable {

        private DegradeRule rule;

        ResetTask(DegradeRule rule) {
            this.rule = rule;
        }

        @Override
        public void run() {
            //将校验失败次数设置为0,用来设定5次失败内不降级
            rule.getPassCount().set(0);
            //将cut设置为false,表示进行正常校验流程
            rule.cut.set(false);
        }
    }

}
上一篇 下一篇

猜你喜欢

热点阅读