Java并发编程-线程池
1.简介
如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务,在Java中可以通过线程池来达到这样的效果。
2.Java中的ThreadPoolExecutor类
线程池相关类:ThreadPoolExecutor、AbstractExecutorService、ExecutorService、Executor、Executors(线程池工厂类)
几个类的关系如下:
(1)ThreadPoolExecutor extends AbstractExecutorService
(2)AbstractExecutorService implements ExecutorService
(3)public interface ExecutorService extends Executor
(4)Executors(线程池工厂类),Exectors工厂类提供了线程池的初始化接口,可以初始化四种不同的线程池。
Executor是一个接口,它是Executor框架的基础,它将任务的提交和任务的执行分离开来。Android中的线程池来源于Java中的Executor、Executor是一个接口,真正的线程池实现是在ThreadPoolExecutor,ThreadPoolExecutor提供一系列参数来配置线程池,通过不同的参数可以创建不同的线程池,线程池主要分为4类,这四类线程池可以通过Executors所提供的工厂方法来的得到。线程池都是直接或者间接通过配置ThreadPoolExecutor来实现的。从线程池的上层API来看,再多种的线程池,无非是参数的不同,让它们呈现出了不同的特性。那么这些特性到底依赖什么样的原理实现,就更值得去深究。ThreadPoolExecutor实现了线程池所需的最小功能集,已能hold住很多场景。
java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。线程池都是直接或者间接通过配置ThreadPoolExecutor来实现的,并且ThreadPoolExecutor有四个构造函数,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
ThreadPoolExecutor共七个参数:核心线程数,最大线程数,keepAliveTime,以及keepAliveTime时间单位,阻塞队列、线程工厂、拒绝策略。
我们先看下四种不同的线程池相关参数:
FixThreadPool
//适用于负载比较重的服务器。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
线程数:固定数量的线程,核心线程和最大线程数一样
阻塞队列:LinkedBlockingQueue基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
线程工厂ThreadFactory:默认的线程工厂;
拒绝策略:ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
CachedThreadPool:大小无界的线程池,适用于执行很多短期异步任务的小程序,或者负载较轻的服务器。阻塞队列:SynchronousQueue一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;线程工厂和拒绝策略与FixThreadPool相同。
SingleThreadExecutor:适用于需要保证任务顺序执行的各个任务。核心线程和最大线程都是1,阻塞队列LinkedBlockingQueue:LinkedBlockingQueue基于链表结构的阻塞队列,阻塞队列大小为Integer.MAX_VALUE。线程工厂和拒绝策略也和FixThreadPool一样。
ScheduledThreadPoolExecutor:适用于需要多个后台线程执行周期任务。阻塞队列:DelayedWorkQueue,默认大小是16。线程工厂和拒绝和FixThreadPool一样。
构造函数中各参数含义:
corePoolSize:线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
maximumPoolSize:线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;
keepAliveTime:线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间;默认情况下,该参数只在线程数大于corePoolSize时才有用;
unit:keepAliveTime的单位;
阻塞队列workQueue
四种线程池用到了三种阻塞队列:LinkedBlockingQueue、SynchronousQueue、DelayedWorkQueue。阻塞队列用来存储等待执行的任务。四种线程池用到的线程工厂和拒绝策略都一样,都是默认的。
JDK提供的阻塞队列有:
(1)ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;基于数组的先进先出队列,此队列创建时必须指定大小;
(2)LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
(3)SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
(4)PriorityBlockingQuene:具有优先级的无界阻塞队列;线程池的排队策略与BlockingQueue有关。 其中ArrayBlockQueue和LinkedBlockingQueue都可以指定初始容量。
线程工厂threadFactory:可以使用Executors中默认线程工厂,也可以通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。
线程池拒绝策略handler:线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
(1)AbortPolicy:直接抛出异常,默认策略;
(2)CallerRunsPolicy:用调用者所在的线程来执行任务;
(3)DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
(4)DiscardPolicy:直接丢弃任务;
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
综合以上,线程池阻塞队列、线程工厂、线程拒绝策略都可以自定义,根据自己的需求进行自定义。线程池各个参数都可以进行自定义。ThreadFactory和拒绝策略都很容易进行自定义。
如何存储线程池状态以及线程数量(5种状态和线程个数)?
public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 值为29 Integer.SIZE=32
private static final int COUNT_BITS = Integer.SIZE - 3;
// 高三位全为0,低29位全为1,因此线程数量的表示范围为 0 ~ 2^29
//1左移29位然后再减1,这样后29全为1,所以线程池中线程最大个数为2的29次数。
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl 获取线程池状态,参数C为ctl最新状态值。 ~CAPACITY 高三位为111,低29为全为0.这样就可以得到线程池状态。
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取工作线程数量, 参数C为ctl最新状态值 CAPACITY 高三位为000,低29全为1,按位与就可以得到当前线程个数。
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
}
ctl用来控制线程池的状态,并用来表示线程池线程数量。ctl类型为AtomicInteger,那用一个基础如何表示以上五种状态以及线程池工作线程数量呢?int型变量占用4字节,共32位,因此采用位表示,可以解决上述问题。5种状态使用5种数值进行表示,需要占用3位,余下的29位就可以用来表示线程数。因此,高三位表示进程状态,低29位为线程数量,ctl是原子操作类,进行自增自减时都是线程安全的,原子操作类是基于CAS实现的。采用了int分位表示线程池状态和线程数量,如何获取线程状态与数量见代码注释。
在Java读写锁中,读写锁状态也是使用一个int来标识读写锁不同状态,高16位中存储读锁状态,低16位存储写锁状态。
如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。
3.任务提交与任务执行
线程池框架提供了两种方式提交任务,根据不同的业务需求选择不同的方式。通过Executor.execute()方法提交的任务,必须实现Runnable接口,该方式提交的任务不能获取返回值,因此无法判断任务是否执行成功。通过ExecutorService.submit()方法提交的任务,可以获取任务执行完的返回值。用于提交需要返回值的任务,线程池会返回一个future类型的对象,通过future对象可以判断任务是否执行成功,Future接口和实现Future接口的FutureTask类,代表异步计算结果。任务执行:我们以Executor.execute()方法提交的任务为例,对任务执行进行介绍,该部分是线程池的核心,代码如下,相关内容在代码中已添加注释。
public void execute(Runnable command) {
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
//workerCountOf方法根据ctl的低29位,得到线程池的当前线程数,如果线程数小于corePoolSize,则执行addWorker方法创建新的线程(核心线程)执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果线程池处于RUNNING状态,且把提交的任务成功放入阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次检查线程池的状态,如果线程池没有RUNNING,且成功从阻塞队列中删除任务,则执行reject方法处理任务;
if (! isRunning(recheck) && remove(command))
reject(command);
//当前线程个数是0, 则执行addWorker方法创建新的线程(非核心线程)执行任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//执行addWorker方法创建新的线程执行任务,如果addWoker执行失败,则执行reject方法处理任务
//如果任务队列已满,并且核心线程都已经启动,则创建非核心线程去执行任务,如果创建非核心线程失败,则拒绝任务
else if (!addWorker(command, false))
reject(command);
}
//addWorker实现
//从方法execute的实现可以看出:addWorker主要负责创建新的线程并执行任务
private boolean addWorker(Runnable firstTask, boolean core) {
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//线程池的工作线程通过Woker类实现,当前提交的任务firstTask作为参数传入Worker的构造方法;
//Worker实现了Runnable接口,可以将自身作为一个任务在工作线程中执行;
w = new Worker(firstTask);
//w.thread在Worker构造函数中使用ThreadFactory创建线程,this.thread = getThreadFactory().newThread(this);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//执行start方法启动线程thread时,本质是执行了Worker的run方法启动线程,然后调用runWorker()方法。
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
//runWorker()方法
//runWorker方法是线程池的核心
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//firstTask执行完成之后,通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源;
//线程复用就是在线程执行完之后,不断的从任务队列中取新的任务,直到线程池中的全部任务执行完,以此来完成线程复用
//然后当线程没有任务执行时,超过存活时间之后,线程终止。
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//在执行任务的前后,可以根据业务场景自定义beforeExecute和afterExecute方法;
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行通过Executor.execute()方法提交的任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//在执行任务的前后,可以根据业务场景自定义beforeExecute和afterExecute方法;
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 线程池已没有任务了,工作线程达到了可退出的状态,Worker退出
processWorkerExit(w, completedAbruptly);
}
}
//getTask()方法
private Runnable getTask() {
try {
//workQueue.take:如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务,并执行;
//workQueue.poll:如果在keepAliveTime时间内,阻塞队列还是没有任务,则返回null;
//所以,线程池中实现的线程可以一直执行由用户提交的任务。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
4.线程池原理
以上我们已经对线程池中参数和执行流程进行了讲解,下面通过示例来总结一下线程池工作原理。
假设:corePoolSize=5,maxPoolSize=10,blockQueueSize=10,依次提交6个比较耗时的任务,线程池是如何执行的?
答:依次提交6个耗时任务,可以理解为正在执行的线程都没执行完,前面提交的5个任务,肯定有coresize=5的5个线程全部执行,第6个任务是继续创建线程呢,还是放在队列里?第6个任务会放在队列里,等待前面5个任务有某一个执行完之后,再执行,maxsize线程只有阻塞队列满之后才会继续创建线程,然后将任务队列中的所有任务执行完之后,非核心线程的消息时间达到才会关闭线程,但是五个核心线程是不会被回收掉的。
核心点:提交任务时,如果当前线程个数小于核心线程数,那么提交新任务就会创建新的线程,如果阻塞队列还没有满就不会创建非核心线程,非核心线程的创建条件是任务队列必须满,并且有任务。
5.线程池相关问题
(1)非核心线程延迟死亡,如何实现?
通过阻塞队列poll(),让线程阻塞等待一段时间,如果没有取到任务,则线程死亡。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
(2)核心线程为什么不死?
线程池里的线程从阻塞队列里拿任务,如果存在非核心线程,假设阻塞队列里没有任务,那么非核心线程也要在等到keepAliveTime时间后才会释放。如果当前仅有核心线程存在,如果允许释放核心线程的话,也就和非核线程的处理方式一样,反之,则通过take()一直阻塞直到拿到任务,这也就是线程池里的核心线程为什么不死的原因。
(3)如何释放核心线程?
将allowCoreThreadTimeOut设置为true。
(4)核心线程和非核心线程区别?
并没有发现有明显的标志来标志核心线程与非核心线程,而是以线程数来表达线程身份。0 ~ corePoolSize 表示线程池里只有核心线程,corePoolSize ~ maximumPoolSize 表示线程池里核心线程满,存在非核心线程。线程池实际并不区分核心线程与非核心线程,是根据当前的总体并发状态来决定怎样处理线程任务。实际上并不存在核心和非核心线程,大家都是线程,超过核心线程需要销毁线程时,当前再获取任务的线程先销毁就行了。
(5)非核心线程能成为核心线程吗?
线程池不区分核心线程于非核心线程,只是根据当前线程池容量状态做不同的处理来进行调整,因此看起来像是有核心线程于非核心线程,实际上是满足线程池期望达到的并发状态。
(6)Runnable在线程池里如何执行?
线程执行Worker,Worker不断从阻塞队列里获取任务来执行。如果任务加入线程池失败,则在拒绝策略里,还有处理机会。
(7)如何释放线程?
在线程没有拿到任务后,退出线程,通过processWorkerExit()可以证实上述所言。释放工作线程也并没有区分核心与非核心,也是随机进行的。所谓随机,就是在前面所说的区间范围内,根据释放策略,哪个线程先达到获取不到任务的状态,就释放哪个线程。在线程复用进行死循环执行任务的时候,如线程通过take或者poll拿不到任务,即线程池已经没有任务了,在线程达到最大存活时间时就会销毁线程。
(8)线程池如何线程复用?
线程池复用是一个非常巧妙的设计方式,假如我们来设计线程池,可能会有一个任务分派线程,当发现有线程空闲时,就从任务缓存队列中取一个任务交给空闲线程执行。但是在这里,并没有采用这样的方式,因为这样会要额外地对任务分派线程进行管理,无形地会增加难度和复杂度,这里直接让执行完任务的线程去任务缓存队列里面取任务来执行。
//线程执行完任务之后,就会主动去任务缓存队列中取任务去执行,直到任务执行完毕
while (task != null || (task = getTask()) != null) {
beforeExecute(wt, task);
task.run();
afterExecute(task, thrown);
}
线程执行完任务之后,会反复的从阻塞队列中获取任务来执行,以此来实现线程复用。当非核心线程超多最大存活时间之后就会销毁非核心线程。
(9)线程数如何做选择?
cpu密集型任务核心线程数选择小一点,io密集型任务选择核心线程数大一点,以及大部分是线程等待状态。
6.AsyncTask源码中的线程池
AsyncTask是一种轻量级的异步任务类,可以在线程池中执行耗时任务,然后把执行的进度和最终结果传递给主线程并更新UI;
AsyncTask详细内容请参考:AsyncTask源码解析,AsyncTask使用的线程池如下代码所示:
public abstract class AsyncTask<Params, Progress, Result> {
private static final String LOG_TAG = "AsyncTask";
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
// We want at least 2 threads and at most 4 threads in the core pool,
// preferring to have 1 less than the CPU count to avoid saturating
// the CPU with background work 核心线程个数是Math.min(CPU_COUNT - 1, 4)
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE_SECONDS = 30;
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
/**
* An {@link Executor} that can be used to execute tasks in parallel.
*/
public static final Executor THREAD_POOL_EXECUTOR;
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
}
通过以上代码可以发现核心线程个数为Math.min(CPU_COUNT - 1, 4),最大线程数CPU_COUNT * 2 + 1,提交到AsyncTask的耗时任务最终在THREAD_POOL_EXECUTOR中执行,并且所有提交到AsyncTask的耗时任务都是串行执行的。只需要一个线程就可以完成AsyncTask功能,所以AsyncTask中的线程池中只会有一个线程。
那么AsyncTask完成可以使用SingleThreadExecutor线程池,但是为什么AsyncTask不采用SingleThreadExecutor,猜测有以下两个原因:
第一:AsyncTask之前的版本中,提交到AsyncTask的任务都是并行执行,并不是像现在这样串行执行,所以不能使用SingleThreadExecutor线程池。
第二:可以直接将多个任务直接抛到AsyncTask.ThreadPoolExecutor线程池中执行,这样可以绕过AsyncTask串行排队策略,从而实现并发执行。
如果大家有其他的一些看法,希望可以一起讨论一下。
参考资料
深入分析java线程池的实现原理
关于Java面试,你应该准备这些知识点
Java并发编程:线程池的使用
你了解线程池吗
《Java并发编程的艺术》