Sentinel源码分析----滑动窗口

2019-03-05  本文已影响0人  _六道木

之前说过Sentinel内部统计使用Node来进行处理,而Node底层是使用滑动窗口实现的,这篇文章主要分析一下滑动窗口的实现。

StatisticNode#addPassRequest方法为入口,先看看内部实现:

    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
        IntervalProperty.INTERVAL);

    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000);
    @Override
    public void addPassRequest(int count) {
        rollingCounterInSecond.addPass(count);
        rollingCounterInMinute.addPass(count);
    }

addPassRequest方法内部调用了rollingCounterInSecond.addPassrollingCounterInMinute.addPass,看了下两个变量的声明,可以看到类型都是ArrayMetric,只不过参数不一样,而从变量名来看,一个是统计一分钟的数据,一个是统计一秒钟的数据
那么看看ArrayMetric是什么东西,构造方法和成员变量如下

    private final MetricsLeapArray data;

    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new MetricsLeapArray(sampleCount, intervalInMs);
    }

内部只有一个MetricsLeapArray类型,似乎又将统计的任务委托给了MetricsLeapArray,抽两个方法看看其实现:

    public void addPass(int count) {
        // 获取当前窗口
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        // 将数据统计在当前窗口中
        wrap.value().addPass(count);
    }
    public long pass() {
        data.currentWindow();
        long pass = 0;
        // 获取所有窗口
        List<MetricBucket> list = data.values();
        // 所有窗口的值累加返回
        for (MetricBucket window : list) {
            pass += window.pass();
        }
        return pass;
    }

的确是核心实现是在MetricsLeapArray中,这里只做一层转发、汇总

接着继续看下MetricsLeapArray实现

public class MetricsLeapArray extends LeapArray<MetricBucket> {

    public MetricsLeapArray(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);
    }

    @Override
    public MetricBucket newEmptyBucket() {
        return new MetricBucket();
    }

    @Override
    protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
        // Update the start time and reset value.
        w.resetTo(startTime);
        w.value().reset();
        return w;
    }
}

MetricsLeapArray貌似也只是重写了部分方法,核心逻辑在父类LeapArray中,父类构造方法如下:

    public LeapArray(int sampleCount, int intervalInMs) {
        AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
        AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
        AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.sampleCount = sampleCount;

        this.array = new AtomicReferenceArray<WindowWrap<T>>(sampleCount);
    }

注意:后续没有特殊说明,时间窗口都指小窗口

获取当前时间窗口

那么从MetricsLeapArray#addPass方法开始分析,方法内部首先调用currentWindow方法获取了当前时间窗口,实现如下:

    public WindowWrap<T> currentWindow() {
        return currentWindow(TimeUtil.currentTimeMillis());
    }
    // timeMillis表示当前时间
    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        // 通过当前时间计算下标,即通过当前时间判断属于哪个窗口
        // 范围:0~sampleCount-1
        int idx = calculateTimeIdx(timeMillis);
        // 计算当前窗口的开始时间
        long windowStart = calculateWindowStart(timeMillis);

        while (true) {
            // 获取当前时间对应的窗口的位置
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                // 如果为空,则表示还未进行初始化,创建一个窗口对象放入执行位置
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket());
                // CAS更新
                if (array.compareAndSet(idx, null, window)) {
                    return window;
                } else {
                    // 更新失败,代表有别的线程在处理,让出CPU时间重新执行循环流程
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                // 如果对应时间窗口的开始时间与计算得到的开始时间一样
                // 那么代表当前即是我们要找的窗口对象,直接返回
                return old;
            } else if (windowStart > old.windowStart()) {
                // 如果数组中的小窗口对象的开始时间比计算得到的小
                // 代表当前时间窗口已经落后了,需要更新
                if (updateLock.tryLock()) {
                    try { 
                        // 更新当前时间窗口
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                // 如果数组中的窗口对象的开始时间比计算得到的大
                // 可能机器的时间不对,例如时钟回拨,属于异常情况,可以无视
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket());
            }
        }
    }

注意:

计算下标

    protected int calculateTimeIdx(/*@Valid*/ long timeMillis) {
        long timeId = timeMillis / windowLengthInMs;
        return (int)(timeId % array.length());
    }

这个比较简单,举个栗子就能明白了,以rollingCounterInSecond为例,该方法得到的值如下:

当前敲动键盘的时间=1551700999644
timeMillis / windowLengthInMs=1551700999644/500=3103401999
下标=(int)(timeId % array.length())=1

由于数组长度为2,那么最后取余后,得到的结果只能是0或者1,这里得到的是1

计算当前时间窗口的开始时间

    protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs;
    }

这个也比较简单,画个图就明白了


image.png

timeMillis表示当前时间739,从图上来看他属于1号窗口,而怎么计算当前窗口的开始时间呢,那么就是
739-239=739-739%500=500

图示窗口变化流程

获取当前窗口的代码上面已经详细的注释了,但是这个东西不画图是比较难理解的,而作为一名"灵魂画手"不画图怎么行

窗口初始状态如下:

image.png
虚线表示当前两个窗口,即数组是空的,这时候执行第一个分支流程即if (old == null)之后,得到的窗口如下(实现表示窗口已经有值),假设当前时间求得的下标为0,那么0这个窗口会初始化
image.png
这时候,窗口0已经有值,窗口开始时间为0,假设又有请求过来,当前时间为750,计算得到的下标为1,由于1这个位置未初始化,那么进行初始化后,时间窗口如下:
image.png
窗口开始时间为500

