javaWeb学习架构

微服务架构 | 5.4 Sentinel 流控、统计和熔断的源码

2022-01-31  本文已影响0人  多氯环己烷

前言

参考资料
《Spring Microservices in Action》
《Spring Cloud Alibaba 微服务原理与实战》
《B站 尚硅谷 SpringCloud 框架开发教程 周阳》
《Sentinel GitHub 官网》
《Sentinel 官网》

调用链路是 Sentinel 的工作主流程,由各个 Slot 槽组成,将不同的 Slot 槽按照顺序串在一起,从而将不同的功能(限流、降级、系统保护)组合在一起;

本篇《2. 获取 ProcessorSlot 链》将从源码级讲解如何获取调用链路,接着会以遍历链表的方式处理每一个 Slot 槽,其中就有:FlowSlot、StatisticSlot、DegradeSlot 等。分别对应本篇《3. 流控槽实施流控逻辑》、《4. 统计槽实施指标数据统计》和《5. 熔断槽实施服务熔断》;


1. Sentinel 的自动装配

1.2 依赖引入

Sentinel 的自动装配.png

1.3 SentinelWebAutoConfiguration 配置类

@Configuration
@EnableConfigurationProperties(SentinelProperties.class)
public class SentinelWebAutoConfiguration {
    
    //省略其他代码
    
    @Bean
    @ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled", matchIfMissing = true)
    public FilterRegistrationBean sentinelFilter() {
        FilterRegistrationBean<Filter> registration = new FilterRegistrationBean<>();

        SentinelProperties.Filter filterConfig = properties.getFilter();

        if (filterConfig.getUrlPatterns() == null || filterConfig.getUrlPatterns().isEmpty()) {
            List<String> defaultPatterns = new ArrayList<>();
            //默认情况下通过 /* 规则拦截所有的请求
            defaultPatterns.add("/*");
            filterConfig.setUrlPatterns(defaultPatterns);
        }

        registration.addUrlPatterns(filterConfig.getUrlPatterns().toArray(new String[0]));
        //【点进去】注册 CommonFilter
        Filter filter = new CommonFilter();
        registration.setFilter(filter);
        registration.setOrder(filterConfig.getOrder());
        registration.addInitParameter("HTTP_METHOD_SPECIFY", String.valueOf(properties.getHttpMethodSpecify()));
        log.info("[Sentinel Starter] register Sentinel CommonFilter with urlPatterns: {}.", filterConfig.getUrlPatterns());
        return registration;
    }
}

1.4 CommonFilter 过滤器

public class CommonFilter implements Filter {
    
    //省略部分代码

    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        HttpServletRequest sRequest = (HttpServletRequest)request;
        Entry urlEntry = null;
        try {
            //解析请求 URL
            String target = FilterUtil.filterTarget(sRequest);
            //URL 清洗
            UrlCleaner urlCleaner = WebCallbackManager.getUrlCleaner();
            if (urlCleaner != null) {
                //如果存在,则说明配置过 URL 清洗策略,替换配置的 targer
                target = urlCleaner.clean(target);
            }
            if (!StringUtil.isEmpty(target)) {
                String origin = this.parseOrigin(sRequest);
                ContextUtil.enter("sentinel_web_servlet_context", origin);
                if (this.httpMethodSpecify) {
                    String pathWithHttpMethod = sRequest.getMethod().toUpperCase() + ":" + target;
                    //使用 SphU.entry() 方法对 URL 添加限流埋点
                    urlEntry = SphU.entry(pathWithHttpMethod, 1, EntryType.IN);
                } else {
                    urlEntry = SphU.entry(target, 1, EntryType.IN);
                }
            }
            //执行过滤
            chain.doFilter(request, response);
        } catch (BlockException var14) {
            HttpServletResponse sResponse = (HttpServletResponse)response;
            WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse, var14);
        } catch (ServletException | RuntimeException | IOException var15) {
            Tracer.traceEntry(var15, urlEntry);
            throw var15;
        } finally {
            if (urlEntry != null) {
                urlEntry.exit();
            }
            ContextUtil.exit();
        }
    }
}

1.5 小结

2. 获取 ProcessorSlot 链

2.1 Sentinel 源码包结构

模块名 说明
sentinel-adapter 负责针对主流开源框架进行限流适配,如:Dubbo、gRPC、Zuul 等;
sentinel-core Sentinel 核心库,提供限流、熔断等实现;
sentinel-dashboard 控制台模块,提供可视化监控和管理;
sentinel-demo 官方案例;
sentinel-extension 实现不同组件的数据源扩展,如:Nacos、ZooKeeper、Apollo 等;
sentinel-transport 通信协议处理模块;
首次DeBug 进入 SphU.entry() 方法.png

