Java高并发系列——检视阅读(七)

2021-10-02  本文已影响0人  卡斯特梅的雨伞

Java高并发系列——集合

JUC中常见的集合

JUC集合框架图

img

图可以看到,JUC的集合框架也是从Map、List、Set、Queue、Collection等超级接口中继承而来的。所以,大概可以知道JUC下的集合包含了一一些基本操作,并且变得线程安全。

Map

ConcurrentHashMap

功能和HashMap基本一致,内部使用红黑树实现的。

java8的ConcurrentHashMap为何放弃分段锁,为什么要使用CAS+Synchronized取代Segment+ReentrantLock...

特性:

  1. 迭代结果和存入顺序不一致
  2. key和value都不能为空,否则会空指针,这里和HashMap不一样,HashMap的key和value都可以为空。
  3. 线程安全的
ConcurrentSkipListMap

内部使用跳表实现的,放入的元素会进行排序,排序算法支持2种方式来指定:

  1. 通过构造方法传入一个Comparator
  2. 放入的元素实现Comparable接口

上面2种方式必选一个,如果2种都有,走规则1。

特性:

  1. 迭代结果和存入顺序不一致
  2. 放入的元素会排序
  3. key和value都不能为空
  4. 线程安全的

ConcurrentSkipListMap分析和使用

List

CopyOnWriteArrayList

实现List的接口的,一般我们使用ArrayList、LinkedList、Vector,其中只有Vector是线程安全的,可以使用Collections静态类的synchronizedList方法对ArrayList、LinkedList包装为线程安全的List,不过这些方式在保证线程安全的情况下性能都不高。

CopyOnWriteArrayList是线程安全的List,内部使用数组存储数据集合中多线程并行操作一般存在4种情况:读读、读写、写写、写读,这个只有在写写操作过程中会导致其他线程阻塞,其他3种情况均不会阻塞,所以读取的效率非常高。

可以看一下这个类的名称:CopyOnWrite,意思是在写入操作的时候,进行一次自我复制,换句话说,当这个List需要修改时,并不修改原有内容(这对于保证当前在读线程的数据一致性非常重要),而是在原有存放数据的数组上产生一个副本,在副本上修改数据,修改完毕之后,用副本替换原来的数组,这样也保证了写操作不会影响读。

注意:当然,做写操作的时候也是会用Lock加锁保证同步的,因为每次添加都会加锁且重新复制一个数组出来存储,因此,在保证读多写少的情况下,在进行写操作特别是初始化时也尽量调用addAll这些方法来一次性添加一个集合数据来避免不断地复制。

特性:

  1. 迭代结果和存入顺序一致
  2. 元素不重复
  3. 元素可以为空
  4. 线程安全的
  5. 读读、读写、写读3种情况不会阻塞;写写会阻塞
  6. 无界的

Set

ConcurrentSkipListSet

有序的Set,内部基于ConcurrentSkipListMap实现的,放入的元素会进行排序,排序算法支持2种方式来指定:

  1. 通过构造方法传入一个Comparator
  2. 放入的元素实现Comparable接口

上面2种方式需要实现一个,如果2种都有,走规则1

特性:

  1. 迭代结果和存入顺序不一致
  2. 放入的元素会排序
  3. 元素不重复
  4. 元素不能为空
  5. 线程安全的
  6. 无界的
CopyOnWriteArraySet

内部使用CopyOnWriteArrayList实现的,将所有的操作都会转发给CopyOnWriteArrayList。

注意:因为底层是用CopyOnWriteArrayList实现,因此CopyOnWriteArraySet实现元素不重复是用CopyOnWriteArrayList的indexOf方法从头到尾一个一个比较来实现的。

  public boolean addIfAbsent(E e) {
        Object[] snapshot = getArray();
        return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
            addIfAbsent(e, snapshot);
    }

特性:

  1. 迭代结果和存入顺序不一致
  2. 元素不重复
  3. 元素可以为空
  4. 线程安全的
  5. 读读、读写、写读 不会阻塞;写写会阻塞
  6. 无界的

Queue

Queue接口中的方法,我们再回顾一下:

