线程池
线程
创建线程有两种方式:
1、继承Thread类
2、实现Runnable接口
区别在于:
主要区别在于在多线程访问同一资源的情况下,用Runnable接口创建的线程可以处理同一资源,而用Thread类创建的线程则各自独立处理,各自拥有自己的资源。
new Thread(new Runnable() {
@Override
public void run() {
//do sth .
}
}).start();
这段代码创建了一个线程并执行,它在任务结束后GC会自动回收该线程,一切看起来如此美妙,是的,它在线程并发不多的程序中确实不错,而假如这个程序有很多地方需要开启大量线程来处理任务,那么如果还是用上述的方式去创建线程处理的话,那么将导致系统的性能表现的非常糟糕,更别说在内存有限的移动设备上,主要的影响如下:
-
线程的创建和销毁都需要时间,当有大量的线程创建和销毁时,那么这些时间的消耗则比较明显,将导致性能上的缺失
-
大量的线程创建、执行和销毁是非常耗cpu和内存的,这样将直接影响系统的吞吐量,导致性能急剧下降,如果内存资源占用的比较多,还很可能造成OOM
-
大量的线程的创建和销毁很容易导致GC频繁的执行,从而发生内存抖动现象,而发生了内存抖动,对于移动端来说,最大的影响就是造成界面卡顿
而针对上述所描述的问题,解决的办法归根到底就是:重用已有的线程,从而减少线程的创建。
所以这就涉及到线程池(ExecutorService)的概念了,线程池的基本作用就是进行线程的复用,下面将具体介绍线程池的使用
ExecutorService
使用线程池管理线程的优点
1、线程的创建和销毁由线程池维护,一个线程在完成任务后并不会立即销毁,而是由后续的任务复用这个线程,从而减少线程的创建和销毁,节约系统的开销
2、线程池旨在线程的复用,这就可以节约我们用以往的方式创建线程和销毁所消耗的时间,减少线程频繁调度的开销,从而节约系统资源,提高系统吞吐量
3、在执行大量异步任务时提高了性能
4、Java内置的一套ExecutorService线程池相关的api,可以更方便的控制线程的最大并发数、线程的定时任务、单线程的顺序执行等
ExecutorService,它是一个接口,可以叫做线程池的服务,因为它提供了众多接口api来控制线程池中的线程,而真正意义上的线程池就是:ThreadPoolExecutor,它实现了ExecutorService接口,并封装了一系列的api使得它具有线程池的特性,其中包括工作队列、核心线程数、最大线程数等。
线程池:ThreadPoolExecutor
继承体系
image.png构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {...}
推荐使用Executors的工厂方法来创建线程池,Executors类是官方提供的一个工厂类,它里面封装好了众多功能不一样的线程池,从而使得我们创建线程池非常的简便,主要提供了如下五种功能不一样的线程池:
1、newFixedThreadPool() :
作用:该方法返回一个固定线程数量的线程池,该线程池中的线程数量始终不变,即不会再创建新的线程,也不会销毁已经创建好的线程,自始自终都是那几个固定的线程在工作,所以该线程池可以控制线程的最大并发数。
栗子:假如有一个新任务提交时,线程池中如果有空闲的线程则立即使用空闲线程来处理任务,如果没有,则会把这个新任务存在一个任务队列中,一旦有线程空闲了,则按FIFO方式处理任务队列中的任务。
2、newCachedThreadPool() :
作用:该方法返回一个可以根据实际情况调整线程池中线程的数量的线程池。即该线程池中的线程数量不确定,是根据实际情况动态调整的。
栗子:假如该线程池中的所有线程都正在工作,而此时有新任务提交,那么将会创建新的线程去处理该任务,而此时假如之前有一些线程完成了任务,现在又有新任务提交,那么将不会创建新线程去处理,而是复用空闲的线程去处理新任务。那么此时有人有疑问了,那这样来说该线程池的线程岂不是会越集越多?其实并不会,因为线程池中的线程都有一个“保持活动时间”的参数,通过配置它,如果线程池中的空闲线程的空闲时间超过该“保存活动时间”则立刻停止该线程,而该线程池默认的“保持活动时间”为60s。
3、newSingleThreadExecutor() :
作用:该方法返回一个只有一个线程的线程池,即每次只能执行一个线程任务,多余的任务会保存到一个任务队列中,等待这一个线程空闲,当这个线程空闲了再按FIFO方式顺序执行任务队列中的任务。
4、newScheduledThreadPool() :
作用:该方法返回一个可以控制线程池内线程定时或周期性执行某任务的线程池。
5、newSingleThreadScheduledExecutor() :
作用:该方法返回一个可以控制线程池内线程定时或周期性执行某任务的线程池。只不过和上面的区别是该线程池大小为1,而上面的可以指定线程池的大小。
获取线程池
通过Executors的工厂方法来获取:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
ScheduledExecutorService singleThreadScheduledPool = Executors.newSingleThreadScheduledExecutor();
我们可以看到通过Executors的工厂方法来创建线程池极其简便,其实它的内部还是通过new ThreadPoolExecutor(…)的方式创建线程池的,我们看一下这些工厂方法的内部实现:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
我们可以清楚的看到这些方法的内部实现都是通过创建一个ThreadPoolExecutor
对象来创建的,正所谓万变不离其宗,所以我们要了解线程池还是得了解ThreadPoolExecutor
这个线程池类,其中由于和定时任务相关的线程池比较特殊(newScheduledThreadPool()、newSingleThreadScheduledExecutor())
,它们创建的线程池内部实现是由ScheduledThreadPoolExecutor
这个类实现的,而ScheduledThreadPoolExecutor
是继承于ThreadPoolExecutor
扩展而成的,所以本质还是一样的,只不过多封装了一些定时任务相关的api,所以我们主要就是要了解ThreadPoolExecutor
,从构造方法开始:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {//...}
我们可以看到它构造方法的参数比较多,有七个,下面一一来说明这些参数的作用:
-
corePoolSize:线程池中的核心线程数量
-
maximumPoolSize:线程池中的最大线程数量
-
keepAliveTime:这个就是上面说到的“保持活动时间“,上面只是大概说明了一下它的作用,不过它起作用必须在一个前提下,就是当线程池中的线程数量超过了corePoolSize时,它表示多余的空闲线程的存活时间,即:多余的空闲线程在超过keepAliveTime时间内没有任务的话则被销毁。而这个主要应用在缓存线程池中
-
unit:它是一个枚举类型,表示
keepAliveTime
的单位,常用的如:TimeUnit.SECONDS
(秒)、TimeUnit.MILLISECONDS
(毫秒) -
workQueue:任务队列,主要用来存储已经提交但未被执行的任务,不同的线程池采用的排队策略不一样,稍后再讲
-
threadFactory:线程工厂,用来创建线程池中的线程,通常用默认的即可
-
handler:通常叫做拒绝策略,1、在线程池已经关闭的情况下 2、任务太多导致最大线程数和任务队列已经饱和,无法再接收新的任务 。在上面两种情况下,只要满足其中一种时,在使用
execute()
来提交新的任务时将会拒绝,而默认的拒绝策略是抛一个RejectedExecutionException
异常
上面的参数理解起来都比较简单,不过workQueue
这个任务队列却要再次说明一下,它是一个BlockingQueue<Runnable>
对象,而泛型则限定它是用来存放Runnable
对象的,刚刚上面讲了,不同的线程池它的任务队列实现肯定是不一样的,所以,保证不同线程池有着不同的功能的核心就是这个workQueue
的实现了,细心的会发现在刚刚的用来创建线程池的工厂方法中,针对不同的线程池传入的workQueue
也不一样,下面我总结一下这五种线程池分别用的是什么BlockingQueue
:
1、newFixedThreadPool()—>LinkedBlockingQueue
2、newSingleThreadExecutor()—>LinkedBlockingQueue
3、newCachedThreadPool()—>SynchronousQueue
4、newScheduledThreadPool()—>DelayedWorkQueue
5、newSingleThreadScheduledExecutor()—>DelayedWorkQueue
这些队列分别表示:
- LinkedBlockingQueue:无界的队列
- SynchronousQueue:直接提交的队列
- DelayedWorkQueue:等待队列
当然实现了BlockingQueue
接口的队列还有:ArrayBlockingQueue
(有界的队列)、PriorityBlockingQueue
(优先级队列)。这些队列的详细作用就不多介绍了。
线程池ThreadPoolExecutor的使用
使用线程池,其中涉及到一个极其重要的方法,即:
execute(Runnable command)
该方法意为执行给定的任务,该任务处理可能在新的线程、已入池的线程或者正调用的线程,这由ThreadPoolExecutor的实现决定。
自定义线程池ThreadPoolExecutor
Java内置只为我们提供了五种常用的线程池,一般来说这足够用了,不过有时候我们也可以根据需求来自定义我们自己的线程池,而要自定义不同功能的线程池,上面我们也说了线程池功能的不同归根到底还是内部的BlockingQueue实现不同,所以,我们要实现我们自己相要的线程池,就必须从BlockingQueue的实现上做手脚,而上面也说了BlockingQueue的实现类有多个,那么这次我们就选用PriorityBlockingQueue来实现一个功能是按任务的优先级来处理的线程池。
1、首先我们创建一个基于PriorityBlockingQueue实现的线程池,为了测试方便,我这里把核心线程数量设置为3,如下:
ExecutorService priorityThreadPool = new ThreadPoolExecutor(3,3,0L,TimeUnit.SECONDS,new PriorityBlockingQueue<Runnable>());
2、然后创建一个实现Runnable接口的类,并向外提供一个抽象方法供我们实现自定义功能,并实现Comparable接口,实现这个接口主要就是进行优先级的比较,代码如下:
public abstract class PriorityRunnable implements Runnable, Comparable<PriorityRunnable> {
private int priority;
public PriorityRunnable(int priority) {
if (priority < 0)
throw new IllegalArgumentException();
this.priority = priority;
}
@Override
public int compareTo(PriorityRunnable another) {
int my = this.getPriority();
int other = another.getPriority();
return my < other ? 1 : my > other ? -1 : 0;
}
@Override
public void run() {
doSth();
}
public abstract void doSth();
public int getPriority() {
return priority;
}
}
3、使用我们自己的PriorityRunnable提交任务,整体代码如下:
ExecutorService priorityThreadPool = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>());
for (int i = 1; i <= 10; i++) {
final int priority = i;
priorityThreadPool.execute(new PriorityRunnable(priority) {
@Override
public void doSth() {
String threadName = Thread.currentThread().getName();
Log.v("zxy", "线程:" + threadName + ",正在执行优先级为:" + priority + "的任务");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
扩展线程池ThreadPoolExecutor
除了内置的功能外,ThreadPoolExecutor也向外提供了三个接口供我们自己扩展满足我们需求的线程池,这三个接口分别是:
- beforeExecute() - 任务执行前执行的方法
- afterExecute() -任务执行结束后执行的方法
- terminated() -线程池关闭后执行的方法
这三个方法在ThreadPoolExecutor内部都没有实现
前面两个方法我们可以在ThreadPoolExecutor内部的runWorker()方法中找到,而runWorker()是ThreadPoolExecutor的内部类Worker实现的方法,Worker它实现了Runnable接口,也正是线程池内处理任务的工作线程,而Worker.runWorker()方法则是处理我们所提交的任务的方法,它会同时被多个线程访问,所以我们看runWorker()方法的实现,由于涉及到多个线程的异步调用,必然是需要使用锁来处理,而这里使用的是Lock来实现的,我们来看看runWorker()方法内主要实现:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
image.png
可以看到在task.run()之前和之后分别调用了beforeExecute和afterExecute方法,并传入了我们的任务Runnable对象
而terminated()则是在关闭线程池的方法中调用,而关闭线程池有两个方法,我贴其中一个:
image.png
所以,我们要扩展线程池,只需要重写这三个方法,并实现我们自己的功能即可,这三个方法分别都会在任务执行前调用、任务执行完成后调用、线程池关闭后调用。
所以,在上面我们的优先级线程池的代码上,我们再扩展一个具有暂停功能的优先级线程池,代码如下:
public class PausableThreadPoolExecutor extends ThreadPoolExecutor {
private boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
public PausableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
pauseLock.lock();
try {
while (isPaused) unpaused.await();
} catch (InterruptedException ie) {
t.interrupt();
} finally {
pauseLock.unlock();
}
}
public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
public void resume() {
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}
}
优化线程池ThreadPoolExecutor
虽说线程池极大改善了系统的性能,不过创建线程池也是需要资源的,所以线程池内线程数量的大小也会影响系统的性能,大了反而浪费资源,小了反而影响系统的吞吐量,所以我们创建线程池需要把握一个度才能合理的发挥它的优点,通常来说我们要考虑的因素有CPU的数量、内存的大小、并发请求的数量等因素,按需调整。
通常核心线程数可以设为CPU数量+1,而最大线程数可以设为CPU的数量*2+1。
获取CPU数量的方法为:
Runtime.getRuntime().availableProcessors();
资源回收
考虑到系统资源是有限的,对于线程池超出 corePoolSize 数量的空闲线程应进行回收操作。进行此操作存在一个问题,即回收时机。目前的实现方式是当线程空闲时间超过 keepAliveTime 后,进行回收。除了核心线程数之外的线程可以进行回收,核心线程内的空闲线程也可以进行回收。回收的前提是allowCoreThreadTimeOut属性被设置为 true,通过public void allowCoreThreadTimeOut(boolean) 方法可以设置属性值。
排队策略
当线程数量大于等于 corePoolSize,workQueue 未满时,则缓存新任务。这里要考虑使用什么类型的容器缓存新任务,通过 JDK 文档介绍,我们可知道有3中类型的容器可供使用,分别是同步队列,有界队列和无界队列。对于有优先级的任务,这里还可以增加优先级队列。以上所介绍的4中类型的队列,对应的实现类如下:
image.png拒绝策略
线程数量大于等于 maximumPoolSize,且 workQueue 已满,则使用拒绝策略处理新任务。Java 线程池提供了4中拒绝策略实现类,如下:
image.png以上4个拒绝策略中,AbortPolicy 是线程池实现类所使用的策略。我们也可以通过方法public void setRejectedExecutionHandler(RejectedExecutionHandler)修改线程池决绝策略。
线程的创建与复用
在线程池的实现上,线程的创建是通过线程工厂接口ThreadFactory的实现类来完成的。默认情况下,线程池使用Executors.defaultThreadFactory()方法返回的线程工厂实现类。当然,我们也可以通过
public void setThreadFactory(ThreadFactory)方法进行动态修改。具体细节这里就不多说了,并不复杂,大家可以自己去看下源码。
在线程池中,线程的复用是线程池的关键所在。这就要求线程在执行完一个任务后,不能立即退出。对应到具体实现上,工作线程在执行完一个任务后,会再次到任务队列获取新的任务。如果任务队列中没有任务,且 keepAliveTime 也未被设置,工作线程则会被一致阻塞下去。通过这种方式即可实现线程复用。
说完原理,再来看看线程的创建和复用的相关代码(基于 JDK 1.8),如下:
+----ThreadPoolExecutor.Worker.java
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
// 调用线程工厂创建线程
this.thread = getThreadFactory().newThread(this);
}
// Worker 实现了 Runnable 接口
public void run() {
runWorker(this);
}
+----ThreadPoolExecutor.java
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// 循环从任务队列中获取新任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行新任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 线程退出后,进行后续处理
processWorkerExit(w, completedAbruptly);
}
}
提交任务
通常情况下,我们可以通过线程池的submit方法提交任务。被提交的任务可能会立即执行,也可能会被缓存或者被拒绝。任务的处理流程如下图所示:
image.png上面的流程图不是很复杂,下面再来看看流程图对应的代码,如下:
+---- AbstractExecutorService.java
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 创建任务
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 提交任务
execute(ftask);
return ftask;
}
+---- ThreadPoolExecutor.java
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果工作线程数量 < 核心线程数,则创建新线程
if (workerCountOf(c) < corePoolSize) {
// 添加工作者对象
if (addWorker(command, true))
return;
c = ctl.get();
}
// 缓存任务,如果队列已满,则 offer 方法返回 false。否则,offer 返回 true
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 添加工作者对象,并在 addWorker 方法中检测线程数是否小于最大线程数
else if (!addWorker(command, false))
// 线程数 >= 最大线程数,使用拒绝策略处理任务
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 检测工作线程数与核心线程数或最大线程数的关系
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建工作者对象,细节参考上一节所贴代码
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将 worker 对象添加到 workers 集合中
workers.add(w);
int s = workers.size();
// 更新 largestPoolSize 属性
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 开始执行任务
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
关闭线程池
我们可以通过shutdown和shutdownNow两个方法关闭线程池。两个方法的区别在于,shutdown 会将线程池的状态设置为SHUTDOWN,同时该方法还会中断空闲线程。shutdownNow 则会将线程池状态设置为STOP,并尝试中断所有的线程。中断线程使用的是Thread.interrupt方法,未响应中断方法的任务是无法被中断的。最后,shutdownNow 方法会将未执行的任务全部返回。
调用 shutdown 和 shutdownNow 方法关闭线程池后,就不能再向线程池提交新任务了。对于处于关闭状态的线程池,会使用拒绝策略处理新提交的任务。
面试题
1. Java的线程池说一下,各个参数的作用,如何进行的?
2. 按线程池内部机制,当提交新任务时,有哪些异常要考虑。
3. 几种常用的线程池
- newFixedThreadPool (固定数目线程的线程池)
- newCachedThreadPool(可缓存线程的线程池)
- newSingleThreadExecutor(单线程的线程池)
- newScheduledThreadPool(定时及周期执行的线程池)
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
线程池特点
- 核心线程数和最大线程数大小一样
- 没有所谓的非空闲时间,即keepAliveTime为0
- 阻塞队列为无界队列LinkedBlockingQueue
工作机制
image.png- 提交任务
- 如果线程数少于核心线程,创建核心线程执行任务
- 如果线程数等于核心线程,把任务添加到LinkedBlockingQueue阻塞队列
- 如果线程执行完任务,去阻塞队列取任务,继续执行。
使用场景
FixedThreadPool 适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。
newCachedThreadPool
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
线程池特点
- 核心线程数为0
- 最大线程数为Integer.MAX_VALUE
- 阻塞队列是SynchronousQueue
- 非核心线程空闲存活时间为60秒
当提交任务的速度大于处理任务的速度时,每次提交一个任务,就必然会创建一个线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。
工作机制
image.png- 提交任务
- 因为没有核心线程,所以任务直接加到SynchronousQueue队列。
- 判断是否有空闲线程,如果有,就去取出任务执行。
- 如果没有空闲线程,就新建一个线程执行。
- 执行完任务的线程,还可以存活60秒,如果在这期间,接到任务,可以继续活下去;否则,被销毁。
使用场景
用于并发执行大量短期的小任务。
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
线程池特点
- 核心线程数为1
- 最大线程数也为1
- 阻塞队列是LinkedBlockingQueue
- keepAliveTime为0
工作机制
image.png- 提交任务
- 线程池是否有一条线程在,如果没有,新建线程执行任务
- 如果有,讲任务加到阻塞队列
- 当前的唯一线程,从队列取任务,执行完一个,再继续取,一个人(一条线程)夜以继日地干活。
使用场景
适用于串行执行任务的场景,一个任务一个任务地执行。
newScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
线程池特点
- 最大线程数为Integer.MAX_VALUE
- 阻塞队列是DelayedWorkQueue
- keepAliveTime为0
- scheduleAtFixedRate() :按某种速率周期执行
- scheduleWithFixedDelay():在某个延迟后执行
工作机制
- 添加一个任务
- 线程池中的线程从 DelayQueue 中取任务
- 线程从 DelayQueue 中获取 time 大于等于当前时间的task
- 执行完后修改这个 task 的 time 为下次被执行的时间
- 这个 task 放回DelayQueue队列中
使用场景
周期性执行任务的场景,需要限制线程数量的场景
4. 线程池都有哪几种工作队列?
- ArrayBlockingQueue
- LinkedBlockingQueue
- DelayQueue
- PriorityBlockingQueue
- SynchronousQueue
ArrayBlockingQueue
ArrayBlockingQueue(有界队列)是一个用数组实现的有界阻塞队列,按FIFO排序量。
LinkedBlockingQueue
LinkedBlockingQueue(可设置容量队列)基于链表结构的阻塞队列,按FIFO排序任务,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE,吞吐量通常要高于ArrayBlockingQuene;newFixedThreadPool线程池使用了这个队列
DelayQueue
DelayQueue(延迟队列)是一个任务定时周期的延迟执行的队列。根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序。newScheduledThreadPool线程池使用了这个队列。
PriorityBlockingQueue
PriorityBlockingQueue(优先级队列)是具有优先级的无界阻塞队列;
SynchronousQueue
SynchronousQueue(同步队列)一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene,newCachedThreadPool线程池使用了这个队列。
针对面试题:线程池都有哪几种工作队列? 我觉得,回答以上几种ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue等,说出它们的特点,并结合使用到对应队列的常用线程池(如newFixedThreadPool线程池使用LinkedBlockingQueue),进行展开阐述, 就可以啦。
5. 使用无界队列的线程池会导致内存飙升吗?
会的,newFixedThreadPool使用了无界的阻塞队列LinkedBlockingQueue,如果线程获取一个任务后,任务的执行时间比较长(比如,上面demo设置了10秒),会导致队列的任务越积越多,导致机器内存使用不停飙升, 最终导致OOM。
6. 线程池执行流程,即对应execute()方法:
image.png-
提交一个任务,线程池里存活的核心线程数小于线程数corePoolSize时,线程池会创建一个核心线程去处理提交的任务。
-
如果线程池核心线程数已满,即线程数已经等于corePoolSize,一个新提交的任务,会被放进任务队列workQueue排队等待执行。
-
当线程池里面存活的线程数已经等于corePoolSize了,并且任务队列workQueue也满,判断线程数是否达到maximumPoolSize,即最大线程数是否已满,如果没到达,创建一个非核心线程执行提交的任务。
-
如果当前的线程数达到了maximumPoolSize,还有新的任务过来的话,直接采用拒绝策略处理。
7. 四种拒绝策略
-
AbortPolicy(抛出一个异常,默认的)
-
DiscardPolicy(直接丢弃任务)
-
DiscardOldestPolicy(丢弃队列里最老的任务,将当前这个任务继续提交给线程池)
-
CallerRunsPolicy(交给线程池调用所在的线程进行处理)
8. 线程池异常处理
- 在任务代码try/catch捕获异常
- 通过Future对象的get方法接收抛出的异常,再处理
- 为工作者线程设置UncaughtExceptionHandler,在uncaughtException方法中处理异常
- 重写ThreadPoolExecutor的afterExecute方法,处理传递的异常引用
线程池状态
//线程池状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
image.png
RUNNING
- 该状态的线程池会接收新任务,并处理阻塞队列中的任务;
- 调用线程池的shutdown()方法,可以切换到SHUTDOWN状态;
- 调用线程池的shutdownNow()方法,可以切换到STOP状态;
SHUTDOWN
- 该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
- 队列为空,并且线程池中执行的任务也为空,进入TIDYING状态;
STOP
- 该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
- 线程池中执行的任务为空,进入TIDYING状态;
TIDYING
- 该状态表明所有的任务已经运行终止,记录的任务数量为0。
- terminated()执行完毕,进入TERMINATED状态
TERMINATED
该状态表示线程池彻底终止