再过了一段时间,当前时间时间变成了900,求得的下标仍然为1,执行计算窗口开始时间=900-900%500=500,再取出old,old的开始时间为500,那么此时执行else if (windowStart == old.windowStart())流程,两者时间相等,old就是要找的窗口
.

image.png

光阴似箭,日月如梭,不知不觉中当前时间到了1200,理论上当前时间窗口应该如下:

image.png
时间为1200应该走到第3个窗口中(下标为2),但是由于一秒的大窗口只有两个元素,那么只能求余得到0这个位置,但是0这个位置已经有元素了,即old,那么怎么处理呢?
这种情况就会走到else if (windowStart > old.windowStart())这个逻辑,由于时间1200计算得到的开始时间为1000>大于old的开始时间0,所以这个old窗口是要被废弃的,然后调用resetWindowTo更新old窗口的开始时间以及重置窗口的统计数据,更新过后,窗口实际如下
image.png
可以看到,0号下标这个窗口的开始时间已经变成1000了,而1号窗口还是老样子不变,因为时间还未到来,假设当前时间到达1500~2000之间,假设就1700吧,那么计算得到的下标为1,执行与上述一样的流程,更新old的时间

上面就是整个窗口的变化流程,我在学习当中,遇到个问题,假设在1000~1500这段时间内没有流量进来,到了1700的时候才有流量,那么窗口状态就与上一个图不一样,此时窗口如下:


image.png

时间1700到来的时候,计算得到的开始时间为1500,且下标为1,那么就会将下标1的窗口更新为如图所示,而由于1000~1500这段时间没有流量,导致0号下标窗口没有被更新,开始时间以及0号时间窗口的时间还是旧的,那么我们获取最近一秒的统计的时候就会有问题,因为0号窗口的数据是上一秒的,那么Sentinel是如何解决这个问题的呢?

这个需要看下获取统计的地方处理

获取当前窗口的统计数据

ArrayMetric#pass方法为例看看获取统计的地方

    public long pass() {
        data.currentWindow();
        long pass = 0;
        List<MetricBucket> list = data.values();

        for (MetricBucket window : list) {
            pass += window.pass();
        }
        return pass;
    }

通过MetricsLeapArray获取所有窗口对象里保存数据的对象,然后进行累加,按上面的说法,values方法可能会把0号窗口即0~500这个上一秒的窗口数据给拿出来,导致计算错误,那么values是不是存在这个问题呢?我们进去看看

    public List<T> values() {
        int size = array.length();
        List<T> result = new ArrayList<T>(size);

        for (int i = 0; i < size; i++) {
            WindowWrap<T> windowWrap = array.get(i);
            if (windowWrap == null || isWindowDeprecated(windowWrap)) {
                continue;
            }
            result.add(windowWrap.value());
        }
        return result;
    }

首先遍历数组拿出所有窗口对象,这样一看似乎会出现上述问题,但是看到这个isWindowDeprecated方法似乎做了什么

    protected boolean isWindowDeprecated(/*@NonNull*/ WindowWrap<T> windowWrap) {
        return TimeUtil.currentTimeMillis() - windowWrap.windowStart() >= intervalInMs;
    }

看到这里,恍然大悟了,即使0~500的窗口还存在于数组当中,但是该窗口与当前时间相差的距离已经大于一秒了(前面的分析是以一秒的时间大窗口来举例的),所以,即使该窗口存在于数组之中,在获取统计的时候还是会被过滤掉,这样统计就正常了

WindowWrap

窗口中放的就是WindowWrap对象,其中有个value字段,该字段类型有不同实现,普通的统计则这里的对象类型为MetricBucket类型,看看其内部实现

public class MetricBucket {

    private final LongAdder[] counters;

    private volatile long minRt;

    public MetricBucket() {
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
        initMinRt();
    }

    private void initMinRt() {
        this.minRt = Constants.TIME_DROP_VALVE;
    }

    public MetricBucket reset() {
        for (MetricEvent event : MetricEvent.values()) {
            counters[event.ordinal()].reset();
        }
        initMinRt();
        return this;
    }

    public long get(MetricEvent event) {
        return counters[event.ordinal()].sum();
    }

    public MetricBucket add(MetricEvent event, long n) {
        counters[event.ordinal()].add(n);
        return this;
    }

    public long pass() {
        return get(MetricEvent.PASS);
    }

    public long block() {
        return get(MetricEvent.BLOCK);
    }

    public long exception() {
        return get(MetricEvent.EXCEPTION);
    }

    public long rt() {
        return get(MetricEvent.RT);
    }

    public long minRt() {
        return minRt;
    }

    public long success() {
        return get(MetricEvent.SUCCESS);
    }

    public void addPass(int n) {
        add(MetricEvent.PASS, n);
    }

    public void addException(int n) {
        add(MetricEvent.EXCEPTION, n);
    }

    public void addBlock(int n) {
        add(MetricEvent.BLOCK, n);
    }

    public void addSuccess(int n) {
        add(MetricEvent.SUCCESS, n);
    }

    public void addRT(long rt) {
        add(MetricEvent.RT, rt);
        if (rt < minRt) {
            minRt = rt;
        }
    }
}

一个MetricBucket中统计了多个维度的信息,底层数据结构为一个LongAdder数组,每个位置存放的类型由枚举类MetricEvent中每个枚举的ordinal()方法所决定,外部对某个维度进行统计,到底层会转换成对数组中某个元素的递增操作。

注意点:

总结:

整个滑动窗口的大概原理就是上面的内容了,限于表达能力以及画图能力,可能不能很好的表达出整个流程,如上描述有问题或者哪里分析得不够明白的欢迎评论或者私信

上一篇下一篇

猜你喜欢

热点阅读