JAVA线程池
简介
线程池,从字面含义来看,是指管理一组同构工作线程的资源池。线程池与工作队列(任务队列)密切相关的,如图

- 工作队列(Work Queue)
在工作队列中保存了所有等待执行的任务 - 工作者线程(Worker Thread)
工作者线程,从工作队列中获取一个任务,执行任务,然后返回线程池等待下一个任务
好处
- 降低资源消耗
通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。 - 提高响应速度
任务到达时,无需等待线程创建即可立即执行。 - 提高线程的可管理性
线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。 - 减少资源抢占
可以防止过多线程相互竞争资源而使程序耗尽内存或失败
总体设计
Java中的线程池核心实现类是ThreadPoolExecutor,ThreadPoolExecutor的UML类图如下

生命周期
为了解决Executor
服务的生命周期问题,ExecutorService
对Executor
进行了扩展,添加了一些用于生命周期管理的方法,如下
public interface ExecutorService extends Executor {
//关闭服务,会先完成以及提交的任务而不再接收新的任务
void shutdown();
//暴力关闭服务,尝试取消所有运行中的任务,并且不再启动队列中尚未开始的任务
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
//提交指定任务去执行
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
//执行给定的任务,返回所有个任务的结果
<T> List<Future<T>> invokeAll(Collection<?extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
//执行给定的任务,返回其中一个任务的结果
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
ExecutorService
的生命周期有5种状态:RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED下图根据jdk8的源码,所画的生命周期流程图:

设置线程池的大小
线程池的理想大小取决于被提交任务的类型以及部署系统的特性。例如对于计算密集型任务线程池大小设置为通常能实现更优的利用率,对于I/O密集型操作线程池的规模应该更大。也可以通过设置不同大小的的线程池运行程序观察CPU利用率的水平,从而找到更优的线程池大小。
当然CPU周期并不是唯一影响线程池大小的资源,还包括内存、文件句柄、套接字句柄和数据库连接等。
默认线程池
JAVA类库提供了一个灵活的线程池以及一些有用的默认配置。可以通过调用Executors
中的静态工厂方法之一来创建一个线程池:
-
newFixedThreadPool
创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这是线程池的规模将不再变化(如果某个线程由于未预期的Exception而结束,那么线程池会补充一个新的线程) -
newCachedThreadPool
创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制 -
newSingleThreadExecutor
单一线程池,它创建单个工作者来执行任务,如果线程异常结束,会创建另一个线程来替代。同时能够确保任务在队列中的顺序来串行执行(FIFO、LIFO、优先级) -
newScheduledThreadPool
定时线程池,它会创建一个固定长度的线程池,而且以延迟或定时的方式来执行任务,类似于Timer
配置ThreadPoolExecutor
ThreadPoolExecutor
是一个灵活的、稳定的线程池,允许进行各种定制。如果默认的执行策略不能满足需求,那么可以通过ThreadPoolExecutor的构造函数来实例化一个对象,并且根据自己 的需求来定制,构造函数如下(7大参数):
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, RejectedExecutionHandler handler){...}
-
corePoolSize
基本大小
表示线程池的目标大小,即在没有任务执行时线程池的大小,并且只有工作队列满了的情况才会创建超过这个数量的线程 -
maximumPoolSize
最大大小
表示同时活动的线程数量的上限 -
keepAliveTime
存活时间
如果某个空闲的线程池超过了存活时间,那么将被标记为可回收的,并且当线程池的当前大小超过了基本大小时,这个线程将被终止 -
unit
时间单位
参数keepAliveTime的时间单位,如秒、分钟、小时等 -
workQueue
一个阻塞队列,用来存储等待执行的任务,一般有3种:- 无界队列
LinkedBlockingQueue
无界队列,newFixedThreadPool和newSingleThreadExecutor在默认情况下就是使用无界队列 - 有界队列
ArrayBlockingQueue
有界队列,是一种更稳妥的资源管理策略,有助于减少内存使用量,降低CPU的使用率,同时还可以减少上下文切换,但是付出的代价是可能会限制吞吐量 - 同步移交
SynchronousQueue
,SynchronousQueue不是一个真正的队列,而是一直在线程之间移交的机制。
例如,要将一个元素放到其中,必须有另一个线程正在等待接受这个新的元素,如果没有线程正常等待,并且线程池的当前大小小于最大值,那么将创建一个新的线程,否则根据饱和策略,这个任务将被拒绝,newCachedThreadPool
就是使用这种队列
当然也可以使用其他队列,例如PriorityBlockingQueue等
注
:只有当任务相互独立时,为线程池或工作队列设置界限才是合理的。如果任务之间存在依赖性,那么就有可能导致线程“饥饿”死锁问题,此时应该使用newScheduledThreadPool
- 无界队列
-
threadFactory
线程工厂,主要用来创建线程。默认的线程工厂方法将创建一个新的、非守护的线程,并且不包含特殊的配置信息。通过指定一个线程工厂,可以定制线程池的配置信息 -
handler
当有界工作队列被填满后,饱和策略开始发挥作用,JDK提供了4种策略:-
ThreadPoolExecutor.AbortPolicy
中止(Abort),为默认饱和策略,该策略会抛弃任务并抛出RejectedExecutionException异常 -
ThreadPoolExecutor.DiscardPolicy
抛弃(Discard),抛弃任务,但是不抛出异常 -
ThreadPoolExecutor.DiscardOldestPolicy
抛弃最旧的(Discard-Oldest),抛弃队列中队头的任务,然后重新尝试提交新的任务 -
ThreadPoolExecutor.CallerRunsPolicy
调用者运行(Caller-Runs),在调用execute的线程中执行该任务
-
源码解读
execute()
方法是Executor
中定义的方法,是线程池的核心方法,ThreadPoolExecutor
的实现如下
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
可以看出大致分为三步
- 如果当前线程数小于corePoolSize,将创建一个新线程,并将command做为第一个任务。调用addWorker()方法原子性地检查运行状态和工作者线程数量,返回false来防止错误警报,该错误警报在不应该添加线程的情况下会增加线程。
- 如果一个任务成功入队,仍需要再次检查是否应该添加一个线程(因为可能距离上次检查有的线程已经死亡)或者线程池已经关闭。因此,重新检查状态后,工作者线程执行任务或者拒绝任务并且触发饱和策略
- 如果不能将任务入队,则尝试添加一个新线程。如果失败了,则代表已经停止或饱和了,所以拒绝这个任务,并且触发饱和策略