Sentinel

2020-07-02  本文已影响0人  剑道_7ffc

主要特性

限流,熔断,降级,监控等功能


image.png

基本使用

QPS(Queries-per-second):每秒的请求次数
资源:接口和方法

快速入门

整体思路是类似于锁,若成功获取资源,则执行下面的逻辑,若没有则抛出阻塞异常。
https://github.com/alibaba/Sentinel/wiki/%E6%96%B0%E6%89%8B%E6%8C%87%E5%8D%97#%E5%85%AC%E7%BD%91-demo

控制台

https://github.com/alibaba/Sentinel/wiki/%E6%8E%A7%E5%88%B6%E5%8F%B0#2-%E5%90%AF%E5%8A%A8%E6%8E%A7%E5%88%B6%E5%8F%B0

案例代码

@Configuration
public class AopConfiguration {
    @Bean
    public SentinelResourceAspect sentinelResourceAspect(){
        return new SentinelResourceAspect();
    }
}
@RestController
public class SentinelController {

    @SentinelResource(value = "sayHello") //针对方法级别的限流
    @GetMapping("/say")
    public String sayHello(){
        System.out.println("hello world");
        return "hello world";
    }
}
@SpringBootApplication
public class SentinelDemoApplication {

    public static void main(String[] args) {
        initFlowRules();
        SpringApplication.run(SentinelDemoApplication.class, args);
    }

    //初始化规则
    private static void initFlowRules(){
        List<FlowRule> rules=new ArrayList<>(); //限流规则的集合
        FlowRule flowRule=new FlowRule();
        flowRule.setResource("sayHello");//资源(方法名称、接口)
        flowRule.setGrade(RuleConstant.FLOW_GRADE_QPS); //限流的阈值的类型
        flowRule.setCount(10);
        rules.add(flowRule);
        FlowRuleManager.loadRules(rules);
    }

}

运行结果


image.png

实现原理

链路如何添加的

责任链模式


image.png
image.png
源码分析

1 com.alibaba.csp.sentinel.slots.DefaultSlotChainBuilder#build
通过spi来实现责任链的获取

public ProcessorSlotChain build() {
    ProcessorSlotChain chain = new DefaultProcessorSlotChain();

    // Note: the instances of ProcessorSlot should be different, since they are not stateless.
    List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
    for (ProcessorSlot slot : sortedSlotList) {
        if (!(slot instanceof AbstractLinkedProcessorSlot)) {
            RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
            continue;
        }

        chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
    }

    return chain;
}

2 com.alibaba.csp.sentinel.slotchain.DefaultProcessorSlotChain#addLast
通过addLast构建上下级关系,第一个end是一个抽象内部类,只是起一个开头的作用。

public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
    end.setNext(protocolProcessor);
    end = protocolProcessor;
}
image.png

如何实现限流

滑动窗口(StatisticSlot)


image.png

sampleCount:表示1ms中滑动窗口的总个数
intervalInMs:表示把1ms分成多少份
windowLengthInMs:表示1个活动窗口占多少份

源码分析

1 com.alibaba.csp.sentinel.slots.statistic.StatisticSlot#entry

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
    // Request passed, add thread count and pass count.
    node.increaseThreadNum();
    node.addPassRequest(count);//滑动窗口

    // Handle pass event with registered entry callback handlers.
    for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
        handler.onPass(context, resourceWrapper, node, count, args);
    }
}

2 com.alibaba.csp.sentinel.slots.statistic.base.LeapArray#currentWindow

public WindowWrap<T> currentWindow(long timeMillis) {
    // 910/200=4
    int idx = calculateTimeIdx(timeMillis);
    // 第五个窗口的开始时间:800
    long windowStart = calculateWindowStart(timeMillis);

    /*
     * Get bucket item at given time from the array.
     *
     * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
     * (2) Bucket is up-to-date, then just return the bucket.
     * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
     */
    while (true) {
        WindowWrap<T> old = array.get(idx);
        if (old == null) {
            /*
             *     B0       B1      B2    NULL      B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             *            bucket is empty, so create new and update
             *
             * If the old bucket is absent, then we create a new bucket at {@code windowStart},
             * then try to update circular array via a CAS operation. Only one thread can
             * succeed to update, while other threads yield its time slice.
             */
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            if (array.compareAndSet(idx, null, window)) {
                // Successfully updated, return the created bucket.
                return window;
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart == old.windowStart()) {
            /*
             *     B0       B1      B2     B3      B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             *            startTime of Bucket 3: 800, so it's up-to-date
             *
             * If current {@code windowStart} is equal to the start timestamp of old bucket,
             * that means the time is within the bucket, so directly return the bucket.
             */
            return old;
        } else if (windowStart > old.windowStart()) {
            /*
             *   (old)
             *             B0       B1      B2    NULL      B4
             * |_______||_______|_______|_______|_______|_______||___
             * ...    1200     1400    1600    1800    2000    2200  timestamp
             *                              ^
             *                           time=1676
             *          startTime of Bucket 2: 400, deprecated, should be reset
             *
             * If the start timestamp of old bucket is behind provided time, that means
             * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
             * Note that the reset and clean-up operations are hard to be atomic,
             * so we need a update lock to guarantee the correctness of bucket update.
             *
             * The update lock is conditional (tiny scope) and will take effect only when
             * bucket is deprecated, so in most cases it won't lead to performance loss.
             */
            if (updateLock.tryLock()) {
                try {
                    // Successfully get the update lock, now we reset the bucket.
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            // Should not go through here, as the provided time is already behind.
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
    }

规则如何判断的

FlowSlot

源码分析

1 com.alibaba.csp.sentinel.slots.block.flow.FlowRuleChecker

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                      Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
    if (rules != null) {
        for (FlowRule rule : rules) {
            if (!canPassCheck(rule, context, node, count, prioritized)) {
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读