线程池与阻塞队列

2022-01-06  本文已影响0人  舞鹤Roc

一、线程池

1、为什么需要使用线程池

2、如何创建线程池

// java.util.concurrent.ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

3、线程池参数

4、 拒绝策略

// java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        //移除队头元素
        e.getQueue().poll();
        //再尝试入队
        e.execute(r);
    }
}

5、常见四种线程池

newFixedThreadPool(nThread) 执行长期任务 =》\color{#FF0000}{LinkedBlockQueue}

可控制线程最大并发数(同时执行的线程数)
超出的线程会在队列中等待

newScheduledThreadPool() 一个任务一个任务执行的场景 =》\color{#FF0000}{DelayedWorkQueue}

支持定时及周期性任务执行

newCachedThreadPool() 执行很多短期异步的小程序或者负载较轻的服务器 =》\color{#FF0000}{SynchronousQueue}

线程数无限制
有空闲线程则复用空闲线程,若无空闲线程则新建线程
一定程序减少频繁创建/销毁线程,减少系统开销

newWorkStealingPool(int) Java8新增,使用目前机器上可用的处理器作为它的并行级别

ForkJoinPool工作窃取者,使用分治法,它的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”分发到不同的cpu核心上执行,执行完后再把结果收集到一起返回。

二、阻塞队列

1、阻塞队列的作用

在线程池中,往往就会用阻塞队列来保存那些暂时没有空闲线程可以直接执行的任务,等到线程空闲之后再从阻塞队列中弹出任务来执行。一旦队列为空,那么线程就会被阻塞,直到有新任务被插入为止。
阻塞队列是一种数据结构,用来存储任务,由线程池来控制对阻塞队列的操作(插入、弹出等)。

2、阻塞队列的核心方法

// java.util.concurrent.BlockingQueue
public interface BlockingQueue<E> extends Queue<E> {
    //将给定元素设置到队列中,如果设置成功返回true, 否则抛出异常。如果是往限定了长度的队列中设置值,推荐使用offer()方法。
    boolean add(E e);
    //将给定的元素设置到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。
    boolean offer(E e);
    //将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。
    void put(E e) throws InterruptedException;
    //将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false.
    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
    //从队列中获取值,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。
    E take() throws InterruptedException;
    //在给定的时间里,从队列中获取值,如果没有取到会抛出异常。
    E poll(long timeout, TimeUnit unit) throws InterruptedException;
    //获取队列中剩余的空间。
    int remainingCapacity();
    //从队列中移除指定的值。
    boolean remove(Object o);
    //判断队列中是否拥有该值。
    public boolean contains(Object o);
    //将队列中值,全部移除,并发设置到给定的集合中。
    int drainTo(Collection<? super E> c);
    //指定最多数量限制将队列中值,全部移除,并发设置到给定的集合中。
    int drainTo(Collection<? super E> c, int maxElements);
}

3、JDK8提供了7个阻塞队列。

分别是:

如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。

DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

类似于无中介的直接交易,每一个put操作必须等待take操作,否则不能添加元素。

4、Redis阻塞队列

redis有一种数据结构是List(有序队列),常用命令为lpush,rpush,lpop,rpop,lrange,brpop,当我们一边 lpush(生成者) 另一边 bpop(消费者),就可以实现一种阻塞队列的用法。

Redis Brpop 命令移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。

但是这样会潜在着,消费失败,消息丢失的风险,所以不会使用bpop,而是使用BRPOPLPUSHLREM来实现。
参见redis官方文档:安全的队列

上一篇下一篇

猜你喜欢

热点阅读