2.2 获取 ProcessorSlot 链与操作 Slot 槽的入口 CtSph.entryWithPriority()

private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException {
    Context context = ContextUtil.getContext();
    if (context instanceof NullContext) {
        //上下文量已经超过阈值 -> 只初始化条目,不进行规则检查
        return new CtEntry(resourceWrapper, null, context);
    }

    if (context == null) {
        //没有指定上下文 -> 使用默认上下文 context
        context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
    }
     
     if (!Constants.ON) {
        //全局开关关闭 -> 没有规则检查
        return new CtEntry(resourceWrapper, null, context);
    }
    //【断点步入 2.2.1】通过 lookProcessChain 方法获取 ProcessorSlot 链
    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);


    if (chain == null) {
        //表示资源量超过 Constants.MAX_SLOT_CHAIN_SIZE 常量 -> 不会进行规则检查
        return new CtEntry(resourceWrapper, null, context);
    }

    Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
        //【断点步入 3./4./5.】执行 ProcessorSlot 对 ProcessorSlot 链中的 Slot 槽遍历操作(遍历链表的方式)
        chain.entry(context, resourceWrapper, null, count, prioritized, args);
    } catch (BlockException e1) {
        e.exit(count, args);
        throw e1;
    } catch (Throwable e1) {
        //这种情况不应该发生,除非 Sentinel 内部存在错误
        RecordLog.info("Sentinel unexpected exception", e1);
    }
    return e;
}

2.2.1 构造 ProcessorSlot 链 CtSph.lookProcessChain()

ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    //从缓存中获取 slot 调用链
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);
    if (chain == null) {
        synchronized (LOCK) {
            chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                // Entry size limit.
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;
                }
                //【断点步入】构造 Slot 链(责任链模式)
                chain = SlotChainProvider.newSlotChain();
                Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                    chainMap.size() + 1);
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);
                chainMap = newMap;
            }
        }
    }
    return chain;
}
@Override
public ProcessorSlotChain build() {
    ProcessorSlotChain chain = new DefaultProcessorSlotChain();
    List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
    for (ProcessorSlot slot : sortedSlotList) {
        if (!(slot instanceof AbstractLinkedProcessorSlot)) {
            RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
            continue;
        }
        chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
    }
    return chain;
}
ProcessorSlotChain 链中有 10 个 Slot 插槽.png

2.2.2 操作 Slot 槽的入口

3. 流控槽实施流控逻辑 FlowSlot.entry()

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
    //【断点步入】检查流量规则
    checkFlow(resourceWrapper, context, node, count, prioritized);
    //调用下一个 Slot
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                      Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    if (ruleProvider == null || resource == null) {
        return;
    }
    //【断点步入 3.1】获取流控规则
    Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
    if (rules != null) {
        //遍历所有流控规则 FlowRule
        for (FlowRule rule : rules) {
            //【点进去 3.2】校验每条规则
            if (!canPassCheck(rule, context, node, count, prioritized)) {
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}

3.1 获取流控规则 FlowSlot.ruleProvider.apply()

private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
    @Override
    public Collection<FlowRule> apply(String resource) {
        // Flow rule map should not be null.
        Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
        return flowRules.get(resource);
    }
};

3.2 校验每条规则 FlowRuleChecker.canPassCheck()

public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
    String limitApp = rule.getLimitApp();
    if (limitApp == null) {
        return true;
    }
    //集群模式
    if (rule.isClusterMode()) {
        return passClusterCheck(rule, context, node, acquireCount, prioritized);
    }
    //【点进去】单机模式
    return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
    //【点进去 3.2.1】获取 Node
    Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
    if (selectedNode == null) {
        return true;
    }
    //【点进去 3.2.2】获取流控的处理策略
    return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}

3.2.1 获取 Node FlowRuleChecker.selectNodeByRequesterAndStrategy()

static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
    //limitApp 不能为空
    String limitApp = rule.getLimitApp();
    int strategy = rule.getStrategy();
    String origin = context.getOrigin();
    
    //场景1:限流规则设置了具体应用,如果当前流量就是通过该应用的,则命中场景1
    if (limitApp.equals(origin) && filterOrigin(origin)) {
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            // Matches limit origin, return origin statistic node.
            return context.getOriginNode();
        }
        return selectReferenceNode(rule, context, node);
    } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
    //场景2:限流规则未指定任何具体应,默认为default,则当前流量直接命中场景2
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            // Return the cluster node.
            return node.getClusterNode();
        }

        return selectReferenceNode(rule, context, node);
    } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
    //场景3:限流规则设置的是other,当前流量未命中前两种场景
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            return context.getOriginNode();
        }
        return selectReferenceNode(rule, context, node);
    }
    return null;
}

