Java多线程(三)线程池
为什么使用线程池
当我们在使用线程时,如果每次需要一个线程时都去创建一个线程,这样实现起来很简单,但是会有一个问题:当并发线程数过多时,并且每个线程都是执行一个时间很短的任务就结束时,这样创建和销毁线程的时间要比花在实际处理任务的时间要长的多,在一个JVM里创建太多的线程可能会导致由于系统过度消耗内存或切换过度导致系统资源不足而导致OOM问题;
线程池为线程生命周期开销问题和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。
使用线程池的优势与存在的风险
优势
- 降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗。
- 提高系统响应速度,当有任务到达时,无需等待新线程的创建便能立即执行。
- 通过适当地调整线程池中的线程数目,可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
- 更强大的功能,线程池提供了定时、定期以及可控线程数等功能的线程池,使用方便简单。
风险
死锁
线程池引入了一种死锁可能,当所有池线程都在等待已阻塞的等待队列中另一任务的执行结果,但这一任务却因为没有未被占用的线程而不能运行,这种情况就导致了死锁。如:当线程池被用来实现涉及许多交互对象的模拟,被模拟的对象可以相互发送查询,这些查询接下来作为排队的任务执行,查询对象又同步等待着响应时,会发生这种情况。
资源不足
如果线程池太大,那么被这些线程消耗的资源可能严重地影响系统性能。在线程之间进行切换将会浪费时间,而且使用超出比实际需要的线程可能会引起资源匮乏问题,因为池线程正在消耗一些资源,而这些资源可能会被其它任务更有效地利用。
线程池的大小需要根据系统运行的软硬件环境以及应用本身的特点来决定。 一般来说, 如果代码结构合理的话, 线程数目与 CPU数量相适合即可。 如果线程运行时可能出现阻塞现象, 可相应增加池的大小; 如有必要可采用自适应算法来动态调整线程池的大小, 以提高 CPU 的有效利用率和系统的整体性能。
并发错误
线程池和其它排队机制依靠使用 wait() 和 notify() 方法,要特别注意并发错误, 要从逻辑上保证程序的正确性, 注意避免死锁现象的发生。
线程泄漏
线程池中一个严重的风险是线程泄漏,当从池中除去一个线程以执行一项任务,而在任务完成后该线程却没有返回池时,会发生这种情况。发生线程泄漏的一种情形出现在任务抛出一个 RuntimeException 或一个 Error 时。如果线程池类没有捕捉到它们,那么线程只会退出而线程池的大小将会永久减少一个。当这种情况发生的次数足够多时,线程池最终为空,系统将停止,因为没有可用的线程来处理任务。
有些任务可能会永远等待某些资源或来自用户的输入,而这些资源又不能保证一定可用或着用户一定有输入,诸如此类的任务会永久停止,而这些停止的任务也会引起和线程泄漏同样的问题。如果某个线程被这样一个任务永久地消耗着,那么它实际上就被从池除去了。对于这样的任务,要么只给予它们自己的线程,要么让它们等待有限的时间。
请求过载
请求过多压垮了服务器,这种情况是可能的。在这种情形下,我们可能不想将每个到来的请求都排队到我们的工作队列,因为排在队列中等待执行的任务可能会消耗太多的系统资源并引起资源缺乏。在这种情形下决定如何做取决于你;在某些情况下,可以简单地抛弃请求,依靠更高级别的协议稍后重试请求,也可以用一个指出服务器暂时很忙的响应来拒绝请求。
如何正确的使用线程池
- 不要对那些同步等待其它任务结果的任务排队。这可能会导致上面所描述的那种形式的死锁,在那种死锁中,所有线程都被一些任务所占用,这些任务依次等待排队任务的结果,而这些任务又无法执行,因为没有空闲的线程可以使用。
- 在为任务时间可能很长的线程使用合用的线程时要小心。如果程序必须等待诸如 I/O 完成这样的某个资源,那么请指定最长的等待时间,以及随后是失效还是将任务重新排队以便稍后执行。
- 理解任务。要有效地调整线程池大小,需要理解正在排队的任务以及它们正在做什么。它们是 CPU 限制的吗?它们是 I/O 限制的吗?你的答案将影响如何调整应用程序。如果有不同的任务类,这些类有着截然不同的特征,那么为不同任务类设置多个工作队列可能会有意义,这样可以相应地调整每个池。
线程池大小的分配
调整线程池的大小基本上就是避免两类错误:线程太少或线程太多。幸运的是,对于大多数应用程序来说,太多和太少之间的余地相当宽。
线程池的最佳大小取决于可用处理器的数目以及工作队列中的任务的性质。若在一个具有 N 个处理器的系统上只有一个工作队列,其中全部是计算性质的任务,在线程池具有 N 或 N+1 个线程时一般会获得最大的 CPU 利用率。
对于那些可能需要等待 I/O 完成的任务,需要让池的大小超过可用处理器的数目,因为并不是所有线程都一直在工作。通过使用概要分析,可以估计某个典型请求的等待时间(WT)与服务时间(ST)之间的比例。如果我们将这一比例称之为 WT/ST,那么对于一个具有 N 个处理器的系统,需要设置大约 N*(1+WT/ST) 个线程来保持处理器得到充分利用。
处理器利用率不是调整线程池大小过程中的唯一考虑事项。随着线程池的增长,你可能会碰到调度程序、可用内存方面的限制,或者其它系统资源方面的限制,例如套接字、打开的文件句柄或数据库连接等的数目。
ThreadPoolExecutor详解
几个重要字段和方法
(这块内容引用了ideabuffer
博主的https://www.jianshu.com/p/d2729853c4da文章,并做了稍微修改,有兴趣的同学可以查看这篇文章,写的很详细)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
- ctl:对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1,这个常量表示workerCount的上限值,大约是5亿。
- RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED表示线程池的五种状态,在下面的章节中我们会具体介绍。
- runStateOf,workerCountOf,ctlOf是对ctl进行计算的方法;
- runStateOf:获取运行状态;
- workerCountOf:获取活动线程数;
- ctlOf:获取运行状态和活动线程数的值;
- execute方法:用来提交任务
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
/*
* 获取线程池状态以及有效线程数即runState以及workerCount
*/
int c = ctl.get();
/*
* 如果当前线程活动数小于corePoolSize,则新建一个线程放入线程池中,并作为线程池的第一个任务
*/
if (workerCountOf(c) < corePoolSize) {
/*
* 第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断;
* 如果为true,根据corePoolSize来判断;
* 如果为false,则根据maximumPoolSize来判断
*/
if (addWorker(command, true))
return;
/*
* 如果添加失败,则重新获取ctl值
*/
c = ctl.get();
}
/*
* 如果当前线程是运行状态,并且添加任务到队列中成功
*/
if (isRunning(c) && workQueue.offer(command)) {
/*
* 重新获取ctl值
*/
int recheck = ctl.get();
/*
* 再次判断线程池的运行状态,如果不是运行状态,由于之前已经把command添加到workQueue中了,这时需要移除该command,执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回
*/
if (! isRunning(recheck) && remove(command))
reject(command);
/*
* 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法
* 这里传入的参数表示:第一个参数为null,表示在线程池中创建一个线程,但不去启动;第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;
* 如果判断workerCount大于0,则直接返回,在workQueue中新增的command会在将来的某个时刻被执行。
*/
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*
* 如果执行到这里,有两种情况:
* 1. 线程池已经不是RUNNING状态;
* 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
* 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
* 如果失败则拒绝该任务
*/
else if (!addWorker(command, false))
reject(command);
}
- addWorker方法:在线程池中创建一个新的线程并执行,firstTask参数用于指定新增的线程执行的第一个任务,core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
/*
* 获取ctl值
*/
int c = ctl.get();
/*
* 获取线程池运行状态
*/
int rs = runStateOf(c);
/*
* 如果rs >= SHUTDOWN,即线程池处于SHUTDOWN,STOP,TIDYING,TERMINATED中的一个状态,此时线程池不再接收新任务;
* 接着判断以下3个条件,只要有1个不满足,则返回false:
* 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
* 2. firsTask为空
* 3. 阻塞队列不为空
*
* 首先考虑rs == SHUTDOWN的情况
* 这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回false;
* 然后,如果firstTask为空,并且workQueue也为空,则返回false,
* 因为队列中已经没有任务了,不需要再添加线程了
*/
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
/*
* 获取线程数
*/
int wc = workerCountOf(c);
/*
* 如果wc超过或等于CAPACITY,或者wc超过或等于corePoolSize当core为true时或wc超过或等于maximumPoolSize当core为false时,返回false
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
/*
* 尝试增加workerCount,如果成功,则跳出第一个for循环
*/
if (compareAndIncrementWorkerCount(c))
break retry;
/*
* 如果增加workerCount失败,则重新获取ctl的值
*/
c = ctl.get(); // Re-read ctl
/*
* 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
*/
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
/*
* 根据firstTask来创建Worker对象
*/
w = new Worker(firstTask);
/*
* 每一个Worker对象都会创建一个线程
*/
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
/*
* 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
*/
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
/*
* 更新线程池中出现的最大线程数largestPoolSize
*/
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
- Worker类:线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象
/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use
* ReentrantLock because we do not want worker tasks to be able to
* reacquire the lock when they invoke pool control methods like
* setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in
* runWorker).
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker类继承了AbstractQueuedSynchronizer,并实现了Runnable接口,其中firstTask用来保存传入的任务;thread是用来处理任务的线程,是通过ThreadFactory来创建的。newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。
Worker继承了AbstractQueuedSynchronizer,使用AbstractQueuedSynchronizer来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:
- lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
- 如果正在执行任务,则不应该中断线程;
- 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
- 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
- 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。
所以,Worker继承自AbstractQueuedSynchronizer,用于判断线程是否空闲以及是否可以被中断。
此外,在构造方法中执行了setState(-1);,把state变量设置为-1,为什么这么做呢?是因为AbstractQueuedSynchronizer中默认的state是0,如果刚创建了一个Worker对象,还没有执行任务时,这时就不应该被中断,看一下tryAquire方法:
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
tryAcquire方法是根据state是否是0来判断的,所以,setState(-1);将state设置为-1是为了禁止在执行任务前对线程进行中断。
正因为如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为0.
- runWorker 方法:Worker类中的run方法调用了runWorker方法来执行任务
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
/*
* 如果线程池正在停止,那么要保证当前线程是中断状态;
* 如果不是的话,则要保证当前线程不是中断状态;
* 这里要考虑在执行该if语句期间可能也执行了shutdownNow方法,shutdownNow方法会把状态设置为STOP
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
STOP状态要中断线程池中的所有线程,而这里使用Thread.interrupted()来判断是否中断是为了确保在RUNNING或者SHUTDOWN状态时线程是非中断状态的,因为Thread.interrupted()方法会复位中断的状态。
runWorker的执行过程:
- while循环不断地通过getTask()方法获取任务;
- getTask()方法从阻塞队列中取任务;
- 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
- 调用task.run()执行任务;
- 如果task为null则跳出循环,执行processWorkerExit()方法;
- runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。
- getTask方法:用来从阻塞队列中取任务
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
/*
* 表示上次从阻塞队列中取任务时是否超时
*/
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/*
* 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断:
* 1. rs >= STOP,线程池是否正在stop;
* 2. 阻塞队列是否为空。
* 如果以上条件满足,则将workerCount减1并返回null。
* 因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
/*
* timed变量用于判断是否需要进行超时控制。
* allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
* wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
* 对于超过核心线程数量的这些线程,需要进行超时控制
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*
* wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
* timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
* 接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1;
* 如果减1失败,则返回重试。
* 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/*
* 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null;
* 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
/*
* 如果 r == null,说明已经超时,timedOut设置为true
*/
timedOut = true;
} catch (InterruptedException retry) {
/*
* 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试
*/
timedOut = false;
}
}
}
这里重要的地方是第二个if判断,目的是控制线程池的有效线程数量。由上文中的分析可以知道,在执行execute方法时,如果当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,并且workQueue已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize即可。
什么时候会销毁?当然是runWorker方法执行完之后,也就是Worker中的run方法执行完,由JVM自动回收。
getTask方法返回null时,在runWorker方法中会跳出while循环,然后会执行processWorkerExit方法。
- processWorkerExit 方法:
/**
* Performs cleanup and bookkeeping for a dying worker. Called
* only from worker threads. Unless completedAbruptly is set,
* assumes that workerCount has already been adjusted to account
* for exit. This method removes thread from worker set, and
* possibly terminates the pool or replaces the worker if either
* it exited due to user task exception or if fewer than
* corePoolSize workers are running or queue is non-empty but
* there are no workers.
*
* @param w the worker
* @param completedAbruptly if the worker died due to user exception
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
/*
* 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1;
* 如果线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操作,这里就不必再减了。
*/
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
/*
* 统计完成的任务数
*/
completedTaskCount += w.completedTasks;
/*
* 从workers中移除,也就表示着从线程池中移除了一个工作线程
*/
workers.remove(w);
} finally {
mainLock.unlock();
}
/*
* 根据线程池状态进行判断是否结束线程池
*/
tryTerminate();
int c = ctl.get();
/*
* 当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker;
* 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;
* 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
*/
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
processWorkerExit执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束,如图所示:
image.png
- tryTerminate方法:根据线程池状态进行判断是否结束线程池
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). If otherwise
* eligible to terminate but workerCount is nonzero, interrupts an
* idle worker to ensure that shutdown signals propagate. This
* method must be called following any action that might make
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
/*
* 当前线程池的状态为以下几种情况时,直接返回:
* 1. RUNNING,因为还在运行中,不能停止;
* 2. TIDYING或TERMINATED,因为线程池中已经没有正在运行的线程了;
* 3. SHUTDOWN并且等待队列非空,这时要执行完workQueue中的task;
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
/*
* 如果线程数量不为0,则中断一个空闲的工作线程,并返回
*/
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
/*
* 这里尝试设置状态为TIDYING,如果设置成功,则调用terminated方法
*/
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
/*
* terminated方法默认什么都不做,留给子类实现
*/
terminated();
} finally {
/*
* 设置状态为TERMINATED
*/
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
- shutdown 方法:将线程池切换到SHUTDOWN状态,并调用interruptIdleWorkers方法请求中断所有空闲的worker,最后调用tryTerminate尝试结束线程池
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
/*
* 安全策略判断
*/
checkShutdownAccess();
/*
* 切换状态为SHUTDOWN
*/
advanceRunState(SHUTDOWN);
/*
* 中断空闲线程
*/
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
/*
* 尝试结束线程池
*/
tryTerminate();
}
这里思考一个问题:在runWorker方法中,执行任务时对Worker对象w进行了lock操作,为什么要在执行任务的时候对每个工作线程都加锁呢?
- 在getTask方法中,如果这时线程池的状态是SHUTDOWN并且workQueue为空,那么就应该返回null来结束这个工作线程,而使线程池进入SHUTDOWN状态需要调用shutdown方法;
- shutdown方法会调用interruptIdleWorkers来中断空闲的线程,interruptIdleWorkers持有mainLock,会遍历workers来逐个判断工作线程是否空闲。但getTask方法中没有mainLock;
- 在getTask中,如果判断当前线程池状态是RUNNING,并且阻塞队列为空,那么会调用workQueue.take()进行阻塞;
- 如果在判断当前线程池状态是RUNNING后,这时调用了shutdown方法把状态改为了SHUTDOWN,这时如果不进行中断,那么当前的工作线程在调用了workQueue.take()后会一直阻塞而不会被销毁,因为在SHUTDOWN状态下不允许再有新的任务添加到workQueue中,这样一来线程池永远都关闭不了了;
- 由上可知,shutdown方法与getTask方法(从队列中获取任务时)存在竞态条件;
- 解决这一问题就需要用到线程的中断,也就是为什么要用interruptIdleWorkers方法。在调用workQueue.take()时,如果发现当前线程在执行之前或者执行期间是中断状态,则会抛出InterruptedException,解除阻塞的状态;
- 但是要中断工作线程,还要判断工作线程是否是空闲的,如果工作线程正在处理任务,就不应该发生中断;
- 所以Worker继承自AbstractQueuedSynchronizer,在工作线程处理任务时会进行lock,interruptIdleWorkers在进行中断时会使用tryLock来判断该工作线程是否正在处理任务,如果tryLock返回true,说明该工作线程当前未执行任务,这时才可以被中断。
- interruptIdleWorkers方法:遍历workers中所有的工作线程,若线程没有被中断tryLock成功,就中断该线程。
/**
* Interrupts threads that might be waiting for tasks (as
* indicated by not being locked) so they can check for
* termination or configuration changes. Ignores
* SecurityExceptions (in which case some threads may remain
* uninterrupted).
*
* @param onlyOne If true, interrupt at most one worker. This is
* called only from tryTerminate when termination is otherwise
* enabled but there are still other workers. In this case, at
* most one waiting worker is interrupted to propagate shutdown
* signals in case all threads are currently waiting.
* Interrupting any arbitrary thread ensures that newly arriving
* workers since shutdown began will also eventually exit.
* To guarantee eventual termination, it suffices to always
* interrupt only one idle worker, but shutdown() interrupts all
* idle workers so that redundant workers exit promptly, not
* waiting for a straggler task to finish.
*/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
为什么需要持有mainLock?因为workers是HashSet类型的,不能保证线程安全。
- shutdownNow 方法:
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
*
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
/*
* 中断所有工作线程,无论是否空闲
*/
interruptWorkers();
/*
* 取出队列中没有被执行的任务
*/
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
shutdownNow方法与shutdown方法类似,不同的地方在于:
- 设置状态为STOP;
- 中断所有工作线程,无论是否是空闲的;
- 取出阻塞队列中没有被执行的任务并返回。
线程池的监控
- getTaskCount:线程池已经执行的和未执行的任务总数;
- getCompletedTaskCount:线程池已完成的任务数量,该值小于等于taskCount;
- getLargestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize;
- getPoolSize:线程池当前的线程数量;
- getActiveCount:当前线程池中正在执行任务的线程数量。
通过这些方法,可以对线程池进行监控,在ThreadPoolExecutor类中提供了几个空方法,如beforeExecute方法,afterExecute方法和terminated方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自ThreadPoolExecutor来进行扩展。
ThreadPoolExecutor的构造函数
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
参数解释
- corePoolSize : 核心池大小,即线程的数量。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
- maximumPoolSize : 线程池最大线程数,表示在线程池中最多能创建多少个线程;
- keepAliveTime :表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize:即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize;但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
- unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒 - workQueue:线程池采用的缓冲队列,用来存储等待执行的任务,这个参数的选择会对线程池的运行过程产生重大影响,一般来说,有以下几种选择:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue。
- threadFactory:线程工厂,主要用来创建线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
- handler:当线程池中线程数量达到maximumPoolSize时,仍有任务需要创建线程来完成,则handler采取相应的策略,有以下几种策略:
- ThreadPoolExecutor.AbortPolicy;//丢弃任务并抛出RejectedExecutionException异常(默认)。
- ThreadPoolExecutor.DiscardPolicy;//不处理,直接丢弃任务。
- ThreadPoolExecutor.DiscardOldestPolicy;//丢弃队列里前面的任务,并执行当前任务
- ThreadPoolExecutor.CallerRunsPolicy;//只用调用者所在线程来运行任务
从ThreadPoolExecutor的源码我们可以看到,ThreadPoolExecutor类继承了抽象类AbstractExecutorService,而抽象类AbstractExecutorService实现了ExecutorService接口,ExecutorService接口又继承了Executor接口。
Executor是最顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,用来执行传进去的任务的;
线程池的五种状态
- RUNNING:能接受新提交的任务,并且也能处理阻塞队列中的任务;
- SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态.(finalize() 方法在执行过程中也会调用shutdown()方法进入该状态);
- STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;
- TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。
- TERMINATED:在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。
进入TERMINATED的条件如下:- 线程池不是RUNNING状态;
- 线程池状态不是TIDYING状态或TERMINATED状态;
- 如果线程池状态是SHUTDOWN并且workerQueue为空;
- workerCount为0;
-
设置TIDYING状态成功。
线程池的状态转换过程图.png
线程池执行流程
线程池执行流程图.png- 当线程数量小于corePoolSize时,任务来时会创建新的线程来处理,并把该线程加入线程队列中(实际上是一个HashSet)(此步骤需要获取全局锁,ReentryLock);
- 如果当前线程数量达到了corePoolSize,任务来时将任务加入BlockingQueue;
- 如果任务列队满了无法加入新的任务时,会创建新的线程(同样需要获取全局锁);
- 如果线程池数量达到maximumPoolSize,并且任务队列已满,新的任务将被拒绝;
注意:获取全局锁是一个非常影响性能的因素,所以线程池会尽量执行第二步,因为此步骤不需要获取全局锁。
各种任务队列(BlockingQueue)的区别
- ArrayBlockingQueue: 基于数组实现的有界的阻塞队列,该队列按照FIFO(先进先出)原则对队列中的元素进行排序。
- LinkedBlockingQueue:基于链表实现的阻塞队列,该队列按照FIFO(先进先出)原则对队列中的元素进行排序。吞吐量高于ArrayBlockingQueue,Executors.newFixedThreadPool()使用了该队列。
- SynchronousQueue:内部没有任何容量的阻塞队列。在它内部没有任何的缓存空间。对于SynchronousQueue中的数据元素只有当我们试着取走的时候才可能存在。吞吐量高于LinkedBlockingQueue,Executors.newCachedThreadPool()使用了该队列。
- PriorityBlockingQueue:具有优先级的无限阻塞队列。
RejectedExecutionHandler饱和策略
- ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常(默认)。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列里前面的任务,并执行当前任务。
- ThreadPoolExecutor.DiscardPolicy:不处理,直接丢弃任务。
- ThreadPoolExecutor.CallerRunsPolicy:只用调用者所在线程来运行任务。
提交任务
- 调用execute(Runnable command),无返回值。由于execute方法没有返回值,所以说我们也就无法判定任务是否被线程池执行成功。
- 调用submit(Runnable task),有返回值,返回类型是Future<?>类型。我们可以通过这个future来判断任务是否执行成功,还可以通过future的get方法来获取返回值。如果子线程任务没有完成,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时候有可能任务并没有执行完。
线程池关闭
shutdown()和shutdownNow()是用来关闭线程池的,都是调用了interruptIdleWorkers()方法去遍历线程池中的工作线程,然后去打断它们。
shutdown原理:将线程池状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。
shutdownNow原理:将线程池的状态设置成STOP状态,然后中断所有任务(包括正在执行的)的线程,并返回等待执行任务的列表。
中断采用interrupt方法,所以无法响应中断的任务可能永远无法终止。但调用上述的两个关闭之一,isShutdown()方法返回值为true,当所有任务都已关闭,表示线程池关闭完成,则isTerminated()方法返回值为true。当需要立刻中断所有的线程,不一定需要执行完任务,可直接调用shutdownNow()方法。
几种常用的线程池
CachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
从java.util.concurrent.Executors的源码中我们可以发现,CachedThreadPool使用了SynchronousQueue这个没有容量的阻塞队列,线程池的coreSize为0,maxSize为无限大。
特点:
- 工作线程的创建数量几乎没有限制, 这样可灵活的往线程池中添加线程。
- 如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为1分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
- 在使用CachedThreadPool时,一定要注意控制任务的数量,否则,如果当生产者提供任务的速度大于消费者处理任务的时候,可能会无限创建线程,从而导致系统资源(CPU,内存等)耗竭。
使用示例:
public class CachedThreadPool {
public static void main(String[] args) {
ExecutorService cacheThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;
cacheThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
程序一次性输出所有的数字,因为可以创建多个线程同时执行任务
FixedThreadPool
创建一个指定工作线程数量的线程池。每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数,则将提交的任务存入到阻塞队列中。
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
从java.util.concurrent.Executors的源码中我们可以发现,FixedThreadPool线程池的corePoolSize和maximumPoolSize数量是一样的,且使用的LinkedBlockingQueue是无界阻塞队列,因此达到corePoolSize后不会继续创建线程而是阻塞在队列那了。
特点:
- 所容纳最大的线程数就是我们设置的核心线程数。
- 如果线程池的线程处于空闲状态的话,它们并不会被回收,除非是这个线程池被关闭。如果所有的线程都处于活动状态的话,新任务就会处于等待状态,直到有线程空闲出来。
- 由于FixedThreadPool只有核心线程,并且这些线程都不会被回收,也就是它能够更快速的响应外界请求。
- 阻塞队列的大小没有限制。
使用示例:
public class FixedThreadPool {
public static void main(String[] args) {
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
final int index = i;
newFixedThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
因为线程池的大小为5,每个任务输出index后sleep 2秒,所以每2秒打印5个数字。
SingleThreadExecutor
创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
从java.util.concurrent.Executors的源码中我们可以发现,SingleThreadExecutor线程池的corePoolSize和maximumPoolSize数量都是1,即线程池只创建一个线程,且使用的LinkedBlockingQueue是无界阻塞队列,因此如果线程池中的线程处于活动的,则后面的任务只能到阻塞队列中。
特点:
- 只创建唯一的工作者线程来执行任务,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
- 任务队列没有大小限制,也就意味着这一个任务处于活动状态时,其他任务都会在任务队列中排队等候依次执行。
- 所有的外界任务统一到一个线程中支持,所以在这个任务执行之间我们不需要处理线程同步的问题。
使用示例:
public class SingleThreadExecutor {
public static void main(String[] args) {
ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int index = i;
cacheThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
程序每隔两秒依次输出一个数字。
ScheduleThreadPool
创建一个定长的线程池,而且支持定时的以及周期性的任务执行,支持定时及周期性任务执行。
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
/**
* Creates a new {@code ScheduledThreadPoolExecutor} with the
* given core pool size.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
从java.util.concurrent.Executors的源码即ScheduledThreadPoolExecutor的构造函数我们可以发现,ScheduleThreadPool的核心线程数是固定的,对于非核心线程几乎可以说是没有限制的,并且当非核心线程处于闲置状态的时候就会立即被回收。
特点:
- 核心线程数固定,且当非核心线程处于闲置状态会立即被回收。
- 可创建定时执行和延迟执行任务
使用示例:
延迟2秒执行任务
public class ScheduleThreadPool {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 10; i++) {
final int index = i;
scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
System.out.println(index);
}
}, 2, TimeUnit.SECONDS);
}
}
}
延迟2秒执行,每5秒周期执行任务
public class ScheduleThreadPool {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 10; i++) {
final int index = i;
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(index);
}
}, 2,5, TimeUnit.SECONDS);
}
}
}
Executors各个方法的弊端:
- newFixedThreadPool和newSingleThreadExecutor:主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
- newCachedThreadPool和newScheduledThreadPool:主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
总结
- 在使用线程池中,建议手动创建线程池,尽量不要使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
- 使用线程池是需要针对具体情况而具体处理,不同的任务类别应采用不同规模的线程池,任务类别可划分为CPU密集型任务、IO密集型任务和混合型任务。以下N为CPU个数。
- 对于CPU密集型任务:线程池中线程个数应尽量少,如配置N+1个线程的线程池;
- 对于IO密集型任务:由于IO操作速度远低于CPU速度,那么在运行这类任务时,CPU绝大多数时间处于空闲状态,那么线程池可以配置尽量多些的线程,以提高CPU利用率,如2*N;
- 对于混合型任务:可以拆分为CPU密集型任务和IO密集型任务,当这两类任务执行时间相差无几时,通过拆分再执行的吞吐率高于串行执行的吞吐率,但若这两类任务执行时间有数据级的差距,那么没有拆分的意义。
线程池面试
什么是线程池
创建一组可供管理的线程,它关注的是如何缩短或调整线程的创建与销毁所消费时间的技术,从而提高服务器程序性能的。
它把线程的创建与销毁分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段以减少服务请求是去创建和销毁线程的时间。
它还显著减少了创建线程的数目。
为什么要使用线程池
当我们在使用线程时,如果每次需要一个线程时都去创建一个线程,这样实现起来很简单,但是会有一个问题:当并发线程数过多时,并且每个线程都是执行一个时间很短的任务就结束时,这样创建和销毁线程的时间要比花在实际处理任务的时间要长的多,在一个JVM里创建太多的线程可能会导致由于系统过度消耗内存或切换过度导致系统资源不足而导致OOM问题。 线程池为线程生命周期开销问题和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。
如何正确的使用线程池
- 不要对那些同步等待其它任务结果的任务排队。这可能会导致上面所描述的那种形式的死锁,在那种死锁中,所有线程都被一些任务所占用,这些任务依次等待排队任务的结果,而这些任务又无法执行,因为没有空闲的线程可以使用。
- 在为任务时间可能很长的线程使用合用的线程时要小心。如果程序必须等待诸如 I/O 完成这样的某个资源,那么请指定最长的等待时间,以及随后是失效还是将任务重新排队以便稍后执行。
- 理解任务。要有效地调整线程池大小,需要理解正在排队的任务以及它们正在做什么。它们是 CPU 限制的吗?它们是 I/O 限制的吗?你的答案将影响如何调整应用程序。如果有不同的任务类,这些类有着截然不同的特征,那么为不同任务类设置多个工作队列可能会有意义,这样可以相应地调整每个池。
ThreadPoolExecutor构造函数中几个重要参数的解释
- corePoolSize:核心池大小,即线程的数量。
- maximumPoolSize:线程池最大线程数,表示在线程池中最多能创建多少个线程。
- keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。
- unit:参数keepAliveTime的时间单位,有7种取值,具体可查看前面章节。
- workQueue:线程池采用的缓冲队列,用来存储等待执行的任务。
- threadFactory:线程工厂,主要用来创建线程。
- handler:线程的阻塞策略。当线程池中线程数量达到maximumPoolSize时,仍有任务需要创建线程来完成,则handler采取相应的策略。
线程池的状态
- RUNNING:能接受新提交的任务,并且也能处理阻塞队列中的任务;
- SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态.(finalize() 方法在执行过程中也会调用shutdown()方法进入该状态);
- STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;
- TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。
- TERMINATED:在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。
线程池的执行流程
- 当线程数量小于corePoolSize时,任务来时会创建新的线程来处理,并把该线程加入线程队列中(实际上是一个HashSet)(此步骤需要获取全局锁,ReentryLock);
- 如果当前线程数量达到了corePoolSize,任务来时将任务加入BlockingQueue;
- 如果任务列队满了无法加入新的任务时,会创建新的线程(同样需要获取全局锁);
- 如果线程池数量达到maximumPoolSize,并且任务队列已满,新的任务将被拒绝;
常用的几种线程池
- CachedThreadPool: 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
- FixedThreadPool:创建一个指定工作线程数量的线程池。每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数,则将提交的任务存入到阻塞队列中。
- SingleThreadExecutor:创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
- ScheduleThreadPool:创建一个定长的线程池,而且支持定时的以及周期性的任务执行,支持定时及周期性任务执行。
整理文章主要为了自己日后复习用,文章中可能会引用到别的博主的文章内容,如涉及到博主的版权问题,请博主联系我。