深入理解 Java线程池
前言
线程池之前需要是[多线程知识:https://www.jianshu.com/p/1b2daac373d5]
什么是线程池
顾名思义,线程池就是有一个容器[底层数据结构HashSet<Worker>
],容器用于存放多个线程。线程池中存在多个线程,如果需要执行任务的话,则从这个池子中取得一个线程对象用于执行此任务。[只是一个大概的粗略的介绍,具体细节请接着往下看!!!]
怎么使用线程池[对于CPU/IO密集型]
- CPU密集型:对于计算密集型的任务较多的场景的话,由于任务会占用大量的CPU时间片,多创建线程也没有空闲的CPU去处理,所以就可以适当的减少线程池中的线程个数(一般线程个数=cpu个数即可)。
- IO密集型:对于IO密集型的任务较多的场景的话,由于任务并不会占用大量的CPU时间片,相反会有更多的IO阻塞导致CPU空闲,这个时候可以适当地增加线程池中的个数(一般线程数=2*cpu个数即可),提高CPU利用率。
为什么要使用线程池
- 由于线程池的创建和销毁的过程会涉及到用户态和内核态的切换等一些消耗计算机资源的操作(此处针对的是内核线程模型,可见https://www.jianshu.com/p/39d2a4c050f8),导致效率的降低。线程池的作用就是利用一个数据结构容器维系一些线程,用于执行Application的Task,以此达到复用线程提高效率(提高并发、降低RT)。
- 有些可以实现与时间相关的任务,如定时任务、周期性任务等。
Java线程池Demo
public class ThreadPoolDemo {
public static void main(String[] args) throws Exception {
// 创建Cached线程池,运行速度最快、线程数=理论上可以理解为无穷大
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 创建混合线程池,运行速度中等、线程数=传入的 nThreads
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
// 创建单个线程的线程池,运行速度最慢,但是可以实现顺序执行,线程数=1
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
Future<Double> future1 = cachedThreadPool.submit(() -> doTask((e1, e2) -> e1*1.0 + e2, 1, 9));
Future<Double> future2 = cachedThreadPool.submit(() -> doTask((e1, e2) -> e1*1.0 - e2, 1, 9));
Future<Double> future3 = cachedThreadPool.submit(() -> doTask((e1, e2) -> e1*1.0 * e2, 1, 9));
Future<Double> future4 = cachedThreadPool.submit(() -> doTask((e1, e2) -> e1*1.0 / e2, 1, 9));
System.out.println(future1.get());
System.out.println(future2.get());
System.out.println(future3.get());
System.out.println(future4.get());
// 关闭资源
close(cachedThreadPool);
close(fixedThreadPool);
close(singleThreadExecutor);
close(scheduledThreadPool);
}
public static double doTask(BiFunction<Integer, Integer, Double> biFunction, int arg1, int arg2) {
return biFunction.apply(arg1, arg2);
}
public static void close(ExecutorService executorService) {
executorService.shutdown();
}
}
Executors
类是线程池的工具类(不推荐使用),有一些创建线程池的方法,最主要的方法如下:
- Executors.newCachedThreadPool():此方法是创建cached的线程池,此线程池由于来一个需要执行的Task,就会创建一个线程来执行Task(线程池的最大数量:理论上无穷大),所以速度也是最快的。此线程适用大量非耗时的任务,如果大量耗时的任务的话,要么会导致CPU被打满,影响其他业务任务。
- Executors.newFixedThreadPool(10):此方法是创建混合[给定数量]的线程池。此线程也是使用场景比较广泛的,CPU密集型和IO密集型都是可以,但是需要根据不同的场景设定不同的nThread的值。
- Executors.newSingleThreadExecutor():此方法是创建一个单个线程的线程池,顾名思义,这个线程池中只有一个线程,所有的任务都是顺序排队处理的,所以速度也是最慢的,但是也因为这个特性导致可以任务的顺序执行,适用于一些特殊的场景。
- Executors.newScheduledThreadPool(10):此方法是创建一个可以可以让Task在给定的延迟后运行或定期执行。适合于一些定时和周期性的场景。
前三种方法底层都是调用的同一个方法,如下
ThreadPoolExecutor
如果是使用Java原生的线程池,推荐使用ThreadPoolExecutor
先来看看类层级结构
类层级结构-
Executor
:ThreadPoolExecutor
线程池的顶级接口,此借口中只定义了一个提交执行Task的方法。
-
ExecutorService
:此接口是Executor
接口的扩展,其中定义了很多的有关于管理线程池的方法,例如:shutdown()
、submit()
等。
-
AbstractExecutorService
:此抽象类,实现了ExecutorService
接口,并对其中的submit()
和InvokeAny
的相关方法给出了一个默认实现。
-
ThreadPoolExecutor
:此对象是使用频率较高的线程池类,它对AbstractExecutorService
进行进一步的实现,包括至关重要的execute
以及shutdown/shutdownNow
方法,除此之外,还定义了ctl
和一些数据结构来维护线程池的运行,还提供了线程工厂对象创建线程,还提供了基本的拒绝策略等。
构造方法
构造方法一
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:除非设置了
allowCoreThreadTimeOut
,即使它们处于空闲状态也要保留在池中的线程数。 - maximumPoolSize:池中允许的最大线程数。
- keepAliveTime:当线程数大于corePoolSize数时,这是多余的空闲线程将在终止之前等待新任务的最长时间
- unit:
keepAliveTime
参数的时间单位。 - workQueue:在执行任务之前用于保留任务的队列。此队列将仅保存由
execute
方法提交的Runnable
且多于corePoolSize
任务,默认为LinkedBlockingQueue
,也可以自定义。 - threadFactory:执行程序创建新线程时使用的工厂,默认使用
DefaultThreadFactory
,也可以自定义。 - handler:达到了线程界限和队列容量而在执行被阻止时使用的处理程序(拒绝策略),默认
AbortPolicy
,也可以自定义。
构造方法二
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
构造方法三
public ThreadPoolExecutor
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
构造方法四
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
阻塞队列
主要的阻塞队列如下:
-
ArrayBlockingQueue
:底层数据结构为数组的有界阻塞队列,此队列符合FIFO(先进先出)原则。 -
LinkedBlockingQueue
:底层数据结构为链表的无界[理论上]阻塞队列,此队列符合FIFO(先进先出)原则。 -
ConcurrentLinkedQueue
:底层数据结构为链表的无界(理论上)线程安全的阻塞队列,此队列符合FIFO(先进先出)原则。 -
DelayQueue
:延迟队列,其中的元素只能在其延迟到期后才能使用。 -
SynchronousQueue
:同步队列,每个插入操作必须等待另一个线程进行相应的删除操作,反之亦然。同步队列没有任何内部容量,甚至没有一个容量。您无法在同步队列中peek
,因为仅当您尝试删除它时,该元素才存在。您不能插入元素(使用任何方法),除非另一个线程试图将其删除;您无法进行迭代,因为没有要迭代的内容。队列的head是第一个排队的插入线程试图添加到队列中的元素;如果没有这样的排队线程,则没有元素可用于删除,并且poll()
将返回null
。出于其他Collection
方法(例如contains
)的目的,SynchronousQueue
用作空集合。此队列不允许null
元素。
拒绝策略
拒绝策略抽象接口RejectedExecutionHandler
,其所有的实现类都在ThreadPoolExecutor
类中,当然你也可以自己自己的拒绝策略。
-
AbortPolicy
:顾名思义,此拒绝策略则是当提交执行的Task超过线程池的``maximumPoolSize会抛出
RejectedExecutionException。如果不指定拒绝策略,那么对于
Executors`工具类,此拒绝策略是默认的拒绝策略。 -
CallerRunsPolicy
:此拒绝策略,如果线程池没有被shutdown,那么就会在调用execute
方法的线程中去执行该线程(如果是main线程执行的execute
方法,那么就会使用主线程去执行该Task,那么会导致main线程被阻塞直到该Task执行结束)。 -
DiscardOldestPolicy
:此拒绝策略,静默(不会抛出异常)丢弃被拒绝的任务。 -
DiscardPolicy
:此拒绝策略,它丢弃最旧的未处理请求,然后重试execute
,如果线程池被关闭,在这种情况下,该任务将被丢弃。
自定义拒绝策略使用Demo
public class ThreadPoolRejectedExecutionHandlerDemo {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(
Runnable r, // 被拒绝的任务Task
ThreadPoolExecutor executor // 线程池对象
) {
// do something
}
});
// 执行Task
executor.execute(() -> doTask());
// 关闭资源
close(executor);
}
public static void doTask() {
System.out.println("do Task");
}
public static void close(ExecutorService executorService) {
executorService.shutdown();
}
}
线程池详解
ThreadPoolExecutor
类中有一个ctl
的原子整数的属性,该属性巧妙的表示了两个属性值(workerCount
、runState
),workerCount
指示线程的有效数量,runState
指示线程池的生命周期(是否运行,关闭等)。
-
runState源码注释runState
:ctl
高3位表示的是线程的一些生命状态。
-
RUNNING
:运行中,此状态可以接受新任务并处理排队的任务。 -
SHUTDOWN
:关闭中状态,此状态不接受新任务,而是处理排队的任务。 -
STOP
:停止状态,不接受新任务,不处理排队的任务以及中断进行中的任务 -
TIDYING
:所有任务已终止,workerCount为零,转换为状态TIDYING的线程将运行terminated()
方法终结线程池。 -
TERMINATED
:terminated()
方法(默认空实现,可以自己根据场景进行自定义)运行完毕,代表线程池正在的停止。
-
-
workerCount
:ctl
低29位表示的是线程池中的线程数量,可以轻易的计算出一个线程池中最大可容纳的线程数为2 ^ 29)-1(约5亿个)。
为什么要使用一个ctl
的原子整数来表示这些值呢?
- 原子整数通过CAS以较高效率保证线程安全,毕竟线程池本身就要天然支持多线程环境。
- 使用一个属性表示两个属性,无论是从空间消耗,还是维护成本来说都是比较理想的。
- 总所周知,计算机底层硬件都是通过位运算实现一切的复杂运算的,那么位运算就天然比其他运输效率要高。
线程池模型(生命周期)
线程池模型-
RUNNING
:运行中,此状态可以接受新任务并处理排队的任务。 -
RUNNING
->SHUTDOWN
:执行shutdown()方法,将会导致线程池状态又RUNNING
->SHUTDOWN
的变迁,具体的shutdown()下面会有介绍。 -
RUNNING
->STOP
:执行shutdownNow()方法,将会导致线程池状态又RUNNING
->STOP
的变迁,具体的shutdown()下面会有介绍。 -
SHUTDOWN
/STOP
->TIDYING
:所有任务已终止,workerCount为零,状态转换为TIDYING。 -
TIDYING
->TERMINATED
:状态TIDYING的线程将运行terminated()
方法,会导致状态变迁为TERMINATED
。
shutdown VS shutdownNow
- shutdown:此方法是停止线程的方法,不会接受新提交的任务,并且会将当前线程池中的工作线程的任务和阻塞队列中的所有任务都执行完。
- shutdownNow:此方法是停止线程的方法,不会接受新提交的任务,也不会执行阻塞队列中的所有任务,并且会尝试停止当前线程池中的工作线程的任务。
线程池执行流程
线程提交顺序 VS 线程执行顺序
提交顺序
提交顺序- 应用程序调用
execute
方法向线程池中提交Task执行。 - 首先会向corePool中提交Task,如果corePool中有空闲的线程或者数量<
maximumPoolSize
,则选择/创建一个线程执行Task。 - 否则,会将Task提交到Queue中,如果Queue队列未满,那么增将Task任务添加至Queue等待。
- 否则,会将Task提交到临时Pool中(如果有的话,临时Pool=
maximumPoolSize
-corePoolSize
),如果临时Pool有空闲或者数量<maximumPoolSize
,则选择/创建一个线程执行Task。 - 否则,将会调用拒绝策略(默认是抛出异常),当然还有其他的实现或者自定义实现。
执行顺序
执行顺序- 执行顺序与提交顺序有一些差别
- 首先会执行corePool和临时Pool中的线程任务,执行顺序不固定(抢占式)。
- 等待Queue中的任务,是处于等待状态。只有当corePool和临时Pool中线程有空闲的时候才会被执行。
submit VS execute
上源码
public Future<?> submit(Runnable task)
可见本质上还是调用的
execute
方法,但是在执行execute
方法之前对任务进行了额外的操作,将Runnable
的Task通过newTaskFor
方法转为RunnableFuture
Task,目的是为了能够拿到执行的结果,此处返回结果默认为null。
public <T> Future<T> submit(Runnable task, T result)
与
1
中类似,T result
是指定的任务的返回值
public <T> Future<T> submit(Callable<T> task)
与
1
中类似,不同的是,此次提交的Task是Callable<T>
类型的,也就是有返回值的。