Sentinel之OccupiableBucketLeapArr
在前面有一篇文章中讲解了有关实时数据统计的文章,在分钟级别时是利用BucketLeapArray
来实现的,但是在秒级会有一定的问题。
设想一个场景,我们的一个资源,访问的QPS稳定是10.假设请求是均匀分布的,在相对的时间0.0-1.0秒区间中,通过了10个请求,我们在1.1秒的时候,观察到的Qps可能只有5,因为此时第一个时间窗口被重置了,只要第二个时间窗口有值。当在秒级统计的情形下,用BucketLeapArray
会有0~50%的数据误差。这是就要用OccupiableBucketLeapArray
来解决。
还是以StatisticSlot
的entry()
方法中的增加请求数node.addPassRequest(count);
为例介绍。
@Override
public void addPassRequest(int count) {
super.addPassRequest(count);
this.clusterNode.addPassRequest(count);
}
public class StatisticNode implements Node {
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
@Override
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
在讲解具体的统计之前,先介绍下rollingCounterInSecond
变量是怎么创建的。
public class ArrayMetric implements Metric {
private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs); @(1)
}
public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
if (enableOccupy) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
} else {
this.data = new BucketLeapArray(sampleCount, intervalInMs);
}
}
从上面我们可以看出z在秒级统计rollingCounterInSecond
中在初始化实例的方法是有两个参数,所以它会执行第一个构造参数@(1)。从下面可以看到,原先的只有array
变量数组,它有多了borrowArray
变量。
public class OccupiableBucketLeapArray extends LeapArray<MetricBucket> {
private final FutureBucketLeapArray borrowArray;
public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
// This class is the original "CombinedBucketArray".
super(sampleCount, intervalInMs);
this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);
}
super(sampleCount, intervalInMs)
会调用下面的构造方法。
public LeapArray(int sampleCount, int intervalInMs) {
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
实时数据统计调用
到这里,还是和之前的一样,没有发生变化。这个data数组和之前的是一样的。
@Override
public void addPass(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
}
public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());
}
TIM截图1 (2).png
从上图中可以看出,大体的逻辑是一样的,只是有几个方法被重写了。 在这里做一下对比。
newEmptyBucket方法
BucketLeapArray类
@Override
public MetricBucket newEmptyBucket(long time) {
return new MetricBucket();
}
public MetricBucket() {
MetricEvent[] events = MetricEvent.values();
this.counters = new LongAdder[events.length];
for (MetricEvent event : events) {
counters[event.ordinal()] = new LongAdder();
}
initMinRt();
}
OccupiableBucketLeapArray类
@Override time变量是当前时间
public MetricBucket newEmptyBucket(long time) {
//在BucketLeapArray中初始化了MetricBucket就结束了
MetricBucket newBucket = new MetricBucket();
//获取在borrowArray时间窗口数组中的时间窗口
MetricBucket borrowBucket = borrowArray.getWindowValue(time);
//如果不为空,将borrowArray中对用时间窗口的信息添加到data数组中时间窗口中。
if (borrowBucket != null) {
newBucket.reset(borrowBucket);
}
return newBucket;
}
public T getWindowValue(long timeMillis) {
if (timeMillis < 0) {
return null;
}
int idx = calculateTimeIdx(timeMillis);
WindowWrap<T> bucket = array.get(idx);
if (bucket == null || !bucket.isTimeInWindow(timeMillis)) {
return null;
}
return bucket.value();
}
public MetricBucket reset(MetricBucket bucket) {
for (MetricEvent event : MetricEvent.values()) {
counters[event.ordinal()].reset();
counters[event.ordinal()].add(bucket.get(event));
}
initMinRt();
return this;
}
resetWindowTo方法
BucketLeapArray类
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
// 重设该窗口的开始时间
w.resetTo(startTime);
//实际就是将统计的数据重置为0
w.value().reset();
return w;
}
public MetricBucket reset() {
for (MetricEvent event : MetricEvent.values()) {
counters[event.ordinal()].reset();
}
initMinRt();
return this;
}
public void reset() {
internalReset(0L);
}
OccupiableBucketLeapArray类
@Override
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) {
// 这个和上面类的一样,将该窗口的开始时间重设
w.resetTo(time);
//获取在borrowArray时间窗口数组当前时间的时间窗口
MetricBucket borrowBucket = borrowArray.getWindowValue(time);
if (borrowBucket != null) {
//和上面一样,实际就是将统计的数据重置为0
w.value().reset();
//然后将在borrowArray中当前时间对用的时间窗口中统计的通请求通过数添加到在array数组中的当前时间的时间窗口中
w.value().addPass((int)borrowBucket.pass());
} else {
w.value().reset();
}
return w;
}
borrowArray添加数据
前面只讲了如何从borrowArray
中提取数据放到array
数组里,还没有说borrowArray
中的数据是如何产生的?
添加数据时会调用该方法:
@Override
public void addWaiting(long time, int acquireCount) {
WindowWrap<MetricBucket> window = borrowArray.currentWindow(time);
window.value().add(MetricEvent.PASS, acquireCount);
}
public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}
因为borrowArray
是FutureBucketLeapArray
类型, 这两个方法重写和BucketLeapArray
一致。
@Override
public MetricBucket newEmptyBucket(long time) {
return new MetricBucket();
}
@Override
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
// Update the start time and reset value.
w.resetTo(startTime);
w.value().reset();
return w;
}
只是判断该窗口是否过期发生了变化。
BucketLeapArray类
public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
return time - windowWrap.windowStart() > intervalInMs;
}
FutureBucketLeapArray类
@Override
public boolean isWindowDeprecated(long time, WindowWrap<MetricBucket> windowWrap) {
// Tricky: will only calculate for future.
return time >= windowWrap.windowStart();
}
那addWaiting()
在什么时候被调用呢?
只有在限流操作中执行DefaultController
中才会被调用。
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
int curCount = avgUsedTokens(node);
if (curCount + acquireCount > count) {
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
//添加到borrowArray中未来一个时间窗口上
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
//将抢占的未来的令牌也添加到原来data中的当前时间窗口里
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
整体就分析到这里了,实际后面的一些地方还是有些不明白,欢迎大家交流。