高级专题程序员redis

Guava RateLimiter代码与限流逻辑梳理

2018-10-09  本文已影响0人  ro9er

前言

最近需要在网关层做一个限流的需求,由于需要对一个机房内的集群做统一的限流管理,所以可能需要用到redis,而且spring cloud本身的Hystrix也不行了(因为Hystrix只是针对单机)。因此需要考虑自行实现限流逻辑,所以针对目前比较主流的限流 令牌桶算法及其实现(Guava RateLimiter)做了一个调研。

令牌桶(Token Bucket)

令牌桶算法是网络流量整形(Traffic Shaping)和速率控制(Rate Limiting)中比较常用的一种算法,还有另一种漏桶算法,这里就不多展开了。一般来说,令牌桶算法可以控制网络中单位时间内的请求数目,并在一定程度上允许突发数据的产生。

令牌桶的原理是有一个桶来装令牌,系统会以恒定的(1/QPS)时间间隔往桶里面放入令牌,如果桶已经满了就不能再加了,当一个请求进来的时候,需要到桶里面获取自己请求所需的相应数目的令牌,如果获取成功,则进行请求操作;若不能,则拒绝服务,或者等待直到有足够的令牌为止。


令牌桶原理

RateLimiter概述

Google的Guava库中提供了一个基于令牌桶算法的限流工具类RateLimiter。在该类的子类SmoothRateLimiter中有一大段关于如何设计RateLimiter的描述,这里大致翻译一下:

如何设计一个限流器,并且为什么这么设计?

限流器最主要的功能是保证一个稳定的速率,这里稳定速率指的是通常情况下的最高速率。这个机制通常通过控制流入的请求来保证,比如对于一个请求,当达到最高速率时,我们需要计算它需要受限制等待的时间,并让它等待直到有权访问为止。

保证QPS为稳定速率的最简单的方式是保存上一个授权请求的时间戳,然后保证在接下来的1/QPS 秒内没有其他请求进入。举例来说,对于QPS=5的需求,如果我们能保证没有请求能够在上个请求之后的200ms内获得授权,那我们就实现了限流。如果有个请求在上个授权请求之后的100ms到来,那么我们需要做的就是让它再等待100ms。以此类推,对于并发的15个请求,总共会花掉3秒钟。

值得注意的一点是,这样的机制只保存了非常少的关于过去的记忆,因为它只需要记录最近一次请求。那么如果在一个请求获得授权之前的很长时间都没有请求时会发生什么呢?限流器会立马忘记关于过去的寂寞(低利用率),而只记录新的请求的时间戳,然后下个请求也只能在这个请求之后的1/QPS时间间隔之后才能获得授权,这显然与我们期望的QPS不太匹配,并最终会导致低利用率或者请求溢出。

过去的低利用率意味着大量的资源空闲,因此,限流器应该加速利用这些空闲资源。比较典型的场景就是在网络带宽上,低利用率意味着有多的缓存空间可以立即使用。

但是在另一方面,一段时间的低利用率也意味着“服务对于新来的请求没有准备好”。这有点隐晦,举个列子,服务的缓存可能陈旧,请求更有可能触发耗时操作。

为了处理这种两难的情况,我们需要添加另外一个衡量维度,通过storedPermits来描述过去的低利用率。当这个变量为0的时候,代表没有低利用率存在,随着低利用率持续增加时,storedPermits能够到达maxStoredPermits。因此,当获取permits令牌发生时,拿到的令牌通常会来自两个部分:

  • stored permits (过去留存的令牌,当有低利用率存在就能使用)
  • fresh permits(stored permits之后还有剩余的permits,我们认为他需要新鲜fresh的permit令牌来保证)

我们通过下面这个例子来说明:

我们有一个限流器每秒产生一个令牌,即保证QPS=1。如果有一秒限流器没有请求进来,那么我们对storedPermits加1。假定在过去的10秒内都没有请求进来,那storedPermits就会增加至10(假定maxStoredPermits>10)。这时一个请求到来并申请获取3个令牌,我们可以直接从storedPermits中提出来3个令牌支持,并且storedPermits减少到7。在此之后又来了一个请求申请获取10个令牌,这时候我们从storedPermits中直接提取完剩下来的7个令牌,余下的3个令牌我们需要等待限流器放入3个fresh pertmits才能完成这个请求的授权访问。

