线程池-一文弄懂Java里面的线程池ThreadPoolExec
前言
工作中难免会使用线程池。对线程池的使用要格外的小心,说不定某天就出现了难搞的生产问题(OOM)。每次在使用的时候,我都会网上找找资料,今天我就自己全部整理了一篇,不足或错误之处,希望大家看完后多多补充,提提意见。
1、为什么要使用多线程
我们使用多线程的本质是为了提升程序的性能。程序的性能我们可以用2个指标来度量:
延迟:发出请求到收到响应这个过程的时间;延迟越短,意味着程序执行得越快,性能也就越好。
吞吐量:在单位时间内能处理请求的数量;吞吐量越大,意味着程序能处理的请求越多,性能也就越好。
同等条件下,延迟越短,吞吐量越大。但是由于它们隶属不同的维度(一个是时间维度,一个是空间维度),并不能互相转换。
我们所谓提升性能,从度量的角度,主要是降低延迟,提高吞吐量。
要想“降低延迟,提高吞吐量”,对应的方法呢,基本上有两个方向,一个方向是优化算法,另一个方向是将硬件的性能发挥到极致。前者属于算法范畴,后者则是和并发编程息息相关了。那计算机主要有哪些硬件呢?主要是两类:一个是 I/O,一个是 CPU。简言之,在并发编程领域,提升性能本质上就是提升硬件的利用率,再具体点来说,就是提升 I/O 的利用率和 CPU 的利用率。
在单核时代,多线程主要就是用来平衡 CPU 和 I/O 设备的。如果程序只有 CPU 计算,而没有 I/O 操作的话,多线程不但不会提升性能,还会使性能变得更差,原因是增加了线程切换的成本。但是在多核时代,这种纯计算型的程序也可以利用多线程来提升性能。为什么呢?因为利用多核可以降低响应时间。
2、创建线程几种方式
继承Thread类:Thread是类,有单继承的局限性。
实现Runnable接口:任务和线程分开,不能返回执行结果。
实现Callable接口:利用FutureTask执行任务,能取到执行结果。
package com.top.test.threads;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class ThreadPoolTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Thread a = new A();
Thread b = new Thread(new B());
FutureTask<String> futureTask = new FutureTask<>(new C());
Thread c = new Thread(futureTask);
a.start();
b.start();
c.start();
System.out.println(futureTask.get());
}
}
class A extends Thread {
@Override
public void run() {
System.out.println("继承Thread的线程任务");
}
}
class B implements Runnable {
@Override
public void run() {
System.out.println("实现Runnable的线程任务");
}
}
class C implements Callable<String> {
@Override
public String call() throws Exception {
return "实现Callable的线程任务";
}
}
但是我们工作中一般不这样来创建线程。原因:虽然在 Java 语言中创建线程看上去就像创建一个对象一样简单,只需要 new Thread() 就可以了,但实际上创建线程远不是创建一个对象那么简单。创建对象,仅仅是在 JVM 的堆里分配一块内存而已;而创建一个线程,却需要调用操作系统内核的 API,然后操作系统要为线程分配一系列的资源,这个成本就很高了,所以线程是一个重量级的对象,应该避免频繁创建和销毁。
那应该怎样创建线程呢?你应该立刻想到了用线程池。利用线程池把资源池化,使得线程资源能服用,可以避免频繁地创建和销毁。
3、生产者-消费者模式
线程池的设计采用了生产者-消费者模式。线程池的使用方是生产者,线程池本身是消费者,阻塞队列来存储要处理的任务。简单画了个图:
4、Java中的线程池
从jdk1.5版本开始,在java.uitl.concurrent包下面定义定义了一些列与并发相关的类,其中线程池最核心的一个类是ThreadPoolExecutor。
查看ThreadPoolExecutor的源码,看下基本的继承关系:
public class ThreadPoolExecutor extends AbstractExecutorService {
…
}
public abstract class AbstractExecutorService implements ExecutorService {
…
}
public interface ExecutorService extends Executor {
…
}
public interface Executor {
void execute(Runnable command);
}
我们可以看出,Executor接口中定义了execute方法,execute是用来执行我们提交的任务的。
但是类ThreadPoolExecutor源码注释中,是推荐我们使用类Executors的工程方法来创建线程池的:
* <p>To be useful across a wide range of contexts, this class
* provides many adjustable parameters and extensibility
* hooks. However, programmers are urged to use the more convenient
* {@link Executors} factory methods {@link
* Executors#newCachedThreadPool} (unbounded thread pool, with
* automatic thread reclamation), {@link Executors#newFixedThreadPool}
* (fixed size thread pool) and {@link
* Executors#newSingleThreadExecutor} (single background thread), that
* preconfigure settings for the most common usage
* scenarios.
看下源码,Executors提供四种线程池,分别为:
- newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
- newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
对于上面创建四种线程池,我这里就不一一写例子了,网上多的是例子。
既然JDK提供了这么好的工具类,我们是不是就肯定选择它呢?并不是,在阿里开发手册中有这样一条:
需要阿里开发手册的同学,可在公众号“Java尖子生”,回复“alibaba”领取。
看来,最终都有可能导致OOM,而 OOM 会导致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列并执行最大线程数。
5、自定义线程池ThreadPoolExecutor
从上面的源码,我们也看到的四种线程池的最终方式也是调用的ThreadPoolExecutor的构造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
ThreadPoolExecutor 的构造函数非常复杂,现在我们自己通过ThreadPoolExecutor来创建线程池,那么我们就有必要详细了解下ThreadPoolExecutor的构造函数的每一个参数。
5.1 corePoolSize
核心池(核心线程数)的大小。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列(阻塞队列)当中。
核心线程数的选择很重要,那创建多少个线程是合理的呢?
对于 CPU 密集型的计算场景,理论上“线程的数量 =CPU 核数”就是最合适的。不过在工程上,线程的数量一般会设置为“CPU 核数 +1”,这样的话,当线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上,从而保证 CPU 的利用率。
对于 I/O 密集型计算场景,最佳的线程数是与程序中 CPU 计算和 I/O 操作的耗时比相关的,我们可以总结出这样一个公式【理论值】:
最佳线程数 =CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)]
但实际【经验值】告诉我们应该为:2 * CPU 的核数 + 1
I/O 耗时和 CPU 耗时的比值是一个关键参数,不幸的是这个参数是未知的,而且是动态变化的,所以工程上,我们要估算这个参数,然后做各种不同场景下的压测来验证我们的估计。不过工程上,原则还是将硬件的性能发挥到极致,所以压测时,我们需要重点关注 CPU、I/O 设备的利用率和性能指标(响应时间、吞吐量)之间的关系。
工作中都是按照逻辑核数来的,理论值和经验值只是提供个指导,实际上还是要靠压测!!!
5.2 maximumPoolSize
线程池最大线程数,表示在线程池中最多能创建多少个线程。当阻塞队列装满时,继续提交任务,会创建救急(非核心)线程来处理。注意: 不是先前创建的线程是核心线程,后面创建的线程是非核心线程,线程是没有核心非核心的概念的。
5.3 不同场景下提交任务,线程池的表现形式
【其中workerThread表示工作线程数】。
当workerThread等于0时,提交任务,创建线程
当workerThread小于corePoolSize时,提交任务,创建线程,不管其他工作线程是不是闲置的。
当workerThread大于等于corePoolSize 且 workerThread小于maxinumPoolsize时,将任务添加到队列中,当队列满了后,创建线程。当一个线程完成任务时,它会从队列中取下一个任务来执行。
当workerThread等于maxinumPoolsize时,提交任务,既不能加入队列,也不能创建新的线程,将RejectedExecutionHandler的rejectedExecution方法执行拒绝策略。
下面是ThreadPoolExecutor 的execute()方法的源码及注释:
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
/**
* Invokes the rejected execution handler for the given command.
* Package-protected for use by ScheduledThreadPoolExecutor.
*/
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
5.4 keepAliveTime和unit
keepAliveTime表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒
5.5 workQueue
一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
线程池的排队策略与BlockingQueue有关。
关于队列、阻塞队列,后面我会写一篇文章。
5.6 threadFactory
通过这个参数你可以自定义如何创建线程,例如你可以给线程指定一个有意义的名字。
给线程赋予一个有意义的名字很重要,我这里整理了几种方法:
方法一:自己实现ThreadFactory并制定给线程池,在实现的ThreadFactory中设定计数和调用Thread.setName。
方法二:guava的ThreadFactoryBuilder.setNameFormat可以指定一个前缀,使用%d表示序号;例如:
ThreadFactory myThreadFactory =(new ThreadFactoryBuilder()).setNameFormat("my-pool-thread-%d").build();
ThreadFactory是JDK自带的,下面我们详细看下如何用ThreadFactory来实现。
ThreadFactory是一个只有一个newThread方法的接口:
public interface ThreadFactory {
Thread newThread(Runnable r);
}
我们利用这个方法我们可以做什么呢?可以给线程命名,查看线程数,指定是否守护线程,设置线程优先级等等。
下面是我写的完整例子,你可以拿这个例子做其他场景的测试(对shutdown等方法不太懂的,往下看):
package com.top.test.threads;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class TestThreadFactory {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(8, 100,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(100),
new CustomThreadFactory(false, 1), new ThreadPoolExecutor.AbortPolicy());
System.out.println("开始往线程池中提交任务");
// 可以把10改成其他值来测试其他场景哦-_-
for (int i = 0; i < 10; i++) {
threadPool.submit(() -> {
// 可以在此睡眠一下
/*try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}*/
System.out.println("我是子线程:" + Thread.currentThread().getName());
});
}
System.out.println("一共往线程池提交了10个任务");
System.out.println("线程池是否已关闭:" + threadPool.isShutdown());
// 发起关闭线程池
threadPool.shutdown();
System.out.println("线程池是否已关闭:" + threadPool.isShutdown());
System.out.println("线程池是否已终止:" + threadPool.isTerminated());
threadPool.awaitTermination(1, TimeUnit.SECONDS);
if (threadPool.isTerminated()) {
System.out.println("线程池是否已终止:" + threadPool.isTerminated());
} else {
System.out.println("线程池未终止(超时)就退出");
}
}
}
class CustomThreadFactory implements ThreadFactory {
// 原子类 CAS 保证线程安全
private AtomicInteger threadCount = new AtomicInteger();
private boolean isDaemon;
private int priority;
public CustomThreadFactory(boolean isDaemon, int priority) {
this.isDaemon = isDaemon;
this.priority = priority;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("My_Custom_Thread_" + threadCount.incrementAndGet());
thread.setDaemon(isDaemon);
thread.setPriority(priority);
return thread;
}
}
执行结果:
Connected to the target VM, address: '127.0.0.1:53514', transport: 'socket'
开始往线程池中提交任务
一共往线程池提交了10个任务
线程池是否已关闭:false
线程池是否已关闭:true
线程池是否已终止:false
我是子线程:My_Custom_Thread_1
我是子线程:My_Custom_Thread_1
我是子线程:My_Custom_Thread_1
我是子线程:My_Custom_Thread_2
我是子线程:My_Custom_Thread_3
我是子线程:My_Custom_Thread_4
我是子线程:My_Custom_Thread_5
我是子线程:My_Custom_Thread_6
我是子线程:My_Custom_Thread_7
我是子线程:My_Custom_Thread_8
线程池是否已终止:true
Disconnected from the target VM, address: '127.0.0.1:53514', transport: 'socket'
Process finished with exit code 0
5.6 Handler
通过这个参数你可以自定义任务的拒绝策略。如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。
至于拒绝的策略,你可以通过 handler 这个参数来指定。ThreadPoolExecutor 已经提供了以下 4 种策略。
- CallerRunsPolicy:提交任务的线程自己去执行该任务。
- AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException。
- DiscardPolicy:直接丢弃任务,没有任何异常抛出。
- DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。
注意:使用有界队列,当任务过多时,线程池会触发执行拒绝策略,线程池默认的拒绝策略会 throw RejectedExecutionException 这是个运行时异常,对于运行时异常编译器并不强制 catch 它,所以开发人员很容易忽略。因此默认拒绝策略要慎重使用。如果线程池处理的任务非常重要,建议自定义自己的拒绝策略;并且在实际工作中,自定义的拒绝策略往往和降级策略配合使用。下面我给出一个完整的示例代码:
public static class CallerRunsEnhancedPolicy implements RejectedExecutionHandler {
public CallerRunsEnhancedPolicy() {}
/**
* Executes task r in the caller's thread, unless the executor has been shut down, in which case the task is
* discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 可以在此加一条日志
if (!e.isShutdown()) {
r.run();
}
}
}
public static class AbortEnhancedPolicy implements RejectedExecutionHandler {
public AbortEnhancedPolicy() {}
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always.
*/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 可以在此加一条日志
throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
}
}
public static class RetryPolicy implements RejectedExecutionHandler {
public RetryPolicy() {}
/**
* 提醒并进入后补线程池排队执行
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 可以在此加一条日志
if (!e.isShutdown()) {
// 可以在此加一条日志
try {
// ThreadPool.getAlternateExecutor()返回的是一个线程池,这个线程池应该是个全局的线程池,最好只初始化一次,保证其线程的安全性(可以用单例模式来初始化)
ThreadPool.getAlternateExecutor().execute(r);
} catch (Exception ex) {
// 可以在此加一条日志
throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()
+ "; failure retry execute task in " + ThreadPool.getAlternateExecutor().toString());
}
}
}
}
public static class WaitPolicy implements RejectedExecutionHandler {
public WaitPolicy() {}
/**
* 提醒并等待,直到加入线程池队列中
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 可以在此加一条日志
if (!e.isShutdown()) {
// 可以在此加一条日志
try {
e.getQueue().put(r);
// 可以在此加一条日志
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
// 可以在此加一条日志
throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()
+ "; waiting put task to pool interrupted");
} catch (Exception ex) {
// 可以在此加一条日志
throw new RejectedExecutionException(
"Task " + r.toString() + " rejected from " + e.toString() + "; failure put task to pool");
}
}
}
}
下面是ThreadPool的定义:
public final class ThreadPool {
public static final int LOGICAL_CPU_CORE_COUNT = Runtime.getRuntime().availableProcessors();
public static final int MAX_POOLSIZE_THRESHOLD = 2000;
private static Object alternateExecutorLock = new Object();
private static volatile ListenableThreadPoolExecutor alternateExecutor = null;
public static ThreadPoolExecutor getAlternateExecutor() {
synchronized (alternateExecutorLock) {
if (alternateExecutor == null) {
initAlternateExecutor();
} else if (alternateExecutor.isShutdown()) {
boolean term = false;
while (!term) {
try {
term = alternateExecutor.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
initAlternateExecutor();
}
return alternateExecutor;
}
}
private static void initAlternateExecutor() {
int threadCount =
getRecommendedThreadPoolSizeByTaskIntensiveType(TaskIntensiveType.CPU, MAX_POOLSIZE_THRESHOLD);
int maxThreadCount = getMaxThreadPoolSizeByTaskIntensiveType(TaskIntensiveType.CPU, MAX_POOLSIZE_THRESHOLD);
alternateExecutor = newThreadPool(threadCount, maxThreadCount, 60L, maxThreadCount * 3,
new WaitPolicy(), (new ThreadFactoryBuilder()).setNameFormat("alternate-pool-thread-%d").build());
alternateExecutor.allowCoreThreadTimeOut(true);
}
public static int getRecommendedThreadPoolSizeByTaskIntensiveType(TaskIntensiveType taskIntensive,
int poolSizeThreshold) {
int poolSize;
switch (taskIntensive) {
case IO:
poolSize = 2 * LOGICAL_CPU_CORE_COUNT + 1; // NOSONAR
break;
case CPU:
default:
poolSize = LOGICAL_CPU_CORE_COUNT + 1;
break;
}
if (poolSize > poolSizeThreshold) {
return poolSizeThreshold;
}
return poolSize;
}
public static int getMaxThreadPoolSizeByTaskIntensiveType(TaskIntensiveType taskIntensive, int poolSizeThreshold) {
int poolSize;
switch (taskIntensive) {
case IO:
poolSize = 2 * LOGICAL_CPU_CORE_COUNT * 5 + 1; // NOSONAR
break;
case CPU:
default:
poolSize = LOGICAL_CPU_CORE_COUNT * 5 + 1;
break;
}
if (poolSize > poolSizeThreshold) {
return poolSizeThreshold;
}
return poolSize;
}
public enum TaskIntensiveType {
IO(0), CPU(1);
private int value;
private TaskIntensiveType(int value) {
this.value = value;
}
public static TaskIntensiveType valueOf(int value) {
for (TaskIntensiveType oneVal : TaskIntensiveType.values()) {
if (oneVal.value == value) {
return oneVal;
}
}
return null;
}
public int getValue() {
return value;
}
}
6、ThreadPoolExecutor常见方法
6.1 execute()
方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
6.2 submit()
方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果。submit可以提交Runnable和 Callable。
6.3 6.3 shutdown()
将线程池状态置为SHUTDOWN。平滑的关闭ExecutorService,当此方法被调用时,ExecutorService停止接收新的任务并且等待已经提交的任务(包含提交正在执行和提交未执行)执行完成。当所有提交任务执行完毕,线程池即被关闭。所以手动调用shotdown方法,可以不必担心存在剩余任务没有执行的情况。
6.4 shutdownNow()
将线程池状态置为STOP。跟shutdown()一样,先停止接收外部提交的任务,忽略队列里等待的任务,尝试将正在跑的任务interrupt中断,返回未执行的任务列表。
对于那些正在执行的task,并不能保证他们就一定会直接停止执行,或许他们会暂停,或许会执行直到完成,但是ExecutorService会尽力关闭所有正在运行的task。
6.5 awaitTermination(long timeout, TimeUnit unit)
awaitTermination方法接收timeout和TimeUnit两个参数,用于设定超时时间及单位。当等待超过设定时间时,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。一般情况下会和shutdown方法组合使用。
第一个参数指定的是时间,第二个参数指定的是时间单位(当前是秒)。返回值类型为boolean型。
如果等待的时间超过指定的时间,但是线程池中的线程运行完毕,那么awaitTermination()返回true。执行分线程已结束。
如果等待的时间超过指定的时间,但是线程池中的线程未运行完毕,那么awaitTermination()返回false。不执行分线程已结束。
如果等待时间没有超过指定时间,等待!
可以用awaitTermination()方法来判断线程池中是否有继续运行的线程。
下面是ThreadPoolExecutor的awaitTermination源码:
/**
* Wait condition to support awaitTermination
*/
private final Condition termination = mainLock.newCondition();
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
// 等待时长
long nanos = unit.toNanos(timeout);
//加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
// condition条件等待
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
6.6 其它方法
isShutdown():线程池是否已关闭
isTerminated:线程池是否已终止
7、线程池是否需要关闭呢?
这里说下两种情况下的线程池:全局线程池、局部线程池。
全局线程池:其实我们工作中使用线程池都是全局的,当然我们也可能为不同业务建立不同的全局线程池。那全局线程池是否需要关闭呢?分两种场景:(一)如果每次在程序的结尾都去关闭线程池,那每次有新的任务进来,都要重新建立一个线程池,这样难免也太消耗资源了,而且原本这就是一个全局的。(二)如果一个线程池在某段时间内处理了大量的任务,创建了大量的线程,这个时间段之后可能一下子也没有新的任务进来,那这个线程池需要关闭嘛 ?我个人理解也是没必要的,我们通过参数keepAliveTime可以给线程设置在没有任务处理时的生存时间,并且调用allowCoreThreadTimeOut(boolean)方法,这样充分减少资源的浪费。
局部线程池:这种线程池工作中也该尽量避免使用。如果每个请求进来都创建一个新的线程池,当请求多的时候,线程池增加,那难免会有OOM。如果你真的使用了,那请记住一定要调用shutdown()来关闭。