并发编程(八)线程池详解
2021-01-13 本文已影响0人
Timmy_zzh
前言
- java中线程的创建以及上下文切换是比较消耗性能的,因此引入了偏向锁,轻量级锁等优化技术,目的就是减少用户态和核心态之间的切换频率。
既然线程的创建和销毁非常消耗性能,那有没有可能复用已经被创建好的线程呢?
1.线程池
- 线程池的本质就是为了节省资源和更好的进行资源管理
- 线程的创建需要开辟虚拟机栈,本地方法栈,程序计数器等线程私有的内存空间,在线程销毁时需要回收这些系统资源,频繁地创建销毁线程会浪费大量资源。
- 而通过复用已有线程可以更好地管理和协调线程的工作
- 线程池主要解决两个问题:
- 当执行大量异步任务时线程池能够提供很好的性能
- 线程池提供了一种资源限制和管理的手段,比如可以限制线程的个数,动态新增线程等。
1.1.线程池体系

源码
// 底层接口Executor,只有一个方法
public interface Executor {
void execute(Runnable var1);
}
// 接口ExecutorService,抽象了线程池操作方法
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long var1, TimeUnit var3) throws InterruptedException;
<T> Future<T> submit(Callable<T> var1);
<T> Future<T> submit(Runnable var1, T var2);
Future<?> submit(Runnable var1);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> var1) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException, ExecutionException, TimeoutException;
}
public abstract class AbstractExecutorService implements ExecutorService {
public AbstractExecutorService() {
}
}
public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl;
private static final int COUNT_BITS = 29;
private static final int CAPACITY = 536870911;
private static final int RUNNING = -536870912;
private static final int SHUTDOWN = 0;
private static final int STOP = 536870912;
private static final int TIDYING = 1073741824;
private static final int TERMINATED = 1610612736;
private final BlockingQueue<Runnable> workQueue;
private final ReentrantLock mainLock;
private final HashSet<ThreadPoolExecutor.Worker> workers;
private final Condition termination;
private int largestPoolSize;
private long completedTaskCount;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private static final RejectedExecutionHandler defaultHandler = new ThreadPoolExecutor.AbortPolicy();
private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
private final AccessControlContext acc;
private static final boolean ONLY_ONE = true;
...
}
- 解析
- Executor是线程池最顶层的接口,在Executor接口中只有一个execute方法,该方法用于执行任务。置语线程的创建,调度等细节由子类实现。
- 接口ExecutorService继承并拓展了Executor,在ExecutorService内部提供了更全面的任务提交机制以及线程池关闭方法。
- ThreadPoolExecutor是ExecutorService的默认实现,所谓的线程池机制也大多封装在此类当中,重点分析
- ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,并实现了ScheduledExecutorService接口
- ForkJoinPool是一种支持任务分解的线程池,一般要配合可分解任务接口ForkJoinTask来实现
2.线程池创建
Executors
- 为方便开发着使用线程池,JDK提供了一个线程池的工厂类-Executors,在Executors中定义了多个静态方法,用来创建不同配置的线程池。如下
2.1.newSingleThreadExecutor
- 创建一个单线程化的线程池,他只会用唯一的工作线程来执行任务,保证所有任务按先进先出的顺序执行。
//使用单线程线程池提交5次操作任务
private static void singleThreadPool() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
int taskId = i;
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("Thread:" +
Thread.currentThread().getName() +
" ,running task:" + taskId);
}
});
}
}
日志打印:
Thread:pool-1-thread-1 ,running task:0
Thread:pool-1-thread-1 ,running task:1
Thread:pool-1-thread-1 ,running task:2
Thread:pool-1-thread-1 ,running task:3
Thread:pool-1-thread-1 ,running task:4
源码:
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService(
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue()));
}
- 发现newSingleThreadExecutor创建的线程池中,提交的task任务始终在同一个线程中被执行
2.2.newCachedThreadPool
- 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程
private static void cacheThreadPool() {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
int taskId = i;
try {
//Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("Thread:" +
Thread.currentThread().getName() +
" ,running task:" + taskId);
try {
// 模拟耗时任务
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
日志打印:
Thread:pool-1-thread-1 ,running task:0
Thread:pool-1-thread-1 ,running task:1
Thread:pool-1-thread-1 ,running task:2
Thread:pool-1-thread-1 ,running task:3
Thread:pool-1-thread-1 ,running task:4
源码:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L,
TimeUnit.SECONDS,
new SynchronousQueue());
}
- 耗时任务执行完毕,则会将空闲的线程从新拿过来使用
- 否则创建新线程执行,且新线程创建的最大数目为Integer.MAX_VALUE个
2.3.newFixedThreadPool
//创建一个固定数目的,可重用的线程池
private static void newFixedThreadPool() {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
int taskId = i;
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("Thread:" +
Thread.currentThread().getName() +
" ,running task:" + taskId);
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
日志打印:
> Task :testlib:_05ThreadPool.main()
Thread:pool-1-thread-2 ,running task:1
Thread:pool-1-thread-3 ,running task:2
Thread:pool-1-thread-1 ,running task:0
Thread:pool-1-thread-1 ,running task:3
Thread:pool-1-thread-2 ,running task:5
Thread:pool-1-thread-3 ,running task:4
Thread:pool-1-thread-1 ,running task:6
Thread:pool-1-thread-2 ,running task:7
Thread:pool-1-thread-3 ,running task:8
Thread:pool-1-thread-1 ,running task:9
源码:
public static ExecutorService newFixedThreadPool(int var0) {
return new ThreadPoolExecutor(var0, var0, 0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
- 上面创建了一个包含固定数量3的线程池,因此虽然向线程池提交了10个任务,但是这10个任务只会被3个线程分配执行。
2.4.newScheduledThreadPool
private static void newScheduledThreadPool() {
ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(2);
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
Date now = new Date();
System.out.println("Thread:" + Thread.currentThread().getName()
+ " ,timer:" + now.toString());
}
},500,500, TimeUnit.MILLISECONDS);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
scheduledExecutorService.shutdown();
}
日志打印:
> Task :testlib:_05ThreadPool.main()
Thread:pool-1-thread-1 ,timer:Thu Jan 07 22:23:44 CST 2021
Thread:pool-1-thread-1 ,timer:Thu Jan 07 22:23:44 CST 2021
Thread:pool-1-thread-1 ,timer:Thu Jan 07 22:23:45 CST 2021
Thread:pool-1-thread-2 ,timer:Thu Jan 07 22:23:45 CST 2021
Thread:pool-1-thread-1 ,timer:Thu Jan 07 22:23:46 CST 2021
Thread:pool-1-thread-1 ,timer:Thu Jan 07 22:23:46 CST 2021
Thread:pool-1-thread-2 ,timer:Thu Jan 07 22:23:47 CST 2021
Thread:pool-1-thread-1 ,timer:Thu Jan 07 22:23:47 CST 2021
Thread:pool-1-thread-2 ,timer:Thu Jan 07 22:23:48 CST 2021
Thread:pool-1-thread-2 ,timer:Thu Jan 07 22:23:48 CST 2021
- 上面的代码,创建了一个线程数量为2的定时任务线程池,通过scheduleAtFixedRate方法,指定每隔500毫秒执行一次任务,并且在5秒钟之后通过shutdown关闭定时任务。
3.线程池执行机制
- 通过上面的例子可以方法,上面创建线程的静态方法最后都是创建ThreadPoolExecutor对象实例,
3.1.线程池结构