我们知道我们的QPS=1,所以我们需要等待3秒才能拿到新的3个fresh permits。那我们拿到7个stored permits需要多少时间呢?根据上面的讨论,这个问题没有一个标准答案。如果我们想加速处理来快速填满过去低利用率带来的损失,那我们肯定希望我们拿到stored permits的速度快于fresh permits,因为低利用率代表了更多空闲资源可以利用。如果我们主要的关注点在防溢出上,那stored permits的提取速度应该要比fresh permits要慢。因此我们需要一个函数来衡量stored permits和受控制等待时间之间的关联。这个函数就是storedPermitsToWaitTime。(后面的描述会比较复杂,这里不进行深入展开了。对于我们一般使用的SmoothBursty,这个函数恒定返回0,即立即获取storedPermits)。

最后,让我们考虑一个场景,有个QPS=1的限流器,当限流器空闲时来了一个请求需要获取100个令牌,这时候我们应该直接等待100秒再开始处理?这样的情况多半会使得结果毫无意义。一种更好的策略是对这个请求放行,就像获取1个令牌一样,然后推迟后续的请求。换句话说,我们允许立即完成对这个请求的授权,然后后续的请求进来就至少得等100s的时间。这保证了请求完成的及时。

这个策略产生了非常重要的结果,就是限流器不会记录最近的请求时间,而是记录下一个请求可用的期望时间。这也保证了我们有能力判断在一个timeout时间段内一个请求是否能够获取到令牌。另一方面,根据这个期望时间,我们可以很好地判断一个限流器的未使用时间,一旦这个期望时间在当前时间之前,那么当前时间与期望时间的差值就是限流器未使用的时长,而这个时长也可以转换到stored permits上(根据前文所述storedPermits随着空闲时间增长)

RateLimiter代码解析

不知道前面给大家所翻译的大家是否看着头疼,先给大家说声抱歉,从小语文就不太好。接下来我们从代码着手,希望能给大家一个比较清晰的视角。

首先我们看下几个比较关键的变量:

 /**
   * The currently stored permits. 目前保存下来的令牌数目
   */
  double storedPermits;

  /**
   * The maximum number of stored permits.最大的令牌保存量,即桶大小
   */
  double maxPermits;

  /**
   * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
   * per second has a stable interval of 200ms.
   两个请求之间的间隙,也就是添加一个令牌到桶中的时间间隔。
   */
  double stableIntervalMicros;

/**
   * The time when the next request (no matter its size) will be granted. After granting a
   * request, this is pushed further in the future. Large requests push this further than small
   * requests.
   下一个请求能够被授权的期望时间,当一个请求被授权之后(通过acquire可以预定),这个时间会被继续往后推,大令牌量的请求会比少量的请求推的更远。
   */
  private long nextFreeTicketMicros = 0L; // could be either in the past or future 有可能在过去或者将来

这几个变量跟我们之前提到的几个概念息息相关,相信大家还记得。

接下来是几个常用的变量:

/**
   * The underlying timer; used both to measure elapsed time and sleep as necessary. A separate
   * object to facilitate testing.
   底层计时器,通过它来进行时间的计算和线程sleep
   */
  private final SleepingStopwatch stopwatch;

  // Can't be initialized in the constructor because mocks don't call the constructor. 非直接用的互斥锁
  private volatile Object mutexDoNotUseDirectly;
  /**
  双重判定的互斥锁构建过程,线程安全
  **/
  private Object mutex() {
    Object mutex = mutexDoNotUseDirectly;
    if (mutex == null) {
      synchronized (this) {
        mutex = mutexDoNotUseDirectly;
        if (mutex == null) {
          mutexDoNotUseDirectly = mutex = new Object();
        }
      }
    }
    return mutex;
  }

我们从最简单的acquire入手:

/**
   * Acquires a single permit from this {@code RateLimiter}, blocking until the
   * request can be granted. Tells the amount of time slept, if any.
   *
   * <p>This method is equivalent to {@code acquire(1)}.
   *
   * @return time spent sleeping to enforce rate, in seconds; 0.0 if not rate-limited
   * @since 16.0 (present in 13.0 with {@code void} return type})
   */
  public double acquire() {
    return acquire(1);
  }

  /**
   * Acquires the given number of permits from this {@code RateLimiter}, blocking until the
   * request can be granted. Tells the amount of time slept, if any.
   *
   * @param permits the number of permits to acquire
   * @return time spent sleeping to enforce rate, in seconds; 0.0 if not rate-limited
   * @throws IllegalArgumentException if the requested number of permits is negative or zero
   * @since 16.0 (present in 13.0 with {@code void} return type})
   */
  public double acquire(int permits) {
    long microsToWait = reserve(permits);
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
  }

