Java 线程池剖析
概念
线程 & 线程池
- 什么是线程?
提到线程,不得不提进程。进程是操作系统的最小资源单位,一个进程能使用多少虚拟内存,能打开多少文件标识符。线程是操作系统的最小调度单元,每个进程默认有一个主线程,它用来执行真正的任务。
- 在什么情况下,需要使用线程?
多个任务并发执行,提高 CPU 利用率,响应速度,执行效率。例如:主线程负责接收请求,工作线程负责处理请求。
- 在什么情况下使用线程池?
当你需要使用多个线程来执行相同类型任务时,你不希望自己来管理线程的创建和销毁,维护任务队列,分配任务,管理定时任务。ok,交给线程池吧,你只需要向线程池提交任务就好了。
Java 线程池体系
Java 库提供了一套完整的线程池解决方案,支持多种定制化配置,满足你的需求。java.util.concurrent 库与线程池相关的接口,类的继承关系如下所示:
Executors
接口
Executor
支持提交任务
voidexecute(Runnablecommand) #提交 Runnable 任务
ExecutorService
继承 Executor,支持提交异步任务和关闭线程池。
<T>Future<T>submit(Runnabletask, Tresult)
Future<?>submit(Runnabletask)
void shutdown()
ScheduledExecutorService
继承 ExecutorService, 增加支持定时任务和延迟任务
ScheduledFuture<?> schedule(
Runnable command,
long delay,
TimeUnit unit)
<V> ScheduledFuture<V> schedule(
Callable<V> callable,
long delay,
TimeUnit unit)
ScheduledFuture<?> scheduleAtFixedRate(
Runnable command,
long initialDelay,
long period,
TimeUnit unit)
ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command,
long initialDelay,
long delay,
TimeUnit unit)
实现类
ThreadPoolExecutor
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
实现 ExecutorService 接口功能外,提供了以下几个配置项:
- 线程池大小
corePoolSize 线程池保留线程个数,maximumPoolSize 线程池最大线程个数。
默认情况下,当提交任务时,才会创建线程。当提交任务时,如果当前 running 线程数小于 corePoolSize 的话,则创建一个线程;如果当前 running 线程数大于 corePoolSize 小于 maximumPoolSize 的话,当且仅当任务队列满的情况下,才会创建一个线程;
- 线程工厂(threadFactory, 用来创建线程)
默认情况下,使用 Executors.defaultThreadFactory 来创建线程,它们同属于一个线程组,有相同的优先级,non-daemon 状态。
- 线程空闲多久销毁(keepAliveTime)
当线程个数超过 corePoolSize 之后,如果有线程空闲时间超过 keepAliveTime 的话,线程池就会释放该线程。
- 任务队列(workQueue)
用来保存未提交和已提交的任务。任务队列的使用情况与线程池内部线程个数有关,如果线程个数小于 corePoolSize 的话,则不会缓存任务,而是创建线程执行任务;如果线程个数大于 corePoolSize 的话,则先缓存任务;如果队列满了,如果线程个数小于 maximumPoolSize 的话,则创建新线程;否则拒绝该任务。
目前提供 3 种队列:(1)Direct handoffs (2)Unbounded queues 默认(3) Bounded queues。详细参考:ThreadPoolExector
- 当任务队列满了之后的处理策略
ThreadPoolExecutor.AbortPolicy 抛出异常 RejectedExecutionException(默认)
ThreadPoolExecutor.DiscardPolicy 直接丢掉
ThreadPoolExecutor.DiscardOldestPolicy 丢掉队列中最老的任务
ScheduledThreadPoolExecutor
继承 ThreadPoolExecutor,并实现 ScheduledExecutorService 接口行为。
工具接口和类
ThreadFactory
接口,定义创建线程
Thread newThread(Runnable r)
Executors
用来创建 ExecutorService, ScheduledExecutorService, ThreadFactory, Callable。
public static <T> Callable<T>
callable(
Runnable task,
T result)
public static ThreadFactory
defaultThreadFactory()
public static ExecutorService
newCachedThreadPool(
ThreadFactory threadFactory)
public static ExecutorService
newFixedThreadPool(
int nThreads,
ThreadFactory threadFactory)
最佳实践
- 使用 Executors 工具类创建 ThreadPoolExecutor
Executors.newFixedThreadPool(5);
Executors.newScheduledThreadPool(3);
- 自定义 ThreadFactory,定义 Thread 的名称和设置 Thread 的 daemon 状态。
new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = Executors
.defaultThreadFactory()
.newThread(r);
t.setName("myname");
t.setDaemon(true);
return t;
}
}
- 可以设置有固定大小的队列
遗憾的是,Executors 并没有提供支持定制化队列的接口,所以只能使用 ThreadPoolExecutor。
BlockingQueue<Runnable> queue =
new ArrayBlockingQueue<>(100);
executorService = new ThreadPoolExecutor(
n,
n,
0L,
TimeUnit.MILLISECONDS,
queue);
- 记住优雅的关闭 ThreadPoolExecutor
shutdown 不接受新的任务,保证已经接受的任务提交执行,但是并不保证任务执行结束
awaitTermination 保证已经执行的任务完整执行结束。
shutdownNow 关闭正在执行的任务,但不保证正在执行的任务正常关闭。未执行的任务直接返回。
cxnResetExecutor.shutdown();
try {
if (cxnResetExecutor.awaitTermination(
5, TimeUnit.SECONDS)) {
cxnResetExecutor.shutdownNow();
}
} catch (Exception ex) {
logger.error("Interrupted while waiting for connection reset executor " +
"to shut down");
}
- 监控 ThreadPoolExecutor 状态
public long getCompletedTaskCount() #获取已经完成的 task 总个数
public int getActiveCount() #获取正在活的线程个数
public int getLargestPoolSize() #获取线程池曾经最大的线程数
public BlockingQueue<Runnable> getQueue() #获取任务队列中任务个数
参考
Executors
Executor
ThreadPoolExecutor
ExecutorService-10个要诀和技巧
flume 源码
Java线程池分析
欢迎大家访问本人网站 程序员工具箱 致力于贡献便捷的工具,帮助程序员写出更优秀的代码,同时也欢迎大家一起来贡献。