java进阶

多线程并发总结(六)--阻塞队列和线程池

2021-01-18  本文已影响0人  Jack_Ou

1. 阻塞队列

2. 阻塞队列的用途

​ 阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

2.1 为什么有生产者消费者模式

​ 在并发编程中,为了平衡生产线程和消费线程的工作能力来提高程序整体处理数据的速度,因此生产者消费者模式就有了用武之地。

​ 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。

因此就需要将生产者和消费者隔离开,并且用一个容器将产品装好。生产者生产好产品就把产品放入容器,消费者需要的时候就从容器中拿。 而这个容器就可以用队列来实现。然而有容器为空和满的两种情况导致生产者在容器满的时候无法放入队列,消费者当容器为空的时候无法从队列获取产品。所以阻塞队列就应运而生。

2.2 阻塞队列接口(BlockingQueue)

​ 下面看看BlockingQueue中定义的方法区别

方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(E) offer(E) put(E) offer(E, time, unit)
移除方法 remove() poll() take() poll(E, time, unit)
检查队列方法 element() peek() 不可用 不可用

2.3 有哪些阻塞队列

3.线程池

3.1 使用线程池的好处

3.2 线程池相关的类

线程池类关系图.png

3.3 线程池的创建各个参数含义

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, /*核心线程数,最大线程数*/

​ long keepAliveTime, TimeUnit unit, /*空闲线程存活时间 */

​ BlockingQueue<Runnable> workQueue, /*阻塞队列 */

ThreadFactory threadFactory, /*线程工厂 */

RejectedExecutionHandler handler) /*线程池拒绝策略处理器 */

从构造方法可以看出创建线程池有五类7个参数可以配置,下面来一一阐述一下

3.3.1 corePoolSize

​ 线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;

​ 如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;

​ 如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。

3.3.2 maximumPoolSize

​ 线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize

3.3.3 keepAliveTime

​ 线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间。默认情况下,该参数只在线程数大于corePoolSize时才有用

3.3.4 TimeUnit

keepAliveTime的时间单位

3.3.5 workQueue

​ workQueue必须是BlockingQueue阻塞队列。当线程池中的线程数超过它的corePoolSize的时候,线程会进入阻塞队列进行阻塞等待。通过workQueue,线程池实现了阻塞功能。

​ 一般来说,我们应该尽量使用有界队列,因为使用无界队列作为工作队列会对线程池带来如下影响。

3.3.6 threadFactory

​ 创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名,当然还可以更加自由的对线程做更多的设置,比如设置所有的线程为守护线程。

​ Executors静态工厂里默认的threadFactory,线程的命名规则是“pool-数字-thread-数字”。

3.3.7 RejectedExecutionHandler

​ 线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:

当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

3.4 线程池工作机制

线程池工作机制.png

3.5 提交任务

3.5.1 execute()
execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。
3.5.2 submit()

​ submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

3.6 关闭线程池

​ 可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别,shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程

​ 只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);  // 将线程池状态设置成SHUTDOWN
        interruptIdleWorkers();    // 中断空闲线程
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);  // 将线程池状态设置成STOP
            interruptWorkers();     // 中断所有线程
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

3.7 如何合理配置线程池

​ 要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。

针对CPU密集型任务应配置尽可能小的线程,如配置Ncpu+1个线程的线程池。由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*Ncpu。(可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数)

针对混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。

针对优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行

执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行

测试用例代码见: git@github.com:oujie123/UnderstandingOfThread.git

上一篇 下一篇

猜你喜欢

热点阅读