Java线程池原理
本篇文章主要介绍Java线程池的原理以及源码的分析
线程池的介绍
Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。
线程池的优点
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
什么时候使用线程池
1:单个任务处理时间比较短
2:需要处理的任务数量很大
线程池原理(使用ThreadPoolExecutor)
这里使用JDK1.8中的ThreadPoolExecutor分析线程池的原理
Executor
Executor:执行提交的线程任务的对象。这个接口提供了一种将任务提交与每个任务将如何运行实现了分离,包括线程使用、调度等细节。该接口只定义了一个execute()方法。
execute():将任务提交给线程池,由线程池为该任务创建线程并启动。注意这个方法没有返回值,获取不到线程执行结果
ExecutorService
提供用于管理终止的方法如 shutDown()和shutDownNow()用于关闭线程池的方法以及判断线程池是否关闭的方法如,isShutdown(),isTerminated()的方法
提供了可以生成用于跟踪一个或多个异步任务进度的方法如,invokeAll(),submit()。这些方法的返回值都是Future类型,可以获取线程的执行结果。
ThreadPoolExecutor成员变量
查看ThreadPoolExecutor的成员变量
ctl是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,ctl是一个Integer, 它包含两部分的信息: 高三位表示线程池的运行状态 (runState) 和低29位表示线程池内有效线程的数量 (workerCount),
线程池的生命周期,总共有五种状态
RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务;
SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用shutdown()方法进入该状态);
STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;
TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。
TERMINATED:在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。
进入TERMINATED的条件如下:
线程池不是RUNNING状态;
线程池状态不是TIDYING状态或TERMINATED状态;
如果线程池状态是SHUTDOWN并且workerQueue为空;
workerCount为0;
设置TIDYING状态成功。
线程池的生命周期流程图
还有三个关于ctl的方法
runStateOf:获取运行状态;
workerCountOf:获取活动线程数;
ctlOf:获取运行状态和活动线程数的值
ThreadPoolExecutor构造函数
下面解释构造函数的参数含义
corePoolSize:核心线程数量,当有新任务在execute()方法提交时,会执行以下判断:
a):如果运行的线程少于 corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的;
b):如果线程池中的线程数量大于等于 corePoolSize 且小于 maximumPoolSize,当workQueue未满的时候任务添加到workQueue中,当workQueue满时才创建新的线程去处理任务;
c):如果设置的corePoolSize 和 maximumPoolSize相同,则创建的线程池的大小是固定的,这时如果有新任务提交,若workQueue未满,则将请求放入workQueue中,等待有空闲的线程去从workQueue中取任务并处理;
d):如果运行的线程数量大于等于maximumPoolSize,这时如果workQueue已经满了,则通过handler所指定的策略来处理任务;
所以,任务提交时,判断的顺序为 corePoolSize –> workQueue –> maximumPoolSize。
maximumPoolSize:最大线程数量;
workQueue:等待队列,当任务提交时,如果线程池中的线程数量大于等于corePoolSize的时候,把该任务封装成一个Worker对象放入等待队列;
workQueue:保存等待执行的任务的阻塞队列,当提交一个新的任务到线程池以后, 线程池会根据当前线程池中正在运行着的线程的数量来决定对该任务的处理方式,主要有以下几种处理方式:
直接切换:这种方式常用的队列是SynchronousQueue。
使用无界队列:一般使用基于链表的阻塞队列LinkedBlockingQueue。如果使用这种方式,那么线程池中能够创建的最大线程数就是corePoolSize,而maximumPoolSize就不会起作用了。当线程池中所有的核心线程都是RUNNING状态时,这时一个新的任务提交就会放入等待队列中。
使用有界队列:一般使用ArrayBlockingQueue。使用该方式可以将线程池的最大线程数量限制为maximumPoolSize,这样能够降低资源的消耗,但同时这种方式也使得线程池对线程的调度变得更困难,因为线程池和队列的容量都是有限的值,所以要想使线程池处理任务的吞吐率达到一个相对合理的范围,又想使线程调度相对简单,并且还要尽可能的降低线程池对资源的消耗,就需要合理的设置这两个数量。
如果要想降低系统资源的消耗(包括CPU的使用率,操作系统资源的消耗,上下文环境切换的开销等), 可以设置较大的队列容量和较小的线程池容量, 但这样也会降低线程处理任务的吞吐量。
如果提交的任务经常发生阻塞,那么可以考虑通过调用 setMaximumPoolSize() 方法来重新设定线程池的容量。
如果队列的容量设置的较小,通常需要将线程池的容量设置大一点,这样CPU的使用率会相对的高一些。但如果线程池的容量设置的过大,则在提交的任务数量太多的情况下,并发量会增加,那么线程之间的调度就是一个要考虑的问题,因为这样反而有可能降低处理任务的吞吐量。
keepAliveTime:线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime;
threadFactory:它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
handler:它是RejectedExecutionHandler类型的变量,表示线程池的饱和策略。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。线程池提供了4种策略:
AbortPolicy:直接抛出异常,这是默认策略;
CallerRunsPolicy:用调用者所在的线程来执行任务;
DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
DiscardPolicy:直接丢弃任务;
线程池的结构
线程池中的核心线程和非核心线程,没有什么区别,都是线程,只不过人为的规则线程池中的一部分线程叫核心线程
线程池的流程
ThreadPoolExecutor执行execute方法分下面4种情况。
1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
3)如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
4)如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用
RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。
execute()源码分析
为什么线程池中的线程可以重复利用?
我们知道线程在执行完run()方法里面的逻辑后就会被GC回收,那么线程池是怎样保持线程的存活,并且重复利用线程。
在线程池中使用Worker类来包装向线程池中添加的Runnable线程任务。首先来分析一下addWorker()方法
在execute()方法中使用addWorker()方法的地方只有在添加核心线程和非核心线程的时候调用。
addWorker源码分析
从上面可以看出这是一个添加线程的过程,并没有看到,线程池是如何维护线程不被销毁,从而达到重复利用的
从addWorker()中我们可以看到,向线程池中添加的Runnable被包装成Worker对象,下面就来查看Worker对象,从中寻找为什么线程池中的线程可以重复利用的答案。
Worker源码分析
Worker类继承Runnable,和AbstractQueuedSynchronizer(这个类奠定了Java并发包的基础,很重要,可是我还没有深入研究)
查看run()方法,调用runWorker,并将自身作为参数
查看runWorker(),在前面的addWorker()方法在最后是执行了start()方法,也就是Worker的run()方法,进而执行了runWorker()方法。
这个while就是线程池中线程不被销毁的原因所在,在Worker的run方法中,如果while一直执行下去,那么Worker这个继承了Runnable接口的线程就会一直执行下去,而我们知道线程池中任务的载体是Worker,如果Worker一直执行下去,就表示该载体可以一直存在,换的只是载体上我们通过execute()方法添加的Runnable任务而已。
那么如何保证while方法一直执行下去
在第一次执行完while后task设置为null,那么就要保证task=getTask()!=null
查看getTask(),从名字可以看出这是获取一个任务。
通过代码中的注释,我们这就弄明白线程池的工作原理 以及线程池中如何保证线程(Worker)重复利用,不被销毁。