操作类型 抛出异常 返回特殊值
插入 add(e) offer(e)
移除 remove() poll()
检查 element() peek()

3种操作,每种操作有2个方法,不同点是队列为空或者满载时,调用方法是抛出异常还是返回特殊值,大家按照表格中的多看几遍,加深记忆。

ConcurrentLinkedQueue

高效并发队列,内部使用链表实现的。

特性:

  1. 线程安全的
  2. 迭代结果和存入顺序一致
  3. 元素可以重复
  4. 元素不能为空
  5. 线程安全的
  6. 无界队列
Deque

先介绍一下Deque接口,双向队列(Deque)是Queue的一个子接口双向队列是指该队列两端的元素既能入队(offer)也能出队(poll),如果将Deque限制为只能从一端入队和出队,则可实现栈的数据结构。对于栈而言,有入栈(push)和出栈(pop),遵循先进后出原则。

一个线性 collection,支持在两端插入和移除元素。名称 deque 是“double ended queue(双端队列)”的缩写,通常读为“deck”。大多数 Deque 实现对于它们能够包含的元素数没有固定限制,但此接口既支持有容量限制的双端队列,也支持没有固定大小限制的双端队列。

此接口定义在双端队列两端访问元素的方法。提供插入、移除和检查元素的方法。每种方法都存在两种形式:一种形式在操作失败时抛出异常,另一种形式返回一个特殊值(nullfalse,具体取决于操作)。插入操作的后一种形式是专为使用有容量限制的 Deque 实现设计的;在大多数实现中,插入操作不能失败。

下表总结了上述 12 种方法:

img

此接口扩展了 Queue接口。在将双端队列用作队列时,将得到 FIFO(先进先出)行为。将元素添加到双端队列的末尾,从双端队列的开头移除元素。从 Queue 接口继承的方法完全等效于 Deque 方法,如下表所示:

Queue 方法 等效 Deque 方法
add(e) addLast(e)
offer(e) offerLast(e)
remove() removeFirst()
poll() pollFirst()
element() getFirst()
peek() peekFirst()
ConcurrentLinkedDeque

实现了Deque接口,内部使用链表实现的高效的并发双端队列

特性:

  1. 线程安全的
  2. 迭代结果和存入顺序一致
  3. 元素可以重复
  4. 元素不能为空
  5. 线程安全的
  6. 无界队列
BlockingQueue

关于阻塞队列,上一篇有详细介绍。

疑问:

Q:跳表是什么?

跳表,是基于链表实现的一种类似“二分”的算法。它可以快速的实现增,删,改,查操作。跳跃列表的平均查找和插入时间复杂度都是O(logn)。跳表是通过维护一个多层次的链表来构建多层索引结构,是一种以空间来换时间的数据结构,多层链表中的每一层链表元素是前一层链表元素的子集。一开始时,算法在最稀疏的层次进行搜索,直至需要查找的元素在该层两个相邻的元素中间。这时,算法将跳转到下一个层次,重复刚才的搜索,直到找到需要查找的元素为止。

之所以要用跳表,我们可以与单链表进行比较,单链表中查找某个数据的时候需要的时间复杂度为O(n).而我们多个跳表构建多层索引,使得查询效率提升到O(logn)。

数据结构与算法——跳表

跳表

接口性能提升实战篇

需求:电商app的商品详情页,需要给他们提供一个接口获取商品相关信息:

  1. 商品基本信息(名称、价格、库存、会员价格等)
  2. 商品图片列表
  3. 商品描述信息(描述信息一般是由富文本编辑的大文本信息)

普通接口实现伪代码如下:

