kratos aegis 限流器的使用和实现过程解析

2021-09-27  本文已影响0人  哆啦在这A梦在哪

一。基础知识点说明

Kraots 的限流算法

kratos 借鉴了 Sentinel 项目的自适应限流系统,通过综合分析服务的 cpu 使用率、请求成功的 load(负载), qps 和请求成功的 rt(请求成功的响应耗时) 来做自适应限流保护。从官方文档上看,限流算法要实现的核心目标有如下两点:

  1. ** 自动 ** 嗅探负载和 qps,减少人工配置 && 干预
  2. 削顶, 保证超载时系统不被拖垮,并能以高水位 qps 继续运行

自适应限流怎么做

前面我们遇到的主要问题就是每个服务实例的限流阈值实际应该是动态变化的,我们应该根据系统能够承载的最大吞吐量,来进行限流,当当前的流量大于最大吞吐的时候就限制流量进入,反之则允许通过。那现在的问题就是

计算吞吐量:利特尔法则 L = λ * W

利特尔法则由麻省理工大学斯隆商学院(MIT Sloan School of Management)的教授 John Little﹐于 1961 年所提出与证明。它是一个有关提前期与在制品关系的简单数学公式,这一法则为精益生产的改善方向指明了道路。 —- MBA 智库百科 (mbalib.com)

image.png

如上图所示,如果我们开一个小店,平均每分钟进店 2 个客人(λ),每位客人从等待到完成交易需要 4 分钟(W),那我们店里能承载的客人数量就是 2 * 4 = 8 个人

同理,我们可以将 λ 当做 QPS, W 呢是每个请求需要花费的时间,那我们的系统的吞吐就是 L = λ * W ,所以我们可以使用利特尔法则来计算系统的吞吐量。

什么时候系统的吞吐量就是最大的吞吐量?

首先我们可以通过统计过去一段时间的数据,获取到平均每秒的请求量,也就是 QPS,以及请求的耗时时间,为了避免出现前面 900ms 一个请求都没有最后 100ms 请求特别多的情况,我们可以使用滑动窗口算法来进行统计。

最容易想到的就是我们从系统启动开始,就把这些值给保存下来,然后计算一个吞吐的最大值,用这个来表示我们的最大吞吐量就可以了。但是这样存在一个问题是,我们很多系统其实都不是独占一台机器的,一个物理机上面往往有很多服务,并且一般还存在一些超卖,所以可能第一个小时最大处理能力是 100,但是这台节点上其他服务实例同时都在抢占资源的时候,这个处理能力最多就只能到 80 了

所以我们需要一个数据来做启发阈值,只要这个指标达到了阈值那我们就进入流控当中。常见的选择一般是 CPU、Memory、System Load,这里我们以 CPU 为例

只要我们的 CPU 负载超过 80% 的时候,获取过去 5s 的最大吞吐数据,然后再统计当前系统中的请求数量,只要当前系统中的请求数大于最大吞吐那么我们就丢弃这个请求。

kratos 自适应限流分析

二。实际代码及使用解释

限流公式

源码分析

type BBR struct {
    cpu             cpuGetter   // 请求数,和响应时间的采样数据,使用滑动窗口进行统计    
    passStat        window.RollingCounter
    rtStat          window.RollingCounter
    inFlight        int64
    bucketPerSecond int64
    bucketDuration  time.Duration

    // prevDropTime defines previous start drop since initTime
    prevDropTime atomic.Value
    maxPASSCache atomic.Value
    minRtCache   atomic.Value

    opts options
}

Allow: 判断请求是否允许通过

// Allow checks all inbound traffic.
// Once overload is detected, it raises limit.ErrLimitExceed error.
func (l *BBR) Allow() (ratelimit.DoneFunc, error) {
    if l.shouldDrop() {
        return nil, ratelimit.ErrLimitExceed
    }
    atomic.AddInt64(&l.inFlight, 1)
    start := time.Now().UnixNano()
    return func(ratelimit.DoneInfo) {
        rt := (time.Now().UnixNano() - start) / int64(time.Millisecond)
        l.rtStat.Add(rt)
        atomic.AddInt64(&l.inFlight, -1)
        l.passStat.Add(1)
    }, nil
}

这个方法主要是给中间件使用的

判断请求是否应该被丢弃