这两段代码比较简单,第一个就是把acquire方法调用委托到acquire(1),第二个稍微复杂一点,首先调用reserve()方法得到获取permits个令牌需要的等待时间,然后通过stopwatch直接无中断地sleep这么长的时间,最后返回等待的时间毫秒数。那我们再深入reserve方法:

  /**
   * Reserves the given number of permits from this {@code RateLimiter} for future use, returning
   * the number of microseconds until the reservation can be consumed.
   *
   * @return time in microseconds to wait until the resource can be acquired, never negative
   */
  final long reserve(int permits) {
    checkPermits(permits);
    synchronized (mutex()) {
      return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
  }

首先做一些参数检验,然后获取互斥锁,接着调用reserveAndGetWaitTime,传入需要获取的令牌数和当前的毫秒数。(插句题外话,不得不服google的代码质量,从注释到命名,一目了然)

   /**
   * Reserves next ticket and returns the wait time that the caller must wait for.
   *
   * @return the required wait time, never negative
   */
  final long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    return max(momentAvailable - nowMicros, 0);
  }

这一段代码通过调用reserveEarliestAvailable来得到该请求能够获取令牌授权的毫秒时刻,然后通过运算返回得到需要等待的毫秒数,我们继续看reserveEarliestAvailable方法:

   /**
   * Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time.
   * 这个函数功能是在每次请求调用产生时更新限流器的令牌数
   */
  void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    // 如果下次能授权的毫秒数在现在的毫秒计数之前
    // 说明这个限流器已经有一段时间没有使用了
    // 需要计算这段时间产生的stored permits
    // 否则说明这段时间限流器一直有请求进来,则不需要更新
    if (nowMicros > nextFreeTicketMicros) {
      //stored permits 最多为maxPermits,
      //大小根据这段空闲时间长度(nowMicros - nextFreeTicketMicros)确定
      storedPermits = min(maxPermits,
          storedPermits
            + (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros());
      //更新nextFreeTiecket为now
      nextFreeTicketMicros = nowMicros;
    }
  }


  /*
    直接返回放入令牌间隔,即 1 / QPS * 1000(毫秒)
  */
  @Override
  double coolDownIntervalMicros() {
      return stableIntervalMicros;
  }


 /*
  在当前场景下,对于storedPermits,我们的策略是立即获取,因此没有wait time,返回0
*/
  @Override
  long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
      return 0L;
  }

  @Override
  final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    // 更新令牌桶
    resync(nowMicros);
    //保存nextFreeTicketMicros
    long returnValue = nextFreeTicketMicros;
    //获取这次能够使用的storedPermits
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    //计算需要等待获取的fresh permits
    double freshPermits = requiredPermits - storedPermitsToSpend;
    //总的等待时间等于storedPermits的等待时间加上fresh permit的等待时间
    //fresh的等待时间就是放入令牌的间隔*fresh permits数目
    long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
        + (long) (freshPermits * stableIntervalMicros);
  
    // 增加nextFreeTicketMicros, 这里支持预定
    try {
      this.nextFreeTicketMicros = LongMath.checkedAdd(nextFreeTicketMicros, waitMicros);
    } catch (ArithmeticException e) {
      this.nextFreeTicketMicros = Long.MAX_VALUE;
    }
    //更新stored permits
    this.storedPermits -= storedPermitsToSpend;
    //因为支持预定,所以返回的是这些计算之前nextFreeTicketMicros作为需要wait的时间
    //而不是计算后的
    return returnValue;
  }

这段代码的讲解我大部分已经写在代码注释里面了,需要说明的是,我最开始一直在想按照令牌桶算法的描述,应该有一个定时插入令牌的过程,但是我看了下确实没有多的线程同步机制来做这个事儿,原来Guava中采用了触发式的更新令牌桶机制。原理就是在每次请求到来的时候去完成令牌桶中令牌插入工作和其他属性如nextFreeTicketMicros的更新工作,这样减少了线程使用, 节约了资源,并且也简化了操作。这个功能在resync函数代码中完成。需要值得注意的是,因为Guava的实现支持令牌预定功能,即当限流器当前处于空闲状态时,一个大量令牌请求进来的时候,可以提前预授权给他足够的令牌让它能够立即执行,并推迟后续请求的等待时间(如之前所述),因此才会出现nowMicros < nextFreeTicketMicro的情况,而这种情况就说明当前仍处于对于之前一个请求的预授权阶段,不需要更新storedPermits,否则就还是nowMicros >= nextFreeTicketMicro的情况。

