分布式服务一些收藏

第四章 Sentine 滑动时间窗口设计

2022-06-06  本文已影响0人  原水寒

滑动时间窗口

首先来看时间窗口,假设我们要统计QPS(每秒内的访问数),并且要求QPS<=200。在如下两个时间窗口内,各自的时间窗口内进行统计,大致一看没啥问题。


image.png

再看下图,在每一个时间窗口内都没啥问题,所以根据统计结果不会限流。但是在第一个的后500ms+第二个的前500ms内的QPS=380>200,超出了系统的承受能力。所以此时出现了滑动事件窗口。


image.png
滑动时间窗口将原来的1s跨度窗口按照需求进行拆分,比如拆分成2个500ms的窗口,这样随着窗口的滑动,中间的两个500ms也能组成一个1s的窗口,此时发现这个窗口内的QPS=380>200,会进行限流。当然,按照假设如下的四个窗口中,第一个的后250ms+第二个窗口500ms+第三个窗口的前250ms的访问量之和也可能大于200,为了解决这个问题,可以将窗口再拆分成250ms跨度的。所以,滑动窗口随着拆分粒度越小,准确度越高,但是单来的计算消耗,空间占用也会越大,sentinel 秒级滑动窗口默认是500ms的跨度。
image.png

随着时间的推移,如果不采用一些手段,每500ms,就要新建一个时间窗口,我们发现时间窗口会越来越多,内存占用会越来越大,而一段时间前的滑动窗口可能已经没用了。此时有两种方案,一是新建窗口 + 回收老窗口;二是直接进行窗口复用,后者更容易实现且后者的好处更多。sentinel 使用了窗口复用机制。


image.png
由于我们要做到窗口复用,那么每个时间窗口就需要记录时间窗口时间,不然,由于窗口覆盖机制,我们不知道窗口具体统计的时哪一个时段。来看下 sentinel 滑动时间窗口的最终设计。

sentinel 实现滑动窗口

image.png

计数:获取当前的 WindowWrapper,然后取出其中的 MetricBucket 进行计数。
取值:计算一个 QPS 时,需要获取整个秒级滑动窗口内的所有 WindowWrapper,然后累加器 MetricBucket 的相关统计数据即可。

这里只讲最核心的部分,sentinel 的滑动窗口还做了一些其他事情,比如“预占用”。假设当前时间窗口的名额已满,但是当前这个请求很重要且可以支持一定的等待时间,其会预占用下一时间窗口的名额,然后其线程wait到下一个时间窗口后开始运行。

计数器 MetricBucket

/**
 * Represents metrics data in a period of time span.
 * 统计一段时间内(采样周期)的指标数据
 *
 * @author jialiang.linjl
 * @author Eric Zhao
 */
public class MetricBucket {

    /**
     * 存储各项指标计数
     * 指标类型:MetricEvent.values()
     */
    private final LongAdder[] counters;

    /**
     * 记录最小耗时
     */
    private volatile long minRt;

    public MetricBucket() {
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
        initMinRt();
    }

    public MetricBucket reset(MetricBucket bucket) {
        for (MetricEvent event : MetricEvent.values()) {
            counters[event.ordinal()].reset();
            counters[event.ordinal()].add(bucket.get(event));
        }
        initMinRt();
        return this;
    }

    private void initMinRt() {
        this.minRt = SentinelConfig.statisticMaxRt();
    }

    /**
     * Reset the adders.
     *
     * @return new metric bucket in initial state
     */
    public MetricBucket reset() {
        for (MetricEvent event : MetricEvent.values()) {
            counters[event.ordinal()].reset();
        }
        initMinRt();
        return this;
    }

    public long get(MetricEvent event) {
        return counters[event.ordinal()].sum();
    }

    public MetricBucket add(MetricEvent event, long n) {
        counters[event.ordinal()].add(n);
        return this;
    }

    public long pass() {
        return get(MetricEvent.PASS);
    }

    public long block() {
        return get(MetricEvent.BLOCK);
    }

    ...

    public long minRt() {
        return minRt;
    }

    public void addPass(int n) {
        add(MetricEvent.PASS, n);
    }

    public void addBlock(int n) {
        add(MetricEvent.BLOCK, n);
    }

    public void addRT(long rt) {
        add(MetricEvent.RT, rt);

        // Not thread-safe, but it's okay.
        if (rt < minRt) {
            minRt = rt;
        }
    }

这里统计数据用的是 LongAdder,在高并发下其相较于 AtomicInteger 和 AtomicLong 具有更高的吞吐,更高的性能。

LongAdder longAdder = new LongAdder();
longAdder.increment(); // 等价于 longAdder.add(1)
longAdder.add(1);
longAdder.add(1);
System.out.println(longAdder.sum());
image.png

时间窗口 WindowWrapper

public class WindowWrap<T> {

    /**
     * Time length of a single window bucket in milliseconds.
     * 时间窗口长度
     */
    private final long windowLengthInMs;

    /**
     * Start timestamp of the window in milliseconds.
     * 当前时间窗口的开始时间
     */
    private long windowStart;

