浅析HystrixRollingNumber(用于qps计数的数

2016-11-29  本文已影响2284人  LNAmp

前言

考虑到一种需求场景,我们需要统计系统qps、每秒平均错误率等。qps表示每秒的请求数目,能想到的最简单的方法就是统计一定时间内的请求总数然后除以总统计时间,所以计数是其中最核心的部分。通常我们的额系统是工作在多线程的环境下,所以计数我们可以考虑使用AtomicInteger/AtomicLong系列,AtomXXX中没有使用锁,使用的是循环+CAS,在多线程的条件下可以在一定程度上减少锁带来的性能损失。但是在竞争特别激烈的情况,会大量出现cas不成功的情况带来性能上的开销。为了更进一步分散线程写的压力,JDK8中引入了LongAdder,前面的博客中介绍了LongAdder,LongAdder会分成多个桶,将每个线程绑定到固定的桶空间中进行读写,计数可以对所有的桶中的值求总数。前面提到求qps最简单的方法就是统计一定时间内的请求总数然后除以总统计时间,这样的方法虽然简单但是对有一定的问题,比如说统计出的qps跳跃性会比较大,不够平滑等。在本文中将介绍HystrixRollingNumber,这个数据结构在统计qps等类似的求和统计的场景下非常有用。

基本原理

如前所说,HystrixRollingNumber中利用了LongAdder,也借鉴了LongAdder分段的思想。HystrixRollingNumber基本思想就是分段统计,比如说要统计qps,即1秒内的请求总数。如下图所示,我们可以将1s的时间分成10段(图中话的是8段,当成10段好了),每段100ms。在第一个100ms内,写入第一个段中进行计数,在第二个100ms内,写入第二个段中进行计数,这样如果要统计当前时间的qps,我们总是可以通过统计当前时间前1s(共10段)的计数总和值。让我们来看看HystrixRollingNumber中具体是怎么做的。

Bucket

HystrixRollingNumber中对Bucket的描述是“Counters for a given 'bucket' of time”,即“给定时间桶内的计数器”,也即是我们上面所说的“段”。Bucket中有三个重要的属性值

ListState

HystrixRollingNumber中对ListState的描述是“Immutable object that is atomically set every time the state of the BucketCircularArray changes,This handles the compound operations”,即“ListState是个不可变类,每次BucketCircularArray状态改变的时候,会新建一个并且会原子地设置到BucketCircularArray中,它用来处理复合操作”。ListState中比较重要的的属性值介绍如下:

ListState中有几个比较重要的方法

ListState是个不可变类,遵循者不可变类的原则

ListState算是个助手类,维持了一个Bucket数组,定义了一些围绕着Bucket数组的有用操作,并且自身是个不可变类,天然的线程安全属性。

BucketCircularArray

从名字上来说是一个环形数组,数组中的每个元素是一个Bucket。BucketCircularArray中比较重要的属性值介绍如下:

其中主要的比较重要的一个方法是:

HystrixRollingNumber

官方doc中给其的定义是“A number which can be used to track counters (increment) or set values over time.”,用来统计一段时间内的计数。其中比较重要的的属性值如下:

Bucket getCurrentBucket() {
                // 获取当前的毫秒时间
        long currentTime = time.getCurrentTimeInMillis();

        //获取最后一个Bucket(即最新一个Bucket)
        Bucket currentBucket = buckets.peekLast();
        if (currentBucket != null && currentTime < currentBucket.windowStart + this.bucketSizeInMillseconds) {
            //如果当前时间是在currentBucket对应的时间窗口内,直接返回currentBucket
            return currentBucket;
        }

        /* if we didn't find the current bucket above, then we have to create one */

            //如果当前时间对应的Bucket不存在,我们需要创建一个
        if (newBucketLock.tryLock()) {
                //尝试获取一次锁
            try {
                if (buckets.peekLast() == null) {
                    // the list is empty so create the first bucket
                    //首次创建
                    Bucket newBucket = new Bucket(currentTime);
                    buckets.addLast(newBucket);
                    return newBucket;
                } else {
                    // We go into a loop so that it will create as many buckets as needed to catch up to the current time
                    // as we want the buckets complete even if we don't have transactions during a period of time.
                    // 将创建一个或者多个Bucket,直到Bucket代表的时间窗口赶上当前时间
                    for (int i = 0; i < numberOfBuckets; i++) {
                        // we have at least 1 bucket so retrieve it
                        Bucket lastBucket = buckets.peekLast();
                        if (currentTime < lastBucket.windowStart + this.bucketSizeInMillseconds) {
                            // if we're within the bucket 'window of time' return the current one
                            // NOTE: We do not worry if we are BEFORE the window in a weird case of where thread scheduling causes that to occur,
                            // we'll just use the latest as long as we're not AFTER the window
                            return lastBucket;
                        } else if (currentTime - (lastBucket.windowStart + this.bucketSizeInMillseconds) > timeInMilliseconds) {
                            // the time passed is greater than the entire rolling counter so we want to clear it all and start from scratch
                            reset();
                            // recursively call getCurrentBucket which will create a new bucket and return it
                            return getCurrentBucket();
                        } else { // we're past the window so we need to create a new bucket
                            // create a new bucket and add it as the new 'last'
                            buckets.addLast(new Bucket(lastBucket.windowStart + this.bucketSizeInMillseconds));
                            // add the lastBucket values to the cumulativeSum
                            cumulativeSum.addBucket(lastBucket);
                        }
                    }
                    // we have finished the for-loop and created all of the buckets, so return the lastBucket now
                    return buckets.peekLast();
                }
            } finally {
                //释放锁
                newBucketLock.unlock();
            }
        } else {
            //如果获取不到锁,尝试获取最新一个Bucket
            currentBucket = buckets.peekLast();
            if (currentBucket != null) {
                 //如果不为null,直接返回最新Bucket
                // we didn't get the lock so just return the latest bucket while another thread creates the next one
                return currentBucket;
            } else {
                //多个线程同时创建第一个Bucket,尝试等待,递归调用getCurrentBucket
                // the rare scenario where multiple threads raced to create the very first bucket
                // wait slightly and then use recursion while the other thread finishes creating a bucket
                try {
                    Thread.sleep(5);
                } catch (Exception e) {
                    // ignore
                }
                return getCurrentBucket();
            }
        }
    }

其实HystrixRollingNumber中写了很多有用的注释,解释了为什么要这么做。上述getCurrentBucket主要是为了获取当前时间窗所对应的Bucket,但是为了减少竞争,其中只使用了tryLock(),如果不成功则直接返回最新的一个不为空的Bucket。如果获取了锁则尝试增加Bucket(增加Bucket会一直增加到Bucket对应的时间窗口覆盖当前时间)。这样处理会有个小问题,就是获取的Bucket可能没有覆盖当前时间,这是为了减少竞争,提高效率。而且在统计的场景下可以容忍,将计数统计到之前的时间窗口内在计算qps等数值时通常不会有太大影响(numberOfBuckets通常不止一个)。

总结

HystrixRollingNumber这个数据结构用于统计qps很有用,通常这种统计需求(限流监控统计qps的场景下)不能影响主要业务,对性能要求比较高,HystrixRollingNumber中采取了很多技巧避免使用锁,避免多个线程竞争,所以HystrixRollingNumber效率会非常高。

上一篇下一篇

猜你喜欢

热点阅读