看完了acquire的流程,我们再来看tryAcquire的代码:

 /**
   * Acquires a permit from this {@code RateLimiter} if it can be obtained
   * without exceeding the specified {@code timeout}, or returns {@code false}
   * immediately (without waiting) if the permit would not have been granted
   * before the timeout expired.
   *
   * <p>This method is equivalent to {@code tryAcquire(1, timeout, unit)}.
   *
   * @param timeout the maximum time to wait for the permit. Negative values are treated as zero.
   * @param unit the time unit of the timeout argument
   * @return {@code true} if the permit was acquired, {@code false} otherwise
   * @throws IllegalArgumentException if the requested number of permits is negative or zero
   */
  public boolean tryAcquire(long timeout, TimeUnit unit) {
    return tryAcquire(1, timeout, unit);
  }

  /**
   * Acquires permits from this {@link RateLimiter} if it can be acquired immediately without delay.
   *
   * <p>
   * This method is equivalent to {@code tryAcquire(permits, 0, anyUnit)}.
   *
   * @param permits the number of permits to acquire
   * @return {@code true} if the permits were acquired, {@code false} otherwise
   * @throws IllegalArgumentException if the requested number of permits is negative or zero
   * @since 14.0
   */
  public boolean tryAcquire(int permits) {
    return tryAcquire(permits, 0, MICROSECONDS);
  }

  /**
   * Acquires a permit from this {@link RateLimiter} if it can be acquired immediately without
   * delay.
   *
   * <p>
   * This method is equivalent to {@code tryAcquire(1)}.
   *
   * @return {@code true} if the permit was acquired, {@code false} otherwise
   * @since 14.0
   */
  public boolean tryAcquire() {
    return tryAcquire(1, 0, MICROSECONDS);
  }

这还是很简单。。层层委托。我们来看最后这个tryAcquire

/**
   * Acquires the given number of permits from this {@code RateLimiter} if it can be obtained
   * without exceeding the specified {@code timeout}, or returns {@code false}
   * immediately (without waiting) if the permits would not have been granted
   * before the timeout expired.
   *
   * @param permits the number of permits to acquire
   * @param timeout the maximum time to wait for the permits. Negative values are treated as zero.
   * @param unit the time unit of the timeout argument
   * @return {@code true} if the permits were acquired, {@code false} otherwise
   * @throws IllegalArgumentException if the requested number of permits is negative or zero
   */
  public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
    long timeoutMicros = max(unit.toMicros(timeout), 0);
    checkPermits(permits);
    long microsToWait;
    //获取互斥锁
    synchronized (mutex()) {
      //获取当前时间
      long nowMicros = stopwatch.readMicros();
      //判断是否能够在timeout时间内能够获取
      if (!canAcquire(nowMicros, timeoutMicros)) {
        return false;
      } else {
       //如果判断能够获取,则调用reserveAndGetWaitLength获取等待时间
       //其实就是走了一遍acquire
        microsToWait = reserveAndGetWaitLength(permits, nowMicros);
      }
    }
    // sleep直到能获取令牌
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return true;
  }

通过上述分析,我们知道主要逻辑在canAcquire方法内:

private boolean canAcquire(long nowMicros, long timeoutMicros) {
    return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}

通过调用queryEarliestAvailable得到最近的令牌可用时间,然后看这个时间与now的差值是否小于timeout,如果小于则表示这个timeout内可以获取到令牌,返回true,否则返回false

  @Override
  final long queryEarliestAvailable(long nowMicros) {
    return nextFreeTicketMicros;
  }

在SmoothBursty实现中,queryEarliestAvailable的实现直接返回nextFreeTicketMicros,这个也很清晰,nextFreeTicketMicros本来的意义就是最近的令牌可用时间。

小结

以上就是针对Guava RateLimiter的代码和限流逻辑的一个整体梳理,主要是针对SmoothBursty的实现来做的一个分析。希望大家能够喜欢,后续可能需要考虑针对多机做一个类似的机制。

上一篇下一篇

猜你喜欢

热点阅读