高可用系统处理方式-限流
在开发高并发高可用系统时有三把利器用来保护系统:
缓存
、降级
和限流
。(1)缓存的目的是提升系统访问速度和增大系统能处理的容量,可谓是抗高并发流量的银弹;
(2)降级是当服务出问题或者影响到核心流程的性能则需要暂时屏蔽掉,待高峰或者问题解决后再打开;
(3)而有些场景并不能用缓存和降级来解决,比如稀缺资源(秒杀、抢购)、写服务(如评论、下单)、频繁的复杂查询(评论的最后几页),因此需有一种手段来限制这些场景的并发/请求量,即限流。
后面两条主要是为了提高系统的高可用
限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务(定向到错误页或告知资源没有了)、排队或等待(比如秒杀、评论、下单)、降级(返回兜底数据或默认数据,如商品详情页库存默认有货)
常见的限流算法有:令牌桶
、漏桶
。计数器也可以进行粗暴限流实现。
漏桶算法
漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率
。
线程池就是一个例子
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);
对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了, 令牌桶算法
更为适合
令牌桶(Token bucket)
令牌桶算法的基本过程如下:
每秒会有 r 个令牌放入桶中,或者说,每过 1/r 秒桶中增加一个令牌
桶中最多存放 b 个令牌,如果桶满了,新放入的令牌会被丢弃
当一个 n 字节的数据包到达时,消耗 n 个令牌,然后发送该数据包
如果桶中可用令牌小于 n,则该数据包将被缓存或丢弃
我们可以使用 Guava 的 RateLimiter 来实现基于令牌桶的流量控制。RateLimiter 令牌桶算法的单桶实现,RateLimiter 对简单的令牌桶算法做了一些工程上的优化,具体的实现是 SmoothBursty。需要注意的是,RateLimiter 的另一个实现 SmoothWarmingUp,就不是令牌桶了,而是漏桶算法。
SmoothBursty 有一个可以放 N 个时间窗口产生的令牌的桶,系统空闲的时候令牌就一直攒着,最好情况下可以扛 N 倍于限流值的高峰而不影响后续请求,就像三峡大坝一样能扛千年一遇的洪水.
使用
1、在我们的java项目中pom.xml文件中添加依赖
<!-- guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0-rc2</version>
</dependency>
2、代码
package com.dubbo.demo;
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by haichenglhc on 27/02/2017.
*
* @author haichenglhc
* @date 2017/02/27
*/
public class TT {
private static final ConcurrentMap<String, RateLimiter> resourceRateLimiterMap =
new ConcurrentHashMap<String, RateLimiter>();
public static void createFlowLimitMap(String resource, double qps) {
RateLimiter limiter = resourceRateLimiterMap.get(resource);
if (limiter == null) {
limiter = RateLimiter.create(qps);
resourceRateLimiterMap.putIfAbsent(resource, limiter);
}
limiter.setRate(qps);
}
public static boolean enter(int i, String resource) throws Exception {
RateLimiter limiter = resourceRateLimiterMap.get(resource);
if (limiter == null) {
throw new Exception(resource);
}
// System.out.println(limiter.acquire());
if (!limiter.tryAcquire()) {
System.out.println(i + " >>>>>>>>>>>>>>>>>被限流了>>>>>>>>");
return true;
}
return false;
}
static class TestWork implements Runnable {
private int i;
public TestWork() {
}
public TestWork(int i) {
this.i = i;
}
@Override
public void run() {
try {
if (!enter(i,"test")) {
System.out.println(i + " ++++++++++++ 没有被限流");
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
String source = "test";
double qps = 10;
createFlowLimitMap(source, qps);
Thread.sleep(1000l);
ExecutorService pools = Executors.newFixedThreadPool(40);
for (int i = 0; i < 16; i++) {
TestWork testWork = new TestWork(i);
pools.execute(testWork);
}
}
}