限流概念&算法
在开发高并发系统时,有三把利器用来保护系统:熔断、延迟处理、缓存、降级和限流
限流
- 问题:遇到瞬时请求量激增时,会导致接口占用过多服务器资源,使得其他请求响应速度降低或是超时,更有甚者可能导致服务器宕机。
- 目的:通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务(定向到错误页或告知资源没有了)、排队或等待(比如秒杀、评论、下单)、降级(返回兜底数据或默认数据,如商品详情页库存默认有货)。
- 方法:限制总并发数(比如数据库连接池、线程池)、限制瞬时并发数(如nginx的limit_conn模块,用来限制瞬时并发连接数)、限制时间窗口内的平均速率(如Guava的RateLimiter、nginx的limit_req模块,限制每秒的平均速率);其他还有如限制远程接口调用速率、限制MQ的消费速率。另外还可以根据网络连接数、网络流量、CPU或内存负载等来限流。
分布式限流
单点限流
分布式集群 单点限流的问题
节点扩容、缩容时无法准确控制整个服务的请求限制
分布式集群 分布式限流
限流算法
计数器
如限制1秒钟内请求数最多为10个,每当进来一个请求,则计数器+1
当计数器达到上限时,则触发限流
时间每经过1秒,则重置计数器
不足:在第1秒的后半时间内,涌入了大量流量,然后到第2秒的前半时间,又涌入了大量流量。由于从第1秒到第2秒,请求计数是清零的,所以在这瞬间的qps有可能超过系统的承载。
public class RateLimiter {
private long updateTimeStamp;
private int intervalMilliSeconds;
private int maxPermits;
private long storedPermits;
public RateLimiter(int maxPermits) {
this(maxPermits, 1);
}
public RateLimiter(int maxPermits, int intervalSeconds) {
this.maxPermits = maxPermits;
this.intervalMilliSeconds = intervalSeconds * 1000;
}
public synchronized Boolean acquire(int permits) {
while (true) {
long now = System.currentTimeMillis();
if (now < updateTimeStamp + intervalMilliSeconds) {
if (storedPermits + permits <= maxPermits) {
storedPermits += permits;
updateTimeStamp = now;
return true;
} else {
return false;
}
} else {
storedPermits = 0;
updateTimeStamp = now;
}
}
}
}
滑动窗口
滑动窗口本质上也是一种计数器,只不过它的粒度更细。比如限制qps为1000,设定窗口大小为10,则每个窗口的时间间隔为100ms。每次窗口滑动时,重置的是前1s至900ms之间内的计数,而不是完整的1s。
public class RateLimiter {
private LinkedList<Integer> deque = new LinkedList<>();
private int windowSize;
private int windowIntervalMilliSeconds;
private int currentWindowPermits;
private long updateTimeStamp;
private int intervalMilliSeconds;
private int maxPermits;
private long storedPermits;
public RateLimiter(int maxPermits, int windowSize) {
this(maxPermits, 1, windowSize);
}
public RateLimiter(int maxPermits, int intervalSeconds, int windowSize) {
this.maxPermits = maxPermits;
this.intervalMilliSeconds = intervalSeconds * 1000;
this.windowSize = windowSize;
this.windowIntervalMilliSeconds = intervalMilliSeconds / windowSize;
}
public synchronized Boolean acquire(int permits) {
while (true) {
long now = System.currentTimeMillis();
if (now < updateTimeStamp + windowIntervalMilliSeconds) {
if (storedPermits + permits + currentWindowPermits <= maxPermits) {
currentWindowPermits += permits;
updateTimeStamp = now;
return true;
} else {
return false;
}
} else {
updateTimeStamp = now;
deque.offerLast(currentWindowPermits);
storedPermits += currentWindowPermits;
currentWindowPermits = 0;
while (deque.size() > windowSize) {
storedPermits -= deque.removeFirst();
}
}
}
}
}
漏桶算法
漏桶算法这个名字就很形象,算法内部有一个容器,当请求进来时,相当于水倒入漏斗,然后从下端小口慢慢匀速的流出。不管上面流量多大,下面流出的速度始终保持不变。 当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求。
- 一个固定容量的漏桶,按照常量固定速率流出水滴;
- 如果桶是空的,则不需流出水滴;
- 可以以任意速率流入水滴到漏桶;
-
如果流入水滴超出了桶的容量,则流入的水滴溢出了(被丢弃),而漏桶容量是不变的。
public class RateLimiter {
private int capacity;
private int rate;
private int water;
private int intervalMilliSeconds;
private Semaphore exit;
public RateLimiter(int capacity, int rate) {
this(capacity, rate, 1);
}
public RateLimiter(int capacity, int rate, int intervalSeconds) {
this.exit = new Semaphore(1);
this.capacity = capacity;
this.rate = rate;
this.intervalMilliSeconds = intervalSeconds * 1000;
}
public Boolean acquire(int permits) throws InterruptedException {
if (water + permits <= capacity) {
water += permits;
while (!exit.tryAcquire(permits, intervalMilliSeconds / rate, TimeUnit.MILLISECONDS)) {}
Thread.sleep(intervalMilliSeconds / rate);
water -= permits;
exit.release(permits);
return true;
} else {
return false;
}
}
}
令牌桶算法
在令牌桶算法中,存在一个桶,用来存放固定数量的令牌。算法中存在一种机制,以一定的速率往桶中放令牌。每次请求调用需要先获取令牌,只有拿到令牌,才有机会继续执行,否则选择选择等待可用的令牌、或者直接拒绝。
public class RateLimiter {
private long updateTimeStamp;
private int capacity;
private int rate;
private int tokens;
private int intervalMilliSeconds;
public RateLimiter(int capacity, int rate) {
this(capacity, rate, 1);
}
public RateLimiter(int capacity, int rate, int intervalSeconds) {
this.capacity = capacity;
this.rate = rate;
this.intervalMilliSeconds = intervalSeconds * 1000;
}
public synchronized Boolean acquire(int permits) {
long now = System.currentTimeMillis();
int newTokens = (int) ((now - updateTimeStamp) / intervalMilliSeconds * rate);
if (newTokens > 0) {
this.tokens = Math.min(capacity, this.tokens + newTokens);
this.updateTimeStamp = now;
}
if (tokens - permits >= 0) {
tokens -= permits;
return true;
} else {
return false;
}
}
}
开源框架
ratelimiter4j
https://github.com/wangzheng0822/ratelimiter4j
Ref:
https://www.jianshu.com/p/2596e559db5c
https://shengbao.org/609.html
https://blog.wangqi.love/articles/Java/%E9%99%90%E6%B5%81%E6%8A%80%E6%9C%AF%E6%80%BB%E7%BB%93.html