

2021-06-14  本文已影响0人  晴天哥_王志




// Excerpt of StatisticNode showing its metric fields; several declarations are cut short in this listing.
public class StatisticNode implements Node {
    // Per-second metric statistics.
    // NOTE(review): the ArrayMetric argument list on the next line is truncated in this excerpt.
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
    // Per-minute metric statistics: 60 samples over a 60 * 1000 ms interval, with enableOccupy = false.
    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
    private LongAdder curThreadNum = new LongAdder();
    private long lastFetchTime = -1;

    // NOTE(review): the body of addPassRequest is not included in this excerpt.
    public void addPassRequest(int count) {



public class ArrayMetric implements Metric {

    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) { = new OccupiableBucketLeapArray(sampleCount, intervalInMs);

    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
        if (enableOccupy) {
   = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        } else {
   = new BucketLeapArray(sampleCount, intervalInMs);


public class BucketLeapArray extends LeapArray<MetricBucket> {
    public BucketLeapArray(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);

public abstract class LeapArray<T> {
    protected int windowLengthInMs;
    protected int sampleCount;
    protected int intervalInMs;
    private double intervalInSecond;
    protected final AtomicReferenceArray<WindowWrap<T>> array;
    private final ReentrantLock updateLock = new ReentrantLock();

    public LeapArray(int sampleCount, int intervalInMs) {
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.intervalInSecond = intervalInMs / 1000.0;
        this.sampleCount = sampleCount;
        this.array = new AtomicReferenceArray<>(sampleCount);


/**
 * Wrapper pairing a statistic value with the time window it belongs to.
 *
 * @param <T> type of the wrapped value
 */
public class WindowWrap<T> {

    private final long windowLengthInMs; // length of the time window
    private long windowStart; // start time of the time window
    private T value; // the MetricBucket object that stores the individual metric data

    /**
     * @param windowLengthInMs length of the time window
     * @param windowStart      start time of the time window
     * @param value            value held by this window
     */
    public WindowWrap(long windowLengthInMs, long windowStart, T value) {
        this.windowLengthInMs = windowLengthInMs;
        this.windowStart = windowStart;
        this.value = value;
    }

    /** @return the length of the time window */
    public long windowLengthInMs() {
        return windowLengthInMs;
    }

    /** @return the start time of the time window */
    public long windowStart() {
        return windowStart;
    }

    /** @return the value held by this window */
    public T value() {
        return value;
    }
}


public class MetricBucket {

    private final LongAdder[] counters;
    private volatile long minRt;

    public MetricBucket() {
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();

/**
 * Kinds of events that are counted per bucket.
 * The declaration order matters: the ordinal of each constant is used as an array index.
 */
public enum MetricEvent {
    PASS, // passed normally
    BLOCK, // blocked
    EXCEPTION, // exception
    SUCCESS, // success
    RT, // RT statistics
    OCCUPIED_PASS // passed by occupying (pre-empting)
}


public abstract class LeapArray<T> {

    protected int windowLengthInMs; // 时间窗口的长度
    protected int sampleCount; // 时间窗口的个数
    protected int intervalInMs;
    private double intervalInSecond;
    protected final AtomicReferenceArray<WindowWrap<T>> array;

    public WindowWrap<T> currentWindow() {
        return currentWindow(TimeUtil.currentTimeMillis());

    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        // 根据当前时间和时间窗口的长度进行计算获取窗口下标
        int idx = calculateTimeIdx(timeMillis);
        // 获取指定下标的时间窗口的开始时间
        long windowStart = calculateWindowStart(timeMillis);

         * Get bucket item at given time from the array.
         * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
         * (2) Bucket is up-to-date, then just return the bucket.
         * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
        while (true) {
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                // 1.为空表示当前时间窗口为初始化过,创建WindowWrap并cas设置到array中
                 *     B0       B1      B2    NULL      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            bucket is empty, so create new and update
                 * If the old bucket is absent, then we create a new bucket at {@code windowStart},
                 * then try to update circular array via a CAS operation. Only one thread can
                 * succeed to update, while other threads yield its time slice.
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    // Successfully updated, return the created bucket.
                    return window;
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
            } else if (windowStart == old.windowStart()) {
                // 2.获取的时间窗口正好对应当前时间,直接返回
                 *     B0       B1      B2     B3      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            startTime of Bucket 3: 800, so it's up-to-date
                 * If current {@code windowStart} is equal to the start timestamp of old bucket,
                 * that means the time is within the bucket, so directly return the bucket.
                return old;
            } else if (windowStart > old.windowStart()) {
                // 3.获取的时间窗口为老的,进行窗口reset操作复用
                 *   (old)
                 *             B0       B1      B2    NULL      B4
                 * |_______||_______|_______|_______|_______|_______||___
                 * ...    1200     1400    1600    1800    2000    2200  timestamp
                 *                              ^
                 *                           time=1676
                 *          startTime of Bucket 2: 400, deprecated, should be reset
                 * If the start timestamp of old bucket is behind provided time, that means
                 * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
                 * Note that the reset and clean-up operations are hard to be atomic,
                 * so we need a update lock to guarantee the correctness of bucket update.
                 * The update lock is conditional (tiny scope) and will take effect only when
                 * bucket is deprecated, so in most cases it won't lead to performance loss.
                if (updateLock.tryLock()) {
                    try {
                        // Successfully get the update lock, now we reset the bucket.
                        return resetWindowTo(old, windowStart);
                    } finally {
                } else {
            } else if (windowStart < old.windowStart()) {
                // 4.时间回拨了,正常情况下不会走到这里
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));

    private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
        long timeId = timeMillis / windowLengthInMs;
        // Calculate current index so we can map the timestamp to the leap array.
        return (int)(timeId % array.length());

    protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs;

public class BucketLeapArray extends LeapArray<MetricBucket> {

    public BucketLeapArray(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);

    public MetricBucket newEmptyBucket(long time) {
        return new MetricBucket();

    protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
        // 重置窗口的开始时间和对应的统计值
        return w;


public class ArrayMetric implements Metric {

    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) { = new OccupiableBucketLeapArray(sampleCount, intervalInMs);

    public void addPass(int count) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();

public class MetricBucket {
    private final LongAdder[] counters;

    public void addPass(int n) {
        add(MetricEvent.PASS, n);

    public MetricBucket add(MetricEvent event, long n) {
        return this;
上一篇 下一篇

