Java高并发系列——检视阅读(七)
Java高并发系列——集合
JUC中常见的集合
JUC集合框架图
img图可以看到,JUC的集合框架也是从Map、List、Set、Queue、Collection等超级接口中继承而来的。所以,大概可以知道JUC下的集合包含了一一些基本操作,并且变得线程安全。
Map
ConcurrentHashMap
功能和HashMap基本一致,内部使用红黑树实现的。
java8的ConcurrentHashMap为何放弃分段锁,为什么要使用CAS+Synchronized取代Segment+ReentrantLock...
特性:
- 迭代结果和存入顺序不一致
- key和value都不能为空,否则会空指针,这里和HashMap不一样,HashMap的key和value都可以为空。
- 线程安全的
ConcurrentSkipListMap
内部使用跳表实现的,放入的元素会进行排序,排序算法支持2种方式来指定:
- 通过构造方法传入一个
Comparator
- 放入的元素实现
Comparable
接口
上面2种方式必选一个,如果2种都有,走规则1。
特性:
- 迭代结果和存入顺序不一致
- 放入的元素会排序
- key和value都不能为空
- 线程安全的
List
CopyOnWriteArrayList
实现List的接口的,一般我们使用ArrayList、LinkedList、Vector
,其中只有Vector是线程安全的,可以使用Collections静态类的synchronizedList方法对ArrayList、LinkedList包装为线程安全的List,不过这些方式在保证线程安全的情况下性能都不高。
CopyOnWriteArrayList是线程安全的List,内部使用数组存储数据,集合中多线程并行操作一般存在4种情况:读读、读写、写写、写读,这个只有在写写操作过程中会导致其他线程阻塞,其他3种情况均不会阻塞
,所以读取的效率非常高。
可以看一下这个类的名称:CopyOnWrite,意思是在写入操作的时候,进行一次自我复制,换句话说,当这个List需要修改时,并不修改原有内容(这对于保证当前在读线程的数据一致性非常重要),而是在原有存放数据的数组上产生一个副本,在副本上修改数据,修改完毕之后,用副本替换原来的数组,这样也保证了写操作不会影响读。
注意:当然,做写操作的时候也是会用Lock加锁保证同步的,因为每次添加都会加锁且重新复制一个数组出来存储,因此,在保证读多写少的情况下,在进行写操作特别是初始化时也尽量调用addAll这些方法来一次性添加一个集合数据来避免不断地复制。
特性:
- 迭代结果和存入顺序一致
- 元素不重复
- 元素可以为空
- 线程安全的
- 读读、读写、写读3种情况不会阻塞;写写会阻塞
- 无界的
Set
ConcurrentSkipListSet
有序的Set,内部基于ConcurrentSkipListMap实现的,放入的元素会进行排序,排序算法支持2种方式来指定:
- 通过构造方法传入一个
Comparator
- 放入的元素实现
Comparable
接口
上面2种方式需要实现一个,如果2种都有,走规则1
特性:
- 迭代结果和存入顺序不一致
- 放入的元素会排序
- 元素不重复
- 元素不能为空
- 线程安全的
- 无界的
CopyOnWriteArraySet
内部使用CopyOnWriteArrayList实现的,将所有的操作都会转发给CopyOnWriteArrayList。
注意:因为底层是用CopyOnWriteArrayList实现,因此CopyOnWriteArraySet实现元素不重复是用CopyOnWriteArrayList的indexOf方法从头到尾一个一个比较来实现的。
public boolean addIfAbsent(E e) {
Object[] snapshot = getArray();
return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
addIfAbsent(e, snapshot);
}
特性:
- 迭代结果和存入顺序不一致
- 元素不重复
- 元素可以为空
- 线程安全的
- 读读、读写、写读 不会阻塞;写写会阻塞
- 无界的
Queue
Queue接口中的方法,我们再回顾一下:
操作类型 | 抛出异常 | 返回特殊值 |
---|---|---|
插入 | add(e) |
offer(e) |
移除 | remove() |
poll() |
检查 | element() |
peek() |
3种操作,每种操作有2个方法,不同点是队列为空或者满载时,调用方法是抛出异常还是返回特殊值,大家按照表格中的多看几遍,加深记忆。
ConcurrentLinkedQueue
高效并发队列,内部使用链表实现的。
特性:
- 线程安全的
- 迭代结果和存入顺序一致
- 元素可以重复
- 元素不能为空
- 线程安全的
- 无界队列
Deque
先介绍一下Deque接口,双向队列(Deque)是Queue的一个子接口,双向队列是指该队列两端的元素既能入队(offer)也能出队(poll),如果将Deque限制为只能从一端入队和出队,则可实现栈的数据结构。对于栈而言,有入栈(push)和出栈(pop),遵循先进后出原则。
一个线性 collection,支持在两端插入和移除元素。名称 deque 是“double ended queue(双端队列)”的缩写,通常读为“deck”。大多数 Deque
实现对于它们能够包含的元素数没有固定限制,但此接口既支持有容量限制的双端队列,也支持没有固定大小限制的双端队列。
此接口定义在双端队列两端访问元素的方法。提供插入、移除和检查元素的方法。每种方法都存在两种形式:一种形式在操作失败时抛出异常,另一种形式返回一个特殊值(null
或 false
,具体取决于操作)。插入操作的后一种形式是专为使用有容量限制的 Deque
实现设计的;在大多数实现中,插入操作不能失败。
下表总结了上述 12 种方法:
img此接口扩展了 Queue
接口。在将双端队列用作队列时,将得到 FIFO(先进先出)行为。将元素添加到双端队列的末尾,从双端队列的开头移除元素。从 Queue
接口继承的方法完全等效于 Deque
方法,如下表所示:
Queue 方法 | 等效 Deque 方法 |
---|---|
add(e) |
addLast(e) |
offer(e) |
offerLast(e) |
remove() |
removeFirst() |
poll() |
pollFirst() |
element() |
getFirst() |
peek() |
peekFirst() |
ConcurrentLinkedDeque
实现了Deque接口,内部使用链表实现的高效的并发双端队列。
特性:
- 线程安全的
- 迭代结果和存入顺序一致
- 元素可以重复
- 元素不能为空
- 线程安全的
- 无界队列
BlockingQueue
关于阻塞队列,上一篇有详细介绍。
疑问:
Q:跳表是什么?
跳表,是基于链表实现的一种类似“二分”的算法。它可以快速的实现增,删,改,查操作。跳跃列表的平均查找和插入时间复杂度都是O(logn)。跳表是通过维护一个多层次的链表来构建多层索引结构,是一种以空间来换时间的数据结构,多层链表中的每一层链表元素是前一层链表元素的子集。一开始时,算法在最稀疏的层次进行搜索,直至需要查找的元素在该层两个相邻的元素中间。这时,算法将跳转到下一个层次,重复刚才的搜索,直到找到需要查找的元素为止。
之所以要用跳表,我们可以与单链表进行比较,单链表中查找某个数据的时候需要的时间复杂度为O(n).而我们多个跳表构建多层索引,使得查询效率提升到O(logn)。
接口性能提升实战篇
需求:电商app的商品详情页,需要给他们提供一个接口获取商品相关信息:
- 商品基本信息(名称、价格、库存、会员价格等)
- 商品图片列表
- 商品描述信息(描述信息一般是由富文本编辑的大文本信息)
普通接口实现伪代码如下:
public Map<String,Object> detail(long goodsId){
//创建一个map
//step1:查询商品基本信息,放入map
map.put("goodsModel",(select * from t_goods where id = #gooldsId#));
//step2:查询商品图片列表,返回一个集合放入map
map.put("goodsImgsModelList",(select * from t_goods_imgs where goods_id = #gooldsId#));
//step3:查询商品描述信息,放入map
map.put("goodsExtModel",(select * from t_goods_ext where goods_id = #gooldsId#));
return map;
}
上面这种写法应该很常见,代码很简单,假设上面每个步骤耗时200ms,此接口总共耗时>=600毫秒
整个过程是按顺序执行的,实际上3个查询之间是没有任何依赖关系,所以说3个查询可以同时执行,那我们对这3个步骤采用多线程并行执行实现如下:
示例:
public class GetProductDetailTest {
//自定义包含策略
private ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 20, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
/**
* 获取商品基本信息
*
* @param goodsId 商品id
* @return 商品基本信息
* @throws InterruptedException
*/
public String goodsDetailModel(long goodsId) throws InterruptedException {
//模拟耗时,休眠200ms
TimeUnit.MILLISECONDS.sleep(200);
return "商品id:" + goodsId + ",商品基本信息....";
}
/**
* 获取商品图片列表
*
* @param goodsId 商品id
* @return 商品图片列表
* @throws InterruptedException
*/
public List<String> goodsImgsModelList(long goodsId) throws InterruptedException {
//模拟耗时,休眠200ms
TimeUnit.MILLISECONDS.sleep(200);
return Arrays.asList("图1", "图2", "图3");
}
/**
* 获取商品描述信息
*
* @param goodsId 商品id
* @return 商品描述信息
* @throws InterruptedException
*/
public String goodsExtModel(long goodsId) throws InterruptedException {
//模拟耗时,休眠200ms
TimeUnit.MILLISECONDS.sleep(200);
return "商品id:" + goodsId + ",商品描述信息......";
}
public Map<String,Object> getGoodsDetail(long goodsId) throws ExecutionException, InterruptedException {
Map<String, Object> result = new HashMap<>();
Future<String> gooldsDetailModelFuture = executor.submit(() -> goodsDetailModel(goodsId));
Future<List<String>> goodsImgsModelFuture = executor.submit(() -> goodsImgsModelList(goodsId));
//异步获取商品描述信息
Future<String> goodsExtModelFuture = executor.submit(() -> goodsExtModel(goodsId));
result.put("gooldsDetailModel", gooldsDetailModelFuture.get());
result.put("goodsImgsModelList", goodsImgsModelFuture.get());
result.put("goodsExtModel", goodsExtModelFuture.get());
return result;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
GetProductDetailTest detailTest = new GetProductDetailTest();
long starTime = System.currentTimeMillis();
Map<String, Object> map = detailTest.getGoodsDetail(1L);
System.out.println(map);
System.out.println("耗时(ms):" + (System.currentTimeMillis() - starTime));
}
}
输出:
{goodsImgsModelList=[图1, 图2, 图3], gooldsDetailModel=商品id:1,商品基本信息...., goodsExtModel=商品id:1,商品描述信息......}
耗时(ms):255
可以看出耗时200毫秒左右,性能提升了2倍,假如这个接口中还存在其他无依赖的操作,性能提升将更加显著,上面使用了线程池并行去执行3次查询的任务,最后通过Future获取异步执行结果。
整个优化过程:
- 先列出无依赖的一些操作
- 将这些操作改为并行的方式
总结
- 对于无依赖的操作尽量采用并行方式去执行,可以很好的提升接口的性能
解决微服务日志的痛点
日志有什么用?
- 系统出现故障的时候,可以通过日志信息快速定位问题,修复bug,恢复业务
- 提取有用数据,做数据分析使用
本文主要讨论通过日志来快速定位并解决问题。
日志存在的痛点
先介绍一下多数公司采用的方式:目前比较流行的是采用springcloud(或者dubbo)做微服务,按照业务拆分为多个独立的服务,服务采用集群的方式部署在不同的机器上,当一个请求过来的时候,可能会调用到很多服务进行处理,springcloud一般采用logback(或者log4j)输出日志到文件中。当系统出问题的时候,按照系统故障的严重程度,严重的会回退版本,然后排查bug,轻的,找运维去线上拉日志,然后排查问题。
这个过程中存在一些问题:
- 日志文件太大太多,不方便查找
- 日志分散在不同的机器上,也不方便查找
- 一个请求可能会调用多个服务,完整的日志难以追踪(没有完整的链路日志)
- 系统出现了问题,只能等到用户发现了,自己才知道(没有报错预警)
本文要解决上面的几个痛点,构建我们的日志系统,达到以下要求:
- 方便追踪一个请求完整的日志
- 方便快速检索日志
- 系统出现问题自动报警,通知相关人员
构建日志系统
方便追踪一个请求完整的日志
当一个请求过来的时候,可能会调用多个服务,多个服务内部可能又会产生子线程处理业务,所以这里面有两个问题需要解决:
- 多个服务之间日志的追踪
- 服务内部子线程和主线程日志的追踪,这个地方举个例子,比如一个请求内部需要给10000人发送推送,内部开启10个线程并行处理,处理完毕之后响应操作者,这里面有父子线程,我们要能够找到这个里面所有的日志
需要追踪一个请求完整日志,我们需要给每个请求设置一个全局唯一编号,可以使用UUID或者其他方式也行。
多个服务之间日志追踪的问题:当一个请求过来的时候,在入口处生成一个trace_id,然后放在ThreadLocal中,如果内部设计到多个服务之间相互调用,调用其他服务的时,将trace_id顺便携带过去。
父子线程日志追踪的问题:可以采用InheritableThreadLocal来存放trace_id,这样可以在线程中获取到父线程中的trace_id。
所以此处我们需要使用InheritableThreadLocal
来存储trace_id。
使用了线程池处理请求的,由于线程池中的线程采用的是复用的方式,所以需要对执行的任务Runable做一些改造 包装。
public class TraceRunnable implements Runnable {
private String tranceId;
private Runnable target;
public TraceRunnable(Runnable target) {
this.tranceId = TraceUtil.get();
this.target = target;
}
@Override
public void run() {
try {
TraceUtil.set(this.tranceId);
MDC.put(TraceUtil.MDC_TRACE_ID, TraceUtil.get());
this.target.run();
} finally {
MDC.remove(TraceUtil.MDC_TRACE_ID);
TraceUtil.remove();
}
}
public static Runnable trace(Runnable target) {
return new TraceRunnable(target);
}
}
需要用线程池执行的任务使用TraceRunnable
封装一下就可以了。
TraceUtil代码:
public class TraceUtil {
public static final String REQUEST_HEADER_TRACE_ID = "com.ms.header.trace.id";
public static final String MDC_TRACE_ID = "trace_id";
private static InheritableThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>();
/**
* 获取traceid
*
* @return
*/
public static String get() {
String traceId = inheritableThreadLocal.get();
if (traceId == null) {
traceId = IDUtil.getId();
inheritableThreadLocal.set(traceId);
}
return traceId;
}
public static void set(String trace_id) {
inheritableThreadLocal.set(trace_id);
}
public static void remove() {
inheritableThreadLocal.remove();
}
}
日志输出中携带上trace_id,这样最终我们就可以通过trace_id找到一个请求的完整日志了。
方便快速检索日志
日志分散在不同的机器上,如果要快速检索,需要将所有服务产生的日志汇集到一个地方。
关于检索日志的,列一下需求:
- 我们将收集日志发送到消息中间件中(可以是kafka、rocketmq),消息中间件这块不介绍,选择玩的比较溜的就可以了
- 系统产生日志尽量不要影响接口的效率
- 带宽有限的情况下,发送日志也尽量不要去影响业务
- 日志尽量低延次,产生的日志,尽量在生成之后1分钟后可以检索到
- 检索日志功能要能够快速响应
关于上面几点,我们需要做的:日志发送的地方进行改造,引入消息中间件,将日志异步发送到消息中间件中,查询的地方采用elasticsearch,日志系统需要订阅消息中间件中的日志,然后丢给elasticsearch建索引,方便快速检索,咱们来一点点的介绍。
日志发送端的改造
日志是由业务系统产生的,一个请求过来的时候会产生很多日志,日志产生时,我们尽量减少日志输出对业务耗时的影响,我们的过程如下:
- 业务系统内部引用一个线程池来异步处理日志,线程池内部可以使用一个容量稍微大一点的阻塞队列
- 业务系统将日志丢给线程池进行处理
- 线程池中将需要处理的日志先压缩一下,然后发送至mq
线程池的使用可以参考:JAVA线程池,这一篇就够了
引入mq存储日志
业务系统将日志先发送到mq中,后面由其他消费者订阅进行消费。日志量比较大的,对mq的要求也比较高,可以选择kafka,业务量小的,也可以选取activemq。
使用elasticsearch来检索日志
elasticsearch(以下简称es)是一个全文检索工具,具体详情可以参考其官网相关文档。使用它来检索数据效率非常高。日志系统中需要我们开发一个消费端来拉取mq中的消息,将其存储到es中方便快速检索,关于这块有几点说一下:
- 建议按天在es中建立数据库,日志量非常大的,也可以按小时建立数据库。查询的时候,时间就是必选条件了,这样可以快速让es定位到日志库进行检索,提升检索效率
- 日志常见的需要收集的信息:trace_id、时间、日志级别、类、方法、url、调用的接口开始时间、调用接口的结束时间、接口耗时、接口状态码、异常信息、日志信息等等,可以按照这些在es中建立索引,方便检索。
日志监控报警——可自定义配置报警
日志监控报警是非常重要的,这个必须要有,日志系统中需要开发监控报警功能,这块我们可以做成通过页面配置的方式,支持报警规则的配置,如日志中产生了某些异常、接口响应时间大于多少、接口返回状态码404等异常信息的时候能够报警,具体的报警可以是语音电话、短信通知、钉钉机器人报警等等,这些也做成可以配置的。
日志监控模块从mq中拉取日志,然后去匹配我们启用的一些规则进行报警。
日志处理结构图如下:
image.png高并发中常见的限流方式
常见的限流的场景
- 秒杀活动,数量有限,访问量巨大,为了防止系统宕机,需要做限流处理
- 国庆期间,一般的旅游景点人口太多,采用排队方式做限流处理
- 医院看病通过发放排队号的方式来做限流处理。
常见的限流算法
- 通过控制最大并发数来进行限流
- 使用漏桶算法来进行限流
- 使用令牌桶算法来进行限流
通过控制最大并发数来进行限流
以秒杀业务为例,10个iphone,100万人抢购,100万人同时发起请求,最终能够抢到的人也就是前面几个人,后面的基本上都没有希望了,那么我们可以通过控制并发数来实现,比如并发数控制在10个,其他超过并发数的请求全部拒绝,提示:秒杀失败,请稍后重试。
单机中的JUC中提供了这样的工具类:Semaphore:如果是集群,则可以用redis或者zk代替Semaphore
示例:
public class MaxAccessLimiter {
private static Semaphore limiter = new Semaphore(5);
//自定义包含策略
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 20, 60,
TimeUnit.SECONDS, new SynchronousQueue(),
new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
for (int i = 0; i < 20; i++) {
executor.submit(() -> {
boolean flag = false;
try {
flag = limiter.tryAcquire(100, TimeUnit.MICROSECONDS);
if (flag) {
//休眠2秒,模拟下单操作
System.out.println(Thread.currentThread() + ",尝试下单中。。。。。");
TimeUnit.SECONDS.sleep(2);
} else {
System.out.println(Thread.currentThread() + ",秒杀失败,请稍微重试!");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (flag) {
limiter.release();
}
}
});
}
executor.shutdown();
}
}
输出:
Thread[From DemoThreadFactory's 订单创建组-Worker-1,5,main],尝试下单中。。。。。
Thread[From DemoThreadFactory's 订单创建组-Worker-2,5,main],尝试下单中。。。。。
Thread[From DemoThreadFactory's 订单创建组-Worker-3,5,main],尝试下单中。。。。。
Thread[From DemoThreadFactory's 订单创建组-Worker-4,5,main],尝试下单中。。。。。
Thread[From DemoThreadFactory's 订单创建组-Worker-5,5,main],尝试下单中。。。。。
Thread[From DemoThreadFactory's 订单创建组-Worker-9,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-14,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-16,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-17,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-18,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-20,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-12,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-11,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-7,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-8,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-6,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-10,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-19,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-15,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-13,5,main],秒杀失败,请稍微重试!
使用漏桶算法来进行限流
国庆期间比较火爆的景点,人流量巨大,一般入口处会有限流的弯道,让游客进去进行排队,排在前面的人,每隔一段时间会放一拨进入景区。排队人数超过了指定的限制,后面再来的人会被告知今天已经游客量已经达到峰值,会被拒绝排队,让其明天或者以后再来,这种玩法采用漏桶限流的方式。
漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。
漏桶算法示意图:
image.png示例:代码中BucketLimit.build(10, 60, TimeUnit.MINUTES);
创建了一个容量为10,流水为60/分钟的漏桶。
public class BucketLimitTest {
public static class BucketLimit {
static AtomicInteger threadNum = new AtomicInteger(1);
//容量
private int capcity;
//流速
private int flowRate;
//流速时间单位
private TimeUnit flowRateUnit;
private BlockingQueue<Node> queue;
//漏桶流出的任务时间间隔(纳秒)
private long flowRateNanosTime;
public BucketLimit(int capcity, int flowRate, TimeUnit flowRateUnit) {
this.capcity = capcity;
this.flowRate = flowRate;
this.flowRateUnit = flowRateUnit;
this.bucketThreadWork();
}
//漏桶线程
public void bucketThreadWork() {
this.queue = new ArrayBlockingQueue<Node>(capcity);
//漏桶流出的任务时间间隔(纳秒)
this.flowRateNanosTime = flowRateUnit.toNanos(1) / flowRate;
System.out.println(TimeUnit.NANOSECONDS.toSeconds(this.flowRateNanosTime));
Thread thread = new Thread(this::bucketWork);
thread.setName("漏桶线程-" + threadNum.getAndIncrement());
thread.start();
}
//漏桶线程开始工作
public void bucketWork() {
while (true) {
Node node = this.queue.poll();
if (Objects.nonNull(node)) {
//唤醒任务线程
LockSupport.unpark(node.thread);
}
//阻塞当前线程,最长不超过nanos纳秒
//休眠flowRateNanosTime
LockSupport.parkNanos(this.flowRateNanosTime);
}
}
//返回一个漏桶
public static BucketLimit build(int capcity, int flowRate, TimeUnit flowRateUnit) {
if (capcity < 0 || flowRate < 0) {
throw new IllegalArgumentException("capcity、flowRate必须大于0!");
}
return new BucketLimit(capcity, flowRate, flowRateUnit);
}
//当前线程加入漏桶,返回false,表示漏桶已满;true:表示被漏桶限流成功,可以继续处理任务
public boolean acquire() {
Thread thread = Thread.currentThread();
Node node = new Node(thread);
if (this.queue.offer(node)) {
LockSupport.park();
return true;
}
return false;
}
//漏桶中存放的元素
class Node {
private Thread thread;
public Node(Thread thread) {
this.thread = thread;
}
}
}
//自定义包含策略
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(15, 15, 60,
TimeUnit.SECONDS, new SynchronousQueue(),
new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
//容量为10,流速为1个/秒,即60/每分钟
BucketLimit bucketLimit = BucketLimit.build(10, 60, TimeUnit.MINUTES);
for (int i = 0; i < 15; i++) {
executor.submit(() -> {
boolean acquire = bucketLimit.acquire();
System.out.println(Thread.currentThread().getName()+ " ," +System.currentTimeMillis() + " " + acquire);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
输出:
From DemoThreadFactory's 订单创建组-Worker-11 ,1599545066963 false
From DemoThreadFactory's 订单创建组-Worker-12 ,1599545066963 false
From DemoThreadFactory's 订单创建组-Worker-13 ,1599545066963 false
From DemoThreadFactory's 订单创建组-Worker-14 ,1599545066964 false
From DemoThreadFactory's 订单创建组-Worker-15 ,1599545066964 false
From DemoThreadFactory's 订单创建组-Worker-3 ,1599545067961 true
From DemoThreadFactory's 订单创建组-Worker-1 ,1599545068962 true
From DemoThreadFactory's 订单创建组-Worker-2 ,1599545069963 true
From DemoThreadFactory's 订单创建组-Worker-4 ,1599545070964 true
From DemoThreadFactory's 订单创建组-Worker-5 ,1599545071965 true
From DemoThreadFactory's 订单创建组-Worker-6 ,1599545072966 true
From DemoThreadFactory's 订单创建组-Worker-7 ,1599545073966 true
From DemoThreadFactory's 订单创建组-Worker-8 ,1599545074967 true
From DemoThreadFactory's 订单创建组-Worker-9 ,1599545075967 true
From DemoThreadFactory's 订单创建组-Worker-10 ,1599545076968 true
使用令牌桶算法来进行限流
令牌桶算法的原理是系统以恒定的速率产生令牌,然后把令牌放到令牌桶中,令牌桶有一个容量,当令牌桶满了的时候,再向其中放令牌,那么多余的令牌会被丢弃;当想要处理一个请求的时候,需要从令牌桶中取出一个令牌,如果此时令牌桶中没有令牌,那么则拒绝该请求。从原理上看,令牌桶算法和漏桶算法是相反的,一个“进水”,一个是“漏水”。这种算法可以应对突发程度的请求,因此比漏桶算法好。
令牌桶算法示意图:
image.png限流工具类RateLimiter
Google开源工具包Guava提供了限流工具类RateLimiter,可以非常方便的控制系统每秒吞吐量.
示例:RateLimiter.create(5)
创建QPS为5的限流对象,后面又调用rateLimiter.setRate(10);
将速率设为10,输出中分2段,第一段每次输出相隔200毫秒,第二段每次输出相隔100毫秒,可以非常精准的控制系统的QPS。
public class RateLimiterTest {
public static void main(String[] args) {
//permitsPerSecond=1 即QPS=1
RateLimiter rateLimiter = RateLimiter.create(1);
for (int i = 0; i < 10; i++) {
//调用acquire会根据QPS计算需要睡眠多久,返回耗时时间
double acquire = rateLimiter.acquire();
System.out.println(System.currentTimeMillis()+"耗时"+acquire);
}
System.out.println("----------");
//可以随时调整速率,我们将qps调整为10
rateLimiter.setRate(10);
for (int i = 0; i < 10; i++) {
//rateLimiter.acquire();
double acquire = rateLimiter.acquire();
System.out.println(System.currentTimeMillis()+"耗时"+acquire);
}
}
}
输出:
1599545866820耗时0.0
1599545867820耗时0.998552
1599545868819耗时0.997836
1599545869820耗时0.999819
1599545870820耗时0.998723
1599545871819耗时0.999232
1599545872819耗时0.999328
1599545873819耗时1.000024
1599545874819耗时0.99995
1599545875820耗时0.999597
----------
1599545876819耗时0.998575
1599545876920耗时0.099593
1599545877020耗时0.098779
1599545877119耗时0.098661
1599545877220耗时0.099558
1599545877319耗时0.098965
1599545877419耗时0.099139
1599545877520耗时0.099768
1599545877620耗时0.098729
1599545877720耗时0.0986