3.2.2 获取流控的处理策略 `FlowRule.getRater().canPass()

TrafficShapingController 的四种实现类.png

4. 统计槽实施指标数据统计 StatisticSlot.entry()

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
    try {
        //先执行后续 Slot 检查,再统计数据(即先调用后续所有 Slot)
        fireEntry(context, resourceWrapper, node, count, prioritized, args);

        //【断点步入】使用 Node 统计“增加线程数”和“请求通过数”
        node.increaseThreadNum();
        node.addPassRequest(count);

        //如果存在来源节点,则对来源节点增加线程数和请求通过数
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseThreadNum();
            context.getCurEntry().getOriginNode().addPassRequest(count);
        }
        
        //如果是入口流量,则对全局节点增加线程数和请求通过数
        if (resourceWrapper.getEntryType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseThreadNum();
            Constants.ENTRY_NODE.addPassRequest(count);
        }

        //执行事件通知和回调函数
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    //处理优先级等待异常    
    } catch (PriorityWaitException ex) {
        node.increaseThreadNum();
        //如果有来源节点,则对来源节点增加线程数
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseThreadNum();
        }

        //如果是入口流量,对全局节点增加线程数
        if (resourceWrapper.getEntryType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseThreadNum();
        }
        //执行事件通知和回调函数
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    //处理限流、熔断等异常    
    } catch (BlockException e) {
        
        //省略
        
        throw e;
    //处理业务异常    
    } catch (Throwable e) {
        context.getCurEntry().setError(e);
        throw e;
    }
}

4.1 统计“增加线程数”和“请求通过数”

public class StatisticNode implements Node {

    //省略其他代码

    //【断点步入】最近 1s 滑动窗口计数器(默认 1s)
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
        IntervalProperty.INTERVAL);

    //最近 1min 滑动窗口计数器(默认 1min)
    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
    
    //增加 “请求通过数” 
    @Override
    public void addPassRequest(int count) {
        rollingCounterInSecond.addPass(count);
        rollingCounterInMinute.addPass(count);
    }
    //增加 RT 和成功数
    @Override
    public void addRtAndSuccess(long rt, int successCount) {
        rollingCounterInSecond.addSuccess(successCount);
        rollingCounterInSecond.addRT(rt);
        rollingCounterInMinute.addSuccess(successCount);
        rollingCounterInMinute.addRT(rt);
    }

    //增加“线程数”
    @Override
    public void increaseThreadNum() {
        curThreadNum.increment();
    }
}

4.2 数据统计的数据结构

4.2.1 ArrayMetric 指标数组

public class ArrayMetric implements Metric {
    
    //省略其他代码

    //【点进去 4.2.2】数据存储
    private final LeapArray<MetricBucket> data;
    
    //最近 1s 滑动计数器用的是 OccupiableBucketLeapArray
    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }
    
    //最近 1min 滑动计数器用的是 BucketLeapArray
    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
        if (enableOccupy) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        } else {
            this.data = new BucketLeapArray(sampleCount, intervalInMs);
        }
    }

    //增加成功数
    @Override
    public void addSuccess(int count) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addSuccess(count);
    }

    //增加通过数
    @Override
    public void addPass(int count) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addPass(count);
    }

    //增加 RT
    @Override
    public void addRT(long rt) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addRT(rt);
    }
}

4.2.2 LeapArray 环形数组

public abstract class LeapArray<T> {

    //省略其他代码

    //单个窗口的长度(1个窗口多长时间)
    protected int windowLengthInMs;
    //采样窗口个数
    protected int sampleCount;
    //全部窗口的长度(全部窗口多长时间)
    protected int intervalInMs;
    private double intervalInSecond;
    //窗口数组:存储所有窗口(支持原子读取和写入)
    protected final AtomicReferenceArray<WindowWrap<T>> array;
    //更新窗口数据时用的锁
    private final ReentrantLock updateLock = new ReentrantLock();

    public LeapArray(int sampleCount, int intervalInMs) {
        //计算单个窗口的长度
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.intervalInSecond = intervalInMs / 1000.0;
        this.sampleCount = sampleCount;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }
    //【点进去 4.2.3】获取当前窗口
    public WindowWrap<T> currentWindow() {
        //这里参数是当前时间
        return currentWindow(TimeUtil.currentTimeMillis());
    }
    //获取指定时间的窗口
    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        // 计算数组下标
        int idx = calculateTimeIdx(timeMillis);
        //计算当前请求对应的窗口开始时间
        long windowStart = calculateWindowStart(timeMillis);

        /*
         * 从 array 中获取窗口。有 3 种情况:
         * (1) array 中窗口不在,创建一个 CAS 并写入 array;
         * (2) array 中窗口开始时间 = 当前窗口开始时间,直接返回;
         * (3) array 中窗口开始时间 < 当前窗口开始时间,表示 o1d 窗口已过期,重置窗口数据并返回;
         */
        while (true) {
            // 取窗口
            WindowWrap<T> old = array.get(idx);
            //(1)窗口不在
            if (old == null) {
                //创建一个窗口
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                //CAS将窗口写进 array 中并返回(CAS 操作确保只初始化一次)
                if (array.compareAndSet(idx, null, window)) {
                    return window;
                } else {
                    //并发写失败,释放 CPU 资源,避免有线程长时间占用 CPU,一般下次来的时候 array 中有数据了会命中第2种情况;
                    Thread.yield();
                }
            //(2)array 中窗口开始时间 = 当前窗口开始时间
            } else if (windowStart == old.windowStart()) {
                //直接返回
                return old;
            //(3)array 中窗口开始时间 < 当前窗口开始时间    
            } else if (windowStart > old.windowStart()) {
                //尝试获取更新锁
                if (updateLock.tryLock()) {
                    try {
                        //拿到锁的线程才重置窗口
                        return resetWindowTo(old, windowStart);
                    } finally {
                        //释放锁
                        updateLock.unlock();
                    }
                } else {
                    //并发加锁失败,释放 CPU 资源,避免有线程长时间占用 CPU,一般下次来的时候因为 old 对象时间更新了会命中第 2 种情况;
                    Thread.yield();
                }
            //理论上不会出现    
            } else if (windowStart < old.windowStart()) {
                // 正常情况不会进入该分支(机器时钟回拨等异常情况)
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }
    //计算索引
    private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
        //timeId 降低时间精度
        long timeId = timeMillis / windowLengthInMs;
        //计算当前索引,这样我们就可以将时间戳映射到 leap 数组
        return (int)(timeId % array.length());
    }
    //计算窗口开始时间
    protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs;
    }
}

4.2.3 WindowWrap 窗口包装类

public class WindowWrap<T> {
    //窗口长度,与 LeapArray 的 windowLengthInMs 一致
    private final long windowLengthInMs;
    //窗口开始时间,其值是 windowLengthInMs 的整数倍
    private long windowStart;
    //窗口的数据,支持 MetricBucket 类型,存储统计数据
    private T value;

    //省略其他代码
}

4.2.4 MetricBucket 指标桶

public class MetricBucket {
    /**
     * 存储指标的计数器;
     * LongAdder 是线程安全的计数器
     * counters[0]  PASS 通过数;
     * counters[1]  BLOCK 拒绝数;
     * counters[2]  EXCEPTION 异常数;
     * counters[3]  SUCCESS 成功数;
     * counters[4]  RT 响应时长;
     * counters[5]  OCCUPIED_PASS 预分配通过数;
     **/
    private final LongAdder[] counters;

    //最小 RT,默认值是 5000ms
    private volatile long minRt;

    //构造中初始化
    public MetricBucket() {
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
        initMinRt();
    }

    //覆盖指标
    public MetricBucket reset(MetricBucket bucket) {
        for (MetricEvent event : MetricEvent.values()) {
            counters[event.ordinal()].reset();
            counters[event.ordinal()].add(bucket.get(event));
        }
        initMinRt();
        return this;
    }

    private void initMinRt() {
        this.minRt = SentinelConfig.statisticMaxRt();
    }

    //重置指标为0
    public MetricBucket reset() {
        for (MetricEvent event : MetricEvent.values()) {
            counters[event.ordinal()].reset();
        }
        initMinRt();
        return this;
    }
    //获取指标,从 counters 中返回
    public long get(MetricEvent event) {
        return counters[event.ordinal()].sum();
    }
    //添加指标
    public MetricBucket add(MetricEvent event, long n) {
        counters[event.ordinal()].add(n);
        return this;
    }

    public long pass() {
        return get(MetricEvent.PASS);
    }

    public long block() {
        return get(MetricEvent.BLOCK);
    }

    public void addPass(int n) {
        add(MetricEvent.PASS, n);
    }

    public void addBlock(int n) {
        add(MetricEvent.BLOCK, n);
    }

    //省略其他代码
}

4.2.5 各数据结构的依赖关系

各数据结构的 UML 图 结构示意图.png

4.2.6 LeapArray 统计数据的大致思路

5. 熔断槽实施服务熔断 DegradeSlot.entry()

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
    //【断点步入】熔断检查
    performChecking(context, resourceWrapper);
    //调用下一个 Slot
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void performChecking(Context context, ResourceWrapper r) throws BlockException {
    //根据 resourceName 获取断路器
    List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
    if (circuitBreakers == null || circuitBreakers.isEmpty()) {
        return;
    }
    //循环判断每个断路器
    for (CircuitBreaker cb : circuitBreakers) {
        //【点进去】尝试通过断路器
        if (!cb.tryPass(context)) {
            throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
        }
    }
}

5.1 继续或取消熔断功能

@Override
public boolean tryPass(Context context) {
    //当前断路器状态为关闭
    if (currentState.get() == State.CLOSED) {
        return true;
    }
    if (currentState.get() == State.OPEN) {
        //【点进去】对于半开状态,我们尝试通过
        return retryTimeoutArrived() && fromOpenToHalfOpen(context);
    }
    return false;
}
protected boolean fromOpenToHalfOpen(Context context) {
    //尝试将状态从 OPEN 设置为 HALF_OPEN
    if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
        //状态变化通知
        notifyObservers(State.OPEN, State.HALF_OPEN, null);
        Entry entry = context.getCurEntry();
        //在 entry 添加一个 exitHandler  entry.exit() 时会调用
        entry.whenTerminate(new BiConsumer<Context, Entry>() {
            @Override
            public void accept(Context context, Entry entry) {
                //如果有发生异常,重新将状态设置为OPEN 请求不同通过
                if (entry.getBlockError() != null) {
                    currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
                    notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
                }
            }
        });
        //此时状态已设置为HALF_OPEN正常通行
        return true;
    }
    //熔断
    return false;
}

5.2 请求失败,启动熔断

@Override
public void exit(Context context, ResourceWrapper r, int count, Object... args) {
    Entry curEntry = context.getCurEntry();
    //无阻塞异常
    if (curEntry.getBlockError() != null) {
        fireExit(context, r, count, args);
        return;
    }
    //通过资源名获取断路器
    List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
    //没有配置断路器,则直接放行
    if (circuitBreakers == null || circuitBreakers.isEmpty()) {
        fireExit(context, r, count, args);
        return;
    }

    if (curEntry.getBlockError() == null) {
        for (CircuitBreaker circuitBreaker : circuitBreakers) {
            //【点进去】在请求完成时
            circuitBreaker.onRequestComplete(context);
        }
    }
    fireExit(context, r, count, args);
}
@Override
public void onRequestComplete(Context context) {
    Entry entry = context.getCurEntry();
    if (entry == null) {
        return;
    }
    Throwable error = entry.getError();
    //简单错误计数器
    SimpleErrorCounter counter = stat.currentWindow().value();
    if (error != null) {
        //异常请求数加 1
        counter.getErrorCount().add(1);
    }
    //总请求数加 1
    counter.getTotalCount().add(1);
    //【点进去】超过阈值时变更状态
    handleStateChangeWhenThresholdExceeded(error);
}
private void handleStateChangeWhenThresholdExceeded(Throwable error) {
    //全开则直接放行
    if (currentState.get() == State.OPEN) {
        return;
    }
    //半开状态
    if (currentState.get() == State.HALF_OPEN) {
        //检查请求
        if (error == null) {
            //发生异常,将状态从半开 HALF_OPEN 转为关闭 CLOSE
            fromHalfOpenToClose();
        } else {
            //无异常,解开半开状态
            fromHalfOpenToOpen(1.0d);
        }
        return;
    }
    
    //计算是否超过阈值
    List<SimpleErrorCounter> counters = stat.values();
    long errCount = 0;
    long totalCount = 0;
    for (SimpleErrorCounter counter : counters) {
        errCount += counter.errorCount.sum();
        totalCount += counter.totalCount.sum();
    }
    if (totalCount < minRequestAmount) {
        return;
    }
    double curCount = errCount;
    if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
        //熔断策略为:异常比例
        curCount = errCount * 1.0d / totalCount;
    }
    if (curCount > threshold) {
        transformToOpen(curCount);
    }
}

6. Sentinel 源码结构图小结


最后

\color{blue}{\rm\small{新人制作,如有错误,欢迎指出,感激不尽!}}

\color{blue}{\rm\small{欢迎关注我,并与我交流!}}

\color{blue}{\rm\small{如需转载,请标注出处!}}

上一篇下一篇

猜你喜欢

热点阅读