    /**
     * Statistic data.
     * 统计数据:即 MetricBucket
     */
    private T value;

    /**
     * @param windowLengthInMs a single window bucket's time length in milliseconds.
     * @param windowStart      the start timestamp of the window
     * @param value            statistic data
     */
    public WindowWrap(long windowLengthInMs, long windowStart, T value) {
        this.windowLengthInMs = windowLengthInMs;
        this.windowStart = windowStart;
        this.value = value;
    }

    public long windowLength() {
        return windowLengthInMs;
    }

    public long windowStart() {
        return windowStart;
    }

    public T value() {
        return value;
    }

    public void setValue(T value) {
        this.value = value;
    }

    /**
     * Reset start timestamp of current bucket to provided time.
     *
     * @param startTime valid start timestamp
     * @return bucket after reset
     */
    public WindowWrap<T> resetTo(long startTime) {
        this.windowStart = startTime;
        return this;
    }

    /**
     * Check whether given timestamp is in current bucket.
     * 传入的时间是否在当前的时间窗口内
     *
     * @param timeMillis valid timestamp in ms
     * @return true if the given time is in current bucket, otherwise false
     * @since 1.5.0
     */
    public boolean isTimeInWindow(long timeMillis) {
        return windowStart <= timeMillis && timeMillis < windowStart + windowLengthInMs;
    }
}

滑动窗口 LeapArray

public abstract class LeapArray<T> {
    /**
     * 每个窗口 windowWrapper(metricBucket) 的大小
     * 秒级:500ms
     * 分钟级:1000ms
     */
    protected int windowLengthInMs;
    /**
     * 滑动窗口的窗口个数
     * 秒级:默认为2
     * 分钟级:默认为60
     */
    protected int sampleCount;
    /**
     * 滑动窗口的时间跨度,
     * 秒级:1000ms
     * 分钟级:60*1000ms
     */
    protected int intervalInMs;
    /**
     * 滑动窗口的时间跨度,
     * 秒级:1s
     * 分钟级:60s
     */
    private double intervalInSecond;

    /**
     * 滑动时间窗口
     */
    protected final AtomicReferenceArray<WindowWrap<T>> array;

    /**
     * The total bucket count is: {@code sampleCount = intervalInMs / windowLengthInMs}.
     *
     * @param sampleCount  bucket count of the sliding window
     * @param intervalInMs the total time interval of this {@link LeapArray} in milliseconds
     */
    public LeapArray(int sampleCount, int intervalInMs) {
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.intervalInSecond = intervalInMs / 1000.0;
        this.sampleCount = sampleCount;

        this.array = new AtomicReferenceArray<>(sampleCount);
    }

    /**
     * Get the bucket at current timestamp.
     *
     * @return the bucket at current timestamp
     */
    public WindowWrap<T> currentWindow() {
        return currentWindow(TimeUtil.currentTimeMillis());
    }

    /**
     * Create a new statistic value for bucket.
     *
     * @param timeMillis current time in milliseconds
     * @return the new empty bucket
     */
    public abstract T newEmptyBucket(long timeMillis);

    /**
     * Reset given bucket to provided start time and reset the value.
     *
     * @param startTime  the start time of the bucket in milliseconds
     * @param windowWrap current bucket
     * @return new clean bucket at given start time
     */
    protected abstract WindowWrap<T> resetWindowTo(WindowWrap<T> windowWrap, long startTime);

    /**
     * 计算当前时间在滑动窗口数组中的索引位置
     * @param timeMillis
     * @return
     */
    private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
        long timeId = timeMillis / windowLengthInMs;
        // Calculate current index so we can map the timestamp to the leap array.
        return (int)(timeId % array.length());
    }

    /**
     * 计算某一个 windowWrapper 的开始时间
     * @param timeMillis
     * @return
     */
    protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs;
    }

    /**
     * Get bucket item at provided timestamp.
     *
     * @param timeMillis a valid timestamp in milliseconds
     * @return current bucket item at provided timestamp if the time is valid; null if time is invalid
     */
    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        // 计算给定时间在滑动窗口数组中的索引位置
        int idx = calculateTimeIdx(timeMillis);
        // Calculate current bucket start time.
        long windowStart = calculateWindowStart(timeMillis);

        /*
         * Get bucket item at given time from the array.
         * 从滑动时间窗口中根据给定时间获取 时间窗口
         *
         * (1) Bucket is absent, then just create a new bucket and CAS update to circular array. 如果 Bucket 不存在,则新建,并且放置到滑动时间窗口中
         * (2) Bucket is up-to-date, then just return the bucket. 如果查询到的 Bucket 就是当前时间节点的 Bucket,则直接返回
         * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets. 如果获取到的 Bucket 已经过期了,则重置 Bucket,用于新的统计计算
         */
        while (true) {
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    return window;
                } else {
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                return old;
            } else if (windowStart > old.windowStart()) {
                if (updateLock.tryLock()) {
                    try {
                        // Successfully get the update lock, now we reset the bucket.
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                // Should not go through here, as the provided time is already behind.
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读