Sentinel源码分析----滑动窗口
之前说过Sentinel内部统计使用Node来进行处理,而Node底层是使用滑动窗口实现的,这篇文章主要分析一下滑动窗口的实现。
以StatisticNode#addPassRequest
方法为入口,先看看内部实现:
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000);
@Override
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
addPassRequest
方法内部调用了rollingCounterInSecond.addPass
和rollingCounterInMinute.addPass
,看了下两个变量的声明,可以看到类型都是ArrayMetric
,只不过参数不一样,而从变量名来看,一个是统计一分钟的数据,一个是统计一秒钟的数据
那么看看ArrayMetric是什么东西,构造方法和成员变量如下
private final MetricsLeapArray data;
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new MetricsLeapArray(sampleCount, intervalInMs);
}
内部只有一个MetricsLeapArray
类型,似乎又将统计的任务委托给了MetricsLeapArray
,抽两个方法看看其实现:
public void addPass(int count) {
// 获取当前窗口
WindowWrap<MetricBucket> wrap = data.currentWindow();
// 将数据统计在当前窗口中
wrap.value().addPass(count);
}
public long pass() {
data.currentWindow();
long pass = 0;
// 获取所有窗口
List<MetricBucket> list = data.values();
// 所有窗口的值累加返回
for (MetricBucket window : list) {
pass += window.pass();
}
return pass;
}
的确是核心实现是在MetricsLeapArray
中,这里只做一层转发、汇总
接着继续看下MetricsLeapArray
实现
public class MetricsLeapArray extends LeapArray<MetricBucket> {
public MetricsLeapArray(int sampleCount, int intervalInMs) {
super(sampleCount, intervalInMs);
}
@Override
public MetricBucket newEmptyBucket() {
return new MetricBucket();
}
@Override
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
// Update the start time and reset value.
w.resetTo(startTime);
w.value().reset();
return w;
}
}
MetricsLeapArray
貌似也只是重写了部分方法,核心逻辑在父类LeapArray
中,父类构造方法如下:
public LeapArray(int sampleCount, int intervalInMs) {
AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<WindowWrap<T>>(sampleCount);
}
- intervalInMs:窗口的长度,
rollingCounterInSecond
变量传入的是1000,即该该时间窗口总的跨度为1秒;rollingCounterInMinute
传入的是60000,即该该时间窗口总的跨度为60秒 - sampleCount:样本数量,即当前窗口有多少个小窗口组成,
rollingCounterInSecond
传入的是2,则表示当前一秒钟的时间窗口由两个500毫秒的小窗口组成;rollingCounterInMinute
传入的是60,即表示当前一分钟的时间窗口由60个1000毫秒的小窗口组成 - windowLengthInMs:每个小窗口的时间跨度,
rollingCounterInSecond
时间跨度是500毫秒,rollingCounterInMinute
时间跨度是1000毫秒 - array:存放统计数据的数组,个数与sampleCount相同
注意:后续没有特殊说明,时间窗口都指小窗口
获取当前时间窗口
那么从MetricsLeapArray#addPass
方法开始分析,方法内部首先调用currentWindow
方法获取了当前时间窗口,实现如下:
public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());
}
// timeMillis表示当前时间
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 通过当前时间计算下标,即通过当前时间判断属于哪个窗口
// 范围:0~sampleCount-1
int idx = calculateTimeIdx(timeMillis);
// 计算当前窗口的开始时间
long windowStart = calculateWindowStart(timeMillis);
while (true) {
// 获取当前时间对应的窗口的位置
WindowWrap<T> old = array.get(idx);
if (old == null) {
// 如果为空,则表示还未进行初始化,创建一个窗口对象放入执行位置
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket());
// CAS更新
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
// 更新失败,代表有别的线程在处理,让出CPU时间重新执行循环流程
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
// 如果对应时间窗口的开始时间与计算得到的开始时间一样
// 那么代表当前即是我们要找的窗口对象,直接返回
return old;
} else if (windowStart > old.windowStart()) {
// 如果数组中的小窗口对象的开始时间比计算得到的小
// 代表当前时间窗口已经落后了,需要更新
if (updateLock.tryLock()) {
try {
// 更新当前时间窗口
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// 如果数组中的窗口对象的开始时间比计算得到的大
// 可能机器的时间不对,例如时钟回拨,属于异常情况,可以无视
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket());
}
}
}
注意:
- WindowWrap为窗口对象,包含了窗口长度、开始时间、保存数据的对象3个值,其中保存数据的对象不同场景有不同的实现,这里为普通的统计场景所以为
MetricBucket
,而统计热点限流的则是ParamMapBucket
等等
计算下标
protected int calculateTimeIdx(/*@Valid*/ long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
return (int)(timeId % array.length());
}
这个比较简单,举个栗子就能明白了,以rollingCounterInSecond
为例,该方法得到的值如下:
当前敲动键盘的时间=1551700999644
timeMillis / windowLengthInMs=1551700999644/500=3103401999
下标=(int)(timeId % array.length())=1
由于数组长度为2,那么最后取余后,得到的结果只能是0或者1,这里得到的是1
计算当前时间窗口的开始时间
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
这个也比较简单,画个图就明白了
image.png
timeMillis表示当前时间739,从图上来看他属于1号窗口,而怎么计算当前窗口的开始时间呢,那么就是
739-239=739-739%500=500
图示窗口变化流程
获取当前窗口的代码上面已经详细的注释了,但是这个东西不画图是比较难理解的,而作为一名"灵魂画手"不画图怎么行
窗口初始状态如下:
虚线表示当前两个窗口,即数组是空的,这时候执行第一个分支流程即
if (old == null)
之后,得到的窗口如下(实现表示窗口已经有值),假设当前时间求得的下标为0,那么0这个窗口会初始化image.png
这时候,窗口0已经有值,窗口开始时间为0,假设又有请求过来,当前时间为750,计算得到的下标为1,由于1这个位置未初始化,那么进行初始化后,时间窗口如下:
image.png
窗口开始时间为500
再过了一段时间,当前时间时间变成了900,求得的下标仍然为1,执行计算窗口开始时间=900-900%500=500,再取出old,old的开始时间为500,那么此时执行else if (windowStart == old.windowStart())
流程,两者时间相等,old就是要找的窗口
.
光阴似箭,日月如梭,不知不觉中当前时间到了1200,理论上当前时间窗口应该如下:
时间为1200应该走到第3个窗口中(下标为2),但是由于一秒的大窗口只有两个元素,那么只能求余得到0这个位置,但是0这个位置已经有元素了,即old,那么怎么处理呢?
这种情况就会走到
else if (windowStart > old.windowStart())
这个逻辑,由于时间1200计算得到的开始时间为1000>大于old的开始时间0,所以这个old窗口是要被废弃的,然后调用resetWindowTo更新old窗口的开始时间以及重置窗口的统计数据,更新过后,窗口实际如下image.png
可以看到,0号下标这个窗口的开始时间已经变成1000了,而1号窗口还是老样子不变,因为时间还未到来,假设当前时间到达1500~2000之间,假设就1700吧,那么计算得到的下标为1,执行与上述一样的流程,更新old的时间
上面就是整个窗口的变化流程,我在学习当中,遇到个问题,假设在1000~1500这段时间内没有流量进来,到了1700的时候才有流量,那么窗口状态就与上一个图不一样,此时窗口如下:
image.png
时间1700到来的时候,计算得到的开始时间为1500,且下标为1,那么就会将下标1的窗口更新为如图所示,而由于1000~1500这段时间没有流量,导致0号下标窗口没有被更新,开始时间以及0号时间窗口的时间还是旧的,那么我们获取最近一秒的统计的时候就会有问题,因为0号窗口的数据是上一秒的,那么Sentinel是如何解决这个问题的呢?
这个需要看下获取统计的地方处理
获取当前窗口的统计数据
以ArrayMetric#pass
方法为例看看获取统计的地方
public long pass() {
data.currentWindow();
long pass = 0;
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
pass += window.pass();
}
return pass;
}
通过MetricsLeapArray
获取所有窗口对象里保存数据的对象,然后进行累加,按上面的说法,values
方法可能会把0号窗口即0~500这个上一秒的窗口数据给拿出来,导致计算错误,那么values
是不是存在这个问题呢?我们进去看看
public List<T> values() {
int size = array.length();
List<T> result = new ArrayList<T>(size);
for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
if (windowWrap == null || isWindowDeprecated(windowWrap)) {
continue;
}
result.add(windowWrap.value());
}
return result;
}
首先遍历数组拿出所有窗口对象,这样一看似乎会出现上述问题,但是看到这个isWindowDeprecated方法似乎做了什么
protected boolean isWindowDeprecated(/*@NonNull*/ WindowWrap<T> windowWrap) {
return TimeUtil.currentTimeMillis() - windowWrap.windowStart() >= intervalInMs;
}
看到这里,恍然大悟了,即使0~500的窗口还存在于数组当中,但是该窗口与当前时间相差的距离已经大于一秒了(前面的分析是以一秒的时间大窗口来举例的),所以,即使该窗口存在于数组之中,在获取统计的时候还是会被过滤掉,这样统计就正常了
WindowWrap
窗口中放的就是WindowWrap对象,其中有个value字段,该字段类型有不同实现,普通的统计则这里的对象类型为MetricBucket
类型,看看其内部实现
public class MetricBucket {
private final LongAdder[] counters;
private volatile long minRt;
public MetricBucket() {
MetricEvent[] events = MetricEvent.values();
this.counters = new LongAdder[events.length];
for (MetricEvent event : events) {
counters[event.ordinal()] = new LongAdder();
}
initMinRt();
}
private void initMinRt() {
this.minRt = Constants.TIME_DROP_VALVE;
}
public MetricBucket reset() {
for (MetricEvent event : MetricEvent.values()) {
counters[event.ordinal()].reset();
}
initMinRt();
return this;
}
public long get(MetricEvent event) {
return counters[event.ordinal()].sum();
}
public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}
public long pass() {
return get(MetricEvent.PASS);
}
public long block() {
return get(MetricEvent.BLOCK);
}
public long exception() {
return get(MetricEvent.EXCEPTION);
}
public long rt() {
return get(MetricEvent.RT);
}
public long minRt() {
return minRt;
}
public long success() {
return get(MetricEvent.SUCCESS);
}
public void addPass(int n) {
add(MetricEvent.PASS, n);
}
public void addException(int n) {
add(MetricEvent.EXCEPTION, n);
}
public void addBlock(int n) {
add(MetricEvent.BLOCK, n);
}
public void addSuccess(int n) {
add(MetricEvent.SUCCESS, n);
}
public void addRT(long rt) {
add(MetricEvent.RT, rt);
if (rt < minRt) {
minRt = rt;
}
}
}
一个MetricBucket
中统计了多个维度的信息,底层数据结构为一个LongAdder
数组,每个位置存放的类型由枚举类MetricEvent
中每个枚举的ordinal()
方法所决定,外部对某个维度进行统计,到底层会转换成对数组中某个元素的递增操作。
注意点:
- 这里在旧版本中用的不是这种形式,而是每种统计类型使用了一个变量,新版本中换成了数组,代码就简便了很多
- 这里使用了LongAdder而不是使用AtomicLong,这是一种用空间换时间的做法,因为AtomicLong在并发大的时候性能比较差
总结:
- Sentinel使用了滑动窗口来保存统计数据
- 默认有两个窗口,分别是一秒钟的窗口和一分钟的窗口
- 一秒钟的滑动窗口中有两个小窗口,每个时间跨度为500毫秒
- 一分钟的滑动窗口中有60个小窗口,每个时间跨度为1000毫秒
- 滑动窗口底层是一个数组,数组中每个元素即为一个小窗口
- 通过当前时间获取当前窗口的时候先计算当前时间再数组中的位置下标和窗口的开始时间,然后通过位置下标从数组中获取窗口对象进行判断
- 如果获取的对象为空,则新建一个窗口对象放进去
- 如果获取的对象不为空,则比较开始时间,如果开始时间相等,则表示该窗口就是当前窗口,可以直接返回
- 如果获取的对象不为空且窗口开始时间小于当前时间,表示当前窗口已经过期,更新窗口时间和重置窗口里的统计数据a
- 如果获取的对象不为空且窗口开始时间大于当前时间,这种属于异常情况,窗口一个对象返回,但是不对数组进行修改
- 窗口的数组中可能存在上一个周期的数据,这种情况在获取数据的时候进行过滤
整个滑动窗口的大概原理就是上面的内容了,限于表达能力以及画图能力,可能不能很好的表达出整个流程,如上描述有问题或者哪里分析得不够明白的欢迎评论或者私信