func (l *BBR) shouldDrop() bool {
    now := time.Duration(time.Now().UnixNano())
    if l.cpu() < l.opts.CPUThreshold {    //判断是否达到 cpu 的最高压力,cpu()获取当前cpu利用率,获取方式在下面讲解
        // current cpu payload below the threshold
        prevDropTime, _ := l.prevDropTime.Load().(time.Duration)//prevDropTime保存了上一次cpu达到峰值的时间,如果是0,说明还没有到峰值的记录
        if prevDropTime == 0 {
            // haven't start drop,
            // accept current request
            return false
        }
        if time.Duration(now-prevDropTime) <= time.Second {//一秒内
            // just start drop one second ago,
            // check current inflight count
            inFlight := atomic.LoadInt64(&l.inFlight)
            return inFlight > 1 && inFlight > l.maxInFlight()//判断实际的正在处理逻辑数量是否达到最高值
        }
        l.prevDropTime.Store(time.Duration(0))//记录此次达到峰值的时间
        return false
    }
    // current cpu payload exceeds the threshold
    inFlight := atomic.LoadInt64(&l.inFlight)
    drop := inFlight > 1 && inFlight > l.maxInFlight()
    if drop {//如果cpu未达到峰值,但是协程达到了高峰,判断是否有达到过cpu的峰值
        prevDrop, _ := l.prevDropTime.Load().(time.Duration)
        if prevDrop != 0 {//如果达到过cpu最高峰值,拒绝请求
            // already started drop, return directly
            return drop
        }
        // store start drop time
        l.prevDropTime.Store(now)//没有达到过就记录此次时间,当做一次峰值
    }
    return drop
}

cpu模块,aegis/pkg/cpu

其中,cpu 利用率部分使用了linuxcgroup的工具使用以及gopsutil包,主要考虑到了宿主机和 docker 容器的不同。

第一部分,cpu内容的来源

每500ms获取一次,使用了指数加权平均算法的公式 ,地址代码中有。将每次获取的cpu信息保存在全局变量中。

func init() {
    go cpuproc()
}

// cpu = cpuᵗ⁻¹ * decay + cpuᵗ * (1 - decay)
func cpuproc() {
    ticker := time.NewTicker(time.Millisecond * 500) // same to cpu sample rate
    defer func() {
        ticker.Stop()
        if err := recover(); err != nil {
            go cpuproc()
        }
    }()

    // EMA algorithm: https://blog.csdn.net/m0_38106113/article/details/81542863
    for range ticker.C {
        stat := &cpu.Stat{}
        cpu.ReadStat(stat)
        prevCPU := atomic.LoadInt64(&gCPU)
        curCPU := int64(float64(prevCPU)*decay + float64(stat.Usage)*(1.0-decay))
        atomic.StoreInt64(&gCPU, curCPU)
    }
}

上述只是获取,真正的cpu利用率的计算,来源于 pkg/cpu 包中的逻辑。以下代码

const (
    interval time.Duration = time.Millisecond * 500 //每隔500ms收集一次
)
var (
    stats CPU
    usage uint64
)

// CPU is cpu stat usage.
type CPU interface {
    Usage() (u uint64, e error)
    Info() Info
}

func init() {
    var (
        err error
    )
    stats, err = newCgroupCPU()//这里内部使用了linux的cgroup的方法,考虑到docker 容器的资源限制
    if err != nil {
        // fmt.Printf("cgroup cpu init failed(%v),switch to psutil cpu\n", err)
        stats, err = newPsutilCPU(interval)//这里直接使用 `gopsutil` 包获取了cpu的利用率
        if err != nil {
            panic(fmt.Sprintf("cgroup cpu init failed!err:=%v", err))
        }
    }
    go func() {
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        for {
            <-ticker.C
            u, err := stats.Usage()
            if err == nil && u != 0 {
                atomic.StoreUint64(&usage, u)
            }
        }
    }()
}

注意:cpu 这部分,需要有一定的linux cgroup 的基础知识才能看懂

上述代码中 newCgroupCPU 和 newPsutilCPU 就是对应了两种不同的情况的处理。其中,大量使用了原子包,将获取 cpu 利用率保存在一个全局变变量中(代码跟下去就能看到,太多这里不作截取了)。

回到 shouldDrop 这个方法其实就是开头讲到的限流公式了,逻辑如下图所示

Go可用性 限流-分布式限流-kratos 限流策略.png

系统的最大负载

func (l *BBR) maxInFlight() int64 {
    return int64(math.Floor(float64(l.maxPASS()*l.minRT()*l.bucketPerSecond)/1000.0) + 0.5)
}

这个就是计算过去一段时间系统的最大负载是多少

上一篇 下一篇

猜你喜欢

热点阅读