服务监控和治理

Sentinel之OccupiableBucketLeapArr

2020-04-23  本文已影响0人  九点半的马拉

在前面有一篇文章中讲解了有关实时数据统计的文章,在分钟级别时是利用BucketLeapArray来实现的,但是在秒级会有一定的问题。

设想一个场景,我们的一个资源,访问的QPS稳定是10.假设请求是均匀分布的,在相对的时间0.0-1.0秒区间中,通过了10个请求,我们在1.1秒的时候,观察到的Qps可能只有5,因为此时第一个时间窗口被重置了,只要第二个时间窗口有值。当在秒级统计的情形下,用BucketLeapArray会有0~50%的数据误差。这是就要用OccupiableBucketLeapArray来解决。

还是以StatisticSlotentry()方法中的增加请求数node.addPassRequest(count);为例介绍。

@Override
public void addPassRequest(int count) {
        super.addPassRequest(count);
        this.clusterNode.addPassRequest(count);
}
public class StatisticNode implements Node {
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
        IntervalProperty.INTERVAL);
    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

@Override
public void addPassRequest(int count) {
       rollingCounterInSecond.addPass(count);
       rollingCounterInMinute.addPass(count);
}

在讲解具体的统计之前,先介绍下rollingCounterInSecond变量是怎么创建的。

public class ArrayMetric implements Metric {

    private final LeapArray<MetricBucket> data;
    
    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);     @(1)
    }

    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
        if (enableOccupy) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        } else {
            this.data = new BucketLeapArray(sampleCount, intervalInMs);
        }
    }

从上面我们可以看出z在秒级统计rollingCounterInSecond中在初始化实例的方法是有两个参数,所以它会执行第一个构造参数@(1)。从下面可以看到,原先的只有array变量数组,它有多了borrowArray 变量。

public class OccupiableBucketLeapArray extends LeapArray<MetricBucket> {

    private final FutureBucketLeapArray borrowArray;
    public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
        // This class is the original "CombinedBucketArray".
        super(sampleCount, intervalInMs);
        this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);
    }

super(sampleCount, intervalInMs)会调用下面的构造方法。

public LeapArray(int sampleCount, int intervalInMs) {
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.sampleCount = sampleCount;

        this.array = new AtomicReferenceArray<>(sampleCount);
    }

实时数据统计调用

到这里,还是和之前的一样,没有发生变化。这个data数组和之前的是一样的。

@Override
public void addPass(int count) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addPass(count);
}
public WindowWrap<T> currentWindow() {
        return currentWindow(TimeUtil.currentTimeMillis());
}
TIM截图1 (2).png

从上图中可以看出,大体的逻辑是一样的,只是有几个方法被重写了。 在这里做一下对比。

newEmptyBucket方法

BucketLeapArray类

@Override
public MetricBucket newEmptyBucket(long time) {
        return new MetricBucket();
}

public MetricBucket() {
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
        initMinRt();
}

OccupiableBucketLeapArray类

@Override    time变量是当前时间
public MetricBucket newEmptyBucket(long time) {
        //在BucketLeapArray中初始化了MetricBucket就结束了
        MetricBucket newBucket = new MetricBucket();
        //获取在borrowArray时间窗口数组中的时间窗口
        MetricBucket borrowBucket = borrowArray.getWindowValue(time);
        //如果不为空,将borrowArray中对用时间窗口的信息添加到data数组中时间窗口中。
        if (borrowBucket != null) {
            newBucket.reset(borrowBucket);
        }
        return newBucket;
}
 public T getWindowValue(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        int idx = calculateTimeIdx(timeMillis);
        WindowWrap<T> bucket = array.get(idx);
        if (bucket == null || !bucket.isTimeInWindow(timeMillis)) {
            return null;
        }
        return bucket.value();
}

public MetricBucket reset(MetricBucket bucket) {
        for (MetricEvent event : MetricEvent.values()) {
            counters[event.ordinal()].reset();
            counters[event.ordinal()].add(bucket.get(event));
        }
        initMinRt();
        return this;
}

resetWindowTo方法

BucketLeapArray类

protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
        // 重设该窗口的开始时间
        w.resetTo(startTime);
        //实际就是将统计的数据重置为0
        w.value().reset();
        return w;
}

public MetricBucket reset() {
        for (MetricEvent event : MetricEvent.values()) {
            counters[event.ordinal()].reset();
        }
        initMinRt();
        return this;
}

public void reset() {
        internalReset(0L);
}

OccupiableBucketLeapArray类

@Override
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) {
        // 这个和上面类的一样,将该窗口的开始时间重设
        w.resetTo(time);
        //获取在borrowArray时间窗口数组当前时间的时间窗口
        MetricBucket borrowBucket = borrowArray.getWindowValue(time);
        if (borrowBucket != null) {
           //和上面一样,实际就是将统计的数据重置为0
            w.value().reset();
            //然后将在borrowArray中当前时间对用的时间窗口中统计的通请求通过数添加到在array数组中的当前时间的时间窗口中
            w.value().addPass((int)borrowBucket.pass());
        } else {
            w.value().reset();
        }
        return w;
}

borrowArray添加数据

前面只讲了如何从borrowArray中提取数据放到array数组里,还没有说borrowArray中的数据是如何产生的?

添加数据时会调用该方法:

@Override
public void addWaiting(long time, int acquireCount) {
        WindowWrap<MetricBucket> window = borrowArray.currentWindow(time);
        window.value().add(MetricEvent.PASS, acquireCount);
}

public MetricBucket add(MetricEvent event, long n) {
        counters[event.ordinal()].add(n);
        return this;
}

因为borrowArrayFutureBucketLeapArray类型, 这两个方法重写和BucketLeapArray一致。

@Override
public MetricBucket newEmptyBucket(long time) {
        return new MetricBucket();
}

@Override
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
        // Update the start time and reset value.
        w.resetTo(startTime);
        w.value().reset();
        return w;
}

只是判断该窗口是否过期发生了变化。

BucketLeapArray类

public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
        return time - windowWrap.windowStart() > intervalInMs;
}

FutureBucketLeapArray类

@Override
public boolean isWindowDeprecated(long time, WindowWrap<MetricBucket> windowWrap) {
        // Tricky: will only calculate for future.
        return time >= windowWrap.windowStart();
}

addWaiting()在什么时候被调用呢?
只有在限流操作中执行DefaultController中才会被调用。

public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        int curCount = avgUsedTokens(node);
        if (curCount + acquireCount > count) {
            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
                long currentTime;
                long waitInMs;
                currentTime = TimeUtil.currentTimeMillis();
                waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
                if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                    //添加到borrowArray中未来一个时间窗口上
                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                   //将抢占的未来的令牌也添加到原来data中的当前时间窗口里
                    node.addOccupiedPass(acquireCount);
                    sleep(waitInMs);
                    throw new PriorityWaitException(waitInMs);
                }
            }
            return false;
        }
        return true;
}

整体就分析到这里了,实际后面的一些地方还是有些不明白,欢迎大家交流。

上一篇 下一篇

猜你喜欢

热点阅读