public class ThreadPoolExecutor extends AbstractExecutorService {
//根据AtomicInteger 的低29位,计算线程池线程的数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; //29
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程中包含的状态
//默认状态,接收新任务并处理排队任务
private static final int RUNNING = -1 << COUNT_BITS;
//不接收新任务,但处理排队任务,调用shotdown 会处于该状态
private static final int SHUTDOWN = 0 << COUNT_BITS;
//不接收新任务,也不处理排队任务,并中断正在执行的任务,调用showdownNow()
private static final int STOP = 1 << COUNT_BITS;
//所有任务都已终止,workerCount为零时,线程会切换到TIDYING状态,并运行terminate方法
private static final int TIDYING = 2 << COUNT_BITS;
//terminate方法完成后线程池切换为该状态
private static final int TERMINATED = 3 << COUNT_BITS;
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
// 线程池重要方法
//1.计算当前线程池运行状态
private static int runStateOf(int c) {
return c & -CAPACITY;
}
//2.计算当前线程数量
private static int workerCountOf(int c) {
return c & CAPACITY;
}
//3.通过状态和线程数生成ctl
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
}
- 线程池类中重要属性:
- workers集合:保存所有的核心线程和非核心线程,本质是一个HashSet
- workQueue队列:等待任务队列,当核心线程的个数达到corePoolSize时,新提交的任务会被先保存在等待队列中,本质上是一个阻塞队列BlockingQueue
- ctl:AtomicInteger类型,二进制高3位用来标识线程池的状态,低29位用来记录线程池中线程的数量
- 线程池构造函数参数说明
- corePoolSize:表示核心线程数量
- maximumPoolSize:线程池最大能够容纳同时执行的线程,必须大于或等于1。如果和maximumPoolSize相等即是固定大小线程池。
- keepAliveTime:表示线程池中的线程空闲时间,当空闲时间达到此值时,线程会被销毁直到剩下corePoolSize核心线程数量个线程的时间单位,有MILLISECONDS,SECONDS,MINUTES等
- workQueue:等待队列,BlockingQueue类型,当请求任务数大于corePoolSize时,任务会被缓存到此阻塞队列中
- threadFactory:线程工厂,线程池中使用它来创建线程,如果传入的是null,则使用默认工厂类DefaultThreadFactory
- handler:执行拒绝策略的对象,当workQueue满了之后,并且活动线程数大于maximumPoolSize的时候,线程池通过该策略处理新的请求
3.2.线程池执行流程

- 当我们调用execute或者submit,将一个任务提交给线程池,线程池收到这个任务请求后,有以下几种处理情况:
- 当线程池中运行的线程数量还没有达到corePoolSize大小时,线程池会创建一个新线程执行提交的任务,无论之前创建的线程是否处于空闲状态
- 当线程池中运行的线程数量已经达到corePoolSize大小时,线程池会把任务加入到等待队列中,直到某一个线程空闲了,线程池会根据我们设置的等待队列规则,从队列中取出一个新的任务执行
- 如果线程数大于corePoolSize数量,但是还没有达到最大线程数manimumPoolSize,并且等待队列已满,则线程池会创建新的线程来执行任务
- 最后如果提交的任务,无法被核心线程直接执行,又无法加入等待队列,又无法创建”非核心线程“直接执行,线程池根据拒绝处理器定义的策略处理这个任务。
- 如果没有在线程池中设置拒绝策略,线程池会抛出RejectedExecutionException 异常,即线程池拒绝接受这个任务。