public Map<String,Object> detail(long goodsId){
    //创建一个map
    //step1:查询商品基本信息,放入map
    map.put("goodsModel",(select * from t_goods where id = #gooldsId#));
    //step2:查询商品图片列表,返回一个集合放入map
    map.put("goodsImgsModelList",(select * from t_goods_imgs where goods_id = #gooldsId#));
    //step3:查询商品描述信息,放入map
    map.put("goodsExtModel",(select * from t_goods_ext where goods_id = #gooldsId#));
    return map;
}

上面这种写法应该很常见,代码很简单,假设上面每个步骤耗时200ms,此接口总共耗时>=600毫秒

整个过程是按顺序执行的,实际上3个查询之间是没有任何依赖关系,所以说3个查询可以同时执行,那我们对这3个步骤采用多线程并行执行实现如下:

示例:

public class GetProductDetailTest {

    //自定义包含策略
    private ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 20, 60,
            TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
            new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());

    /**
     * 获取商品基本信息
     *
     * @param goodsId 商品id
     * @return 商品基本信息
     * @throws InterruptedException
     */
    public String goodsDetailModel(long goodsId) throws InterruptedException {
        //模拟耗时,休眠200ms
        TimeUnit.MILLISECONDS.sleep(200);
        return "商品id:" + goodsId + ",商品基本信息....";
    }

    /**
     * 获取商品图片列表
     *
     * @param goodsId 商品id
     * @return 商品图片列表
     * @throws InterruptedException
     */
    public List<String> goodsImgsModelList(long goodsId) throws InterruptedException {
        //模拟耗时,休眠200ms
        TimeUnit.MILLISECONDS.sleep(200);
        return Arrays.asList("图1", "图2", "图3");
    }

    /**
     * 获取商品描述信息
     *
     * @param goodsId 商品id
     * @return 商品描述信息
     * @throws InterruptedException
     */
    public String goodsExtModel(long goodsId) throws InterruptedException {
        //模拟耗时,休眠200ms
        TimeUnit.MILLISECONDS.sleep(200);
        return "商品id:" + goodsId + ",商品描述信息......";
    }

    public Map<String,Object> getGoodsDetail(long goodsId) throws ExecutionException, InterruptedException {
        Map<String, Object> result = new HashMap<>();
        Future<String> gooldsDetailModelFuture  = executor.submit(() -> goodsDetailModel(goodsId));
        Future<List<String>> goodsImgsModelFuture = executor.submit(() -> goodsImgsModelList(goodsId));
        //异步获取商品描述信息
        Future<String> goodsExtModelFuture = executor.submit(() -> goodsExtModel(goodsId));
        result.put("gooldsDetailModel", gooldsDetailModelFuture.get());
        result.put("goodsImgsModelList", goodsImgsModelFuture.get());
        result.put("goodsExtModel", goodsExtModelFuture.get());
        return result;
    }


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        GetProductDetailTest detailTest = new GetProductDetailTest();
        long starTime = System.currentTimeMillis();
        Map<String, Object> map = detailTest.getGoodsDetail(1L);
        System.out.println(map);
        System.out.println("耗时(ms):" + (System.currentTimeMillis() - starTime));
    }
}

输出:

{goodsImgsModelList=[图1, 图2, 图3], gooldsDetailModel=商品id:1,商品基本信息...., goodsExtModel=商品id:1,商品描述信息......}
耗时(ms):255

可以看出耗时200毫秒左右,性能提升了2倍,假如这个接口中还存在其他无依赖的操作,性能提升将更加显著,上面使用了线程池并行去执行3次查询的任务,最后通过Future获取异步执行结果。

整个优化过程:

  1. 先列出无依赖的一些操作
  2. 将这些操作改为并行的方式

总结

  1. 对于无依赖的操作尽量采用并行方式去执行,可以很好的提升接口的性能

解决微服务日志的痛点

日志有什么用?

  1. 系统出现故障的时候,可以通过日志信息快速定位问题,修复bug,恢复业务
  2. 提取有用数据,做数据分析使用

本文主要讨论通过日志来快速定位并解决问题。

日志存在的痛点

先介绍一下多数公司采用的方式:目前比较流行的是采用springcloud(或者dubbo)做微服务,按照业务拆分为多个独立的服务,服务采用集群的方式部署在不同的机器上,当一个请求过来的时候,可能会调用到很多服务进行处理,springcloud一般采用logback(或者log4j)输出日志到文件中。当系统出问题的时候,按照系统故障的严重程度,严重的会回退版本,然后排查bug,轻的,找运维去线上拉日志,然后排查问题。

这个过程中存在一些问题

  1. 日志文件太大太多,不方便查找
  2. 日志分散在不同的机器上,也不方便查找
  3. 一个请求可能会调用多个服务,完整的日志难以追踪(没有完整的链路日志)
  4. 系统出现了问题,只能等到用户发现了,自己才知道(没有报错预警)

本文要解决上面的几个痛点,构建我们的日志系统,达到以下要求:

  1. 方便追踪一个请求完整的日志
  2. 方便快速检索日志
  3. 系统出现问题自动报警,通知相关人员

构建日志系统

方便追踪一个请求完整的日志

当一个请求过来的时候,可能会调用多个服务,多个服务内部可能又会产生子线程处理业务,所以这里面有两个问题需要解决:

  1. 多个服务之间日志的追踪
  2. 服务内部子线程和主线程日志的追踪,这个地方举个例子,比如一个请求内部需要给10000人发送推送,内部开启10个线程并行处理,处理完毕之后响应操作者,这里面有父子线程,我们要能够找到这个里面所有的日志

需要追踪一个请求完整日志,我们需要给每个请求设置一个全局唯一编号,可以使用UUID或者其他方式也行。

多个服务之间日志追踪的问题:当一个请求过来的时候,在入口处生成一个trace_id,然后放在ThreadLocal中,如果内部设计到多个服务之间相互调用,调用其他服务的时,将trace_id顺便携带过去。

父子线程日志追踪的问题:可以采用InheritableThreadLocal来存放trace_id,这样可以在线程中获取到父线程中的trace_id。

所以此处我们需要使用InheritableThreadLocal来存储trace_id。

使用了线程池处理请求的,由于线程池中的线程采用的是复用的方式,所以需要对执行的任务Runable做一些改造 包装。

public class TraceRunnable implements Runnable {
    private String tranceId;
    private Runnable target;

    public TraceRunnable(Runnable target) {
        this.tranceId = TraceUtil.get();
        this.target = target;
    }

    @Override
    public void run() {
        try {
            TraceUtil.set(this.tranceId);
            MDC.put(TraceUtil.MDC_TRACE_ID, TraceUtil.get());
            this.target.run();
        } finally {
            MDC.remove(TraceUtil.MDC_TRACE_ID);
            TraceUtil.remove();
        }
    }

    public static Runnable trace(Runnable target) {
        return new TraceRunnable(target);
    }
}

需要用线程池执行的任务使用TraceRunnable封装一下就可以了。

TraceUtil代码:

public class TraceUtil {

    public static final String REQUEST_HEADER_TRACE_ID = "com.ms.header.trace.id";
    public static final String MDC_TRACE_ID = "trace_id";

    private static InheritableThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>();

    /**
     * 获取traceid
     *
     * @return
     */
    public static String get() {
        String traceId = inheritableThreadLocal.get();
        if (traceId == null) {
            traceId = IDUtil.getId();
            inheritableThreadLocal.set(traceId);
        }
        return traceId;
    }

    public static void set(String trace_id) {
        inheritableThreadLocal.set(trace_id);
    }

    public static void remove() {
        inheritableThreadLocal.remove();
    }

}

日志输出中携带上trace_id,这样最终我们就可以通过trace_id找到一个请求的完整日志了。

方便快速检索日志

日志分散在不同的机器上,如果要快速检索,需要将所有服务产生的日志汇集到一个地方。

关于检索日志的,列一下需求:

  1. 我们将收集日志发送到消息中间件中(可以是kafka、rocketmq),消息中间件这块不介绍,选择玩的比较溜的就可以了
  2. 系统产生日志尽量不要影响接口的效率
  3. 带宽有限的情况下,发送日志也尽量不要去影响业务
  4. 日志尽量低延次,产生的日志,尽量在生成之后1分钟后可以检索到
  5. 检索日志功能要能够快速响应

关于上面几点,我们需要做的:日志发送的地方进行改造,引入消息中间件,将日志异步发送到消息中间件中,查询的地方采用elasticsearch日志系统需要订阅消息中间件中的日志,然后丢给elasticsearch建索引,方便快速检索,咱们来一点点的介绍。

日志发送端的改造

日志是由业务系统产生的,一个请求过来的时候会产生很多日志,日志产生时,我们尽量减少日志输出对业务耗时的影响,我们的过程如下:

  1. 业务系统内部引用一个线程池来异步处理日志,线程池内部可以使用一个容量稍微大一点的阻塞队列
  2. 业务系统将日志丢给线程池进行处理
  3. 线程池中将需要处理的日志先压缩一下,然后发送至mq

线程池的使用可以参考:JAVA线程池,这一篇就够了

引入mq存储日志

业务系统将日志先发送到mq中,后面由其他消费者订阅进行消费。日志量比较大的,对mq的要求也比较高,可以选择kafka,业务量小的,也可以选取activemq。

使用elasticsearch来检索日志

elasticsearch(以下简称es)是一个全文检索工具,具体详情可以参考其官网相关文档。使用它来检索数据效率非常高。日志系统中需要我们开发一个消费端来拉取mq中的消息,将其存储到es中方便快速检索,关于这块有几点说一下:

  1. 建议按天在es中建立数据库,日志量非常大的,也可以按小时建立数据库。查询的时候,时间就是必选条件了,这样可以快速让es定位到日志库进行检索,提升检索效率
  2. 日志常见的需要收集的信息:trace_id、时间、日志级别、类、方法、url、调用的接口开始时间、调用接口的结束时间、接口耗时、接口状态码、异常信息、日志信息等等,可以按照这些在es中建立索引,方便检索。
日志监控报警——可自定义配置报警

日志监控报警是非常重要的,这个必须要有,日志系统中需要开发监控报警功能,这块我们可以做成通过页面配置的方式,支持报警规则的配置,如日志中产生了某些异常、接口响应时间大于多少、接口返回状态码404等异常信息的时候能够报警,具体的报警可以是语音电话、短信通知、钉钉机器人报警等等,这些也做成可以配置的

日志监控模块从mq中拉取日志,然后去匹配我们启用的一些规则进行报警。

日志处理结构图如下:
image.png

高并发中常见的限流方式

常见的限流的场景

  1. 秒杀活动,数量有限,访问量巨大,为了防止系统宕机,需要做限流处理
  2. 国庆期间,一般的旅游景点人口太多,采用排队方式做限流处理
  3. 医院看病通过发放排队号的方式来做限流处理。

常见的限流算法

  1. 通过控制最大并发数来进行限流
  2. 使用漏桶算法来进行限流
  3. 使用令牌桶算法来进行限流

通过控制最大并发数来进行限流

以秒杀业务为例,10个iphone,100万人抢购,100万人同时发起请求,最终能够抢到的人也就是前面几个人,后面的基本上都没有希望了,那么我们可以通过控制并发数来实现,比如并发数控制在10个,其他超过并发数的请求全部拒绝,提示:秒杀失败,请稍后重试。

单机中的JUC中提供了这样的工具类:Semaphore:如果是集群,则可以用redis或者zk代替Semaphore

示例:

public class MaxAccessLimiter {

    private static Semaphore limiter = new Semaphore(5);
    //自定义包含策略
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 20, 60,
            TimeUnit.SECONDS, new SynchronousQueue(),
            new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());

    public static void main(String[] args) {
        for (int i = 0; i < 20; i++) {
            executor.submit(() -> {
                boolean flag = false;
                try {
                    flag = limiter.tryAcquire(100, TimeUnit.MICROSECONDS);
                    if (flag) {
                        //休眠2秒,模拟下单操作
                        System.out.println(Thread.currentThread() + ",尝试下单中。。。。。");
                        TimeUnit.SECONDS.sleep(2);
                    } else {
                        System.out.println(Thread.currentThread() + ",秒杀失败,请稍微重试!");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    if (flag) {
                        limiter.release();
                    }
                }
            });
        }
           executor.shutdown();
    }
}

输出:

Thread[From DemoThreadFactory's 订单创建组-Worker-1,5,main],尝试下单中。。。。。
Thread[From DemoThreadFactory's 订单创建组-Worker-2,5,main],尝试下单中。。。。。
Thread[From DemoThreadFactory's 订单创建组-Worker-3,5,main],尝试下单中。。。。。
Thread[From DemoThreadFactory's 订单创建组-Worker-4,5,main],尝试下单中。。。。。
Thread[From DemoThreadFactory's 订单创建组-Worker-5,5,main],尝试下单中。。。。。
Thread[From DemoThreadFactory's 订单创建组-Worker-9,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-14,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-16,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-17,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-18,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-20,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-12,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-11,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-7,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-8,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-6,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-10,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-19,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-15,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-13,5,main],秒杀失败,请稍微重试!

使用漏桶算法来进行限流

国庆期间比较火爆的景点,人流量巨大,一般入口处会有限流的弯道,让游客进去进行排队,排在前面的人,每隔一段时间会放一拨进入景区。排队人数超过了指定的限制,后面再来的人会被告知今天已经游客量已经达到峰值,会被拒绝排队,让其明天或者以后再来,这种玩法采用漏桶限流的方式。

漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。

漏桶算法示意图:

image.png

示例:代码中BucketLimit.build(10, 60, TimeUnit.MINUTES);创建了一个容量为10,流水为60/分钟的漏桶。

public class BucketLimitTest {

    public static class BucketLimit {
        static AtomicInteger threadNum = new AtomicInteger(1);
        //容量
        private int capcity;
        //流速
        private int flowRate;
        //流速时间单位
        private TimeUnit flowRateUnit;
        private BlockingQueue<Node> queue;
        //漏桶流出的任务时间间隔(纳秒)
        private long flowRateNanosTime;

        public BucketLimit(int capcity, int flowRate, TimeUnit flowRateUnit) {
            this.capcity = capcity;
            this.flowRate = flowRate;
            this.flowRateUnit = flowRateUnit;
            this.bucketThreadWork();
        }

        //漏桶线程
        public void bucketThreadWork() {
            this.queue = new ArrayBlockingQueue<Node>(capcity);
            //漏桶流出的任务时间间隔(纳秒)
            this.flowRateNanosTime = flowRateUnit.toNanos(1) / flowRate;
            System.out.println(TimeUnit.NANOSECONDS.toSeconds(this.flowRateNanosTime));
            Thread thread = new Thread(this::bucketWork);
            thread.setName("漏桶线程-" + threadNum.getAndIncrement());
            thread.start();
        }

        //漏桶线程开始工作
        public void bucketWork() {
            while (true) {
                Node node = this.queue.poll();
                if (Objects.nonNull(node)) {
                    //唤醒任务线程
                    LockSupport.unpark(node.thread);
                }
                //阻塞当前线程,最长不超过nanos纳秒
                //休眠flowRateNanosTime
                LockSupport.parkNanos(this.flowRateNanosTime);
            }
        }

        //返回一个漏桶
        public static BucketLimit build(int capcity, int flowRate, TimeUnit flowRateUnit) {
            if (capcity < 0 || flowRate < 0) {
                throw new IllegalArgumentException("capcity、flowRate必须大于0!");
            }
            return new BucketLimit(capcity, flowRate, flowRateUnit);
        }

        //当前线程加入漏桶,返回false,表示漏桶已满;true:表示被漏桶限流成功,可以继续处理任务
        public boolean acquire() {
            Thread thread = Thread.currentThread();
            Node node = new Node(thread);
            if (this.queue.offer(node)) {
                LockSupport.park();
                return true;
            }
            return false;
        }

        //漏桶中存放的元素
        class Node {
            private Thread thread;

            public Node(Thread thread) {
                this.thread = thread;
            }
        }
    }
    //自定义包含策略
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(15, 15, 60,
            TimeUnit.SECONDS, new SynchronousQueue(),
            new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
    public static void main(String[] args) {
        //容量为10,流速为1个/秒,即60/每分钟
        BucketLimit bucketLimit = BucketLimit.build(10, 60, TimeUnit.MINUTES);
        for (int i = 0; i < 15; i++) {
            executor.submit(() -> {
                boolean acquire = bucketLimit.acquire();
                System.out.println(Thread.currentThread().getName()+ " ," +System.currentTimeMillis() + " " + acquire);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

输出:

From DemoThreadFactory's 订单创建组-Worker-11 ,1599545066963 false
From DemoThreadFactory's 订单创建组-Worker-12 ,1599545066963 false
From DemoThreadFactory's 订单创建组-Worker-13 ,1599545066963 false
From DemoThreadFactory's 订单创建组-Worker-14 ,1599545066964 false
From DemoThreadFactory's 订单创建组-Worker-15 ,1599545066964 false
From DemoThreadFactory's 订单创建组-Worker-3 ,1599545067961 true
From DemoThreadFactory's 订单创建组-Worker-1 ,1599545068962 true
From DemoThreadFactory's 订单创建组-Worker-2 ,1599545069963 true
From DemoThreadFactory's 订单创建组-Worker-4 ,1599545070964 true
From DemoThreadFactory's 订单创建组-Worker-5 ,1599545071965 true
From DemoThreadFactory's 订单创建组-Worker-6 ,1599545072966 true
From DemoThreadFactory's 订单创建组-Worker-7 ,1599545073966 true
From DemoThreadFactory's 订单创建组-Worker-8 ,1599545074967 true
From DemoThreadFactory's 订单创建组-Worker-9 ,1599545075967 true
From DemoThreadFactory's 订单创建组-Worker-10 ,1599545076968 true

使用令牌桶算法来进行限流

令牌桶算法的原理是系统以恒定的速率产生令牌,然后把令牌放到令牌桶中,令牌桶有一个容量,当令牌桶满了的时候,再向其中放令牌,那么多余的令牌会被丢弃;当想要处理一个请求的时候,需要从令牌桶中取出一个令牌,如果此时令牌桶中没有令牌,那么则拒绝该请求从原理上看,令牌桶算法和漏桶算法是相反的,一个“进水”,一个是“漏水”。这种算法可以应对突发程度的请求,因此比漏桶算法好。

令牌桶算法示意图:

image.png

限流工具类RateLimiter

Google开源工具包Guava提供了限流工具类RateLimiter,可以非常方便的控制系统每秒吞吐量.

示例:RateLimiter.create(5)创建QPS为5的限流对象,后面又调用rateLimiter.setRate(10);将速率设为10,输出中分2段,第一段每次输出相隔200毫秒,第二段每次输出相隔100毫秒,可以非常精准的控制系统的QPS。

public class RateLimiterTest {

    public static void main(String[] args) {
        //permitsPerSecond=1 即QPS=1
        RateLimiter rateLimiter = RateLimiter.create(1);
        for (int i = 0; i < 10; i++) {
            //调用acquire会根据QPS计算需要睡眠多久,返回耗时时间
            double acquire = rateLimiter.acquire();
            System.out.println(System.currentTimeMillis()+"耗时"+acquire);
        }
        System.out.println("----------");
        //可以随时调整速率,我们将qps调整为10
        rateLimiter.setRate(10);
        for (int i = 0; i < 10; i++) {
            //rateLimiter.acquire();
            double acquire = rateLimiter.acquire();
            System.out.println(System.currentTimeMillis()+"耗时"+acquire);
        }
    }
}

输出:

1599545866820耗时0.0
1599545867820耗时0.998552
1599545868819耗时0.997836
1599545869820耗时0.999819
1599545870820耗时0.998723
1599545871819耗时0.999232
1599545872819耗时0.999328
1599545873819耗时1.000024
1599545874819耗时0.99995
1599545875820耗时0.999597
----------
1599545876819耗时0.998575
1599545876920耗时0.099593
1599545877020耗时0.098779
1599545877119耗时0.098661
1599545877220耗时0.099558
1599545877319耗时0.098965
1599545877419耗时0.099139
1599545877520耗时0.099768
1599545877620耗时0.098729
1599545877720耗时0.0986
上一篇下一篇

猜你喜欢

热点阅读