理解线程池
类继承结构
image.png
相关源码
Executor
public interface Executor {
void execute(Runnable command);
}
ExecutorService
public interface ExecutorService extends Executor {
//关闭线程池,已提交的任务继续执行,不接受继续提交新任务
void shutdown();
//关闭线程池,尝试停止正在执行的所有任务,不接受继续提交新任务
//跟上面区别:会去停止当前正在进行的任务
List<Runnable> shutdownNow();
//线程池是否已经关闭
boolean isShutdown();
//如果调用了shutdown()或者shutdownNow()后,所有任务结束了,那么返回true
//这个方法必须在调用shutdown或shutdownNow之后调用才会返回true
boolean isTerminated();
//先调用shutdown或者shutdownNow,然后调用这个方法等待所有线程真正完成,返回值意味着有无超时
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
//提交一个callable任务
<T> Future<T> submit(Callable<T> task);
//提交一个runnable任务,第二个参数将放在future中作为返回值,因为runnable的run方法并不会返回任何东西
/**
* runnable的void run()是没有返回值的,所以,通常,我们需要的话,会在submit中指定第二个参数为返回值
* 其实到时候会通过这两个参数,将其包装成Callable,和Runnable的区别在于run()没有返回值,Callale的call
* 方法有返回值,同时,如果运行出现异常,call()会抛出异常
*/
<T> Future<T> submit(Runnable task,T result);
//提交一个runnable任务
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
//在超时时间内执行所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout,
TimeUnit unit
) throws InterruptedException;
//只要其中有一个任务结束了,就可以返回,返回执行完的那个任务的结果
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout,TimeUnit unit) throws InterruptedException, TimeoutException;
}
FutureTask
摘抄部分方法
public class FutureTask<V> implements RunnableFuture<V> {
/**
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
//如果执行过程中出错,走这一步,future有,future.get()抛出异常
throw new ExecutionException((Throwable)x);
}
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public boolean isCancelled() {
return state >= CANCELLED;
}
public boolean isDone() {
return state != NEW;
}
//获取执行结果
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
protected void done() { }
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
}
线程池主要组件
image.png
上图没有考虑队列是否有界,提交任务时队列满了怎么办?什么情况下会创建新的线程?提交任务时线程池满了怎么办?空闲线程怎么关掉
我们经常会使用 Executors 这个工具类来快速构造一个线程池,开发者不需要关注太多的细节,只要知道自己需要一个线程池,仅仅提供必需的参数就可以了,其他参数都采用作者提供的默认值
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
//最终指向的方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
线程池主要属性
- corePoolSize
- maximumPoolSize
线程池允许创建的最大线程数 - keepAliveTime
空闲线程的存活时间,如果某线程的空闲时间超过这个值都没有任务给它做,那么该线程可以被关闭了。注意这个值并不会对所有线程起作用,如果线程池中的线程数少于等于核心线程数 corePoolSize,那么这些线程不会因为空闲太长时间而被关闭,当然,也可以通过调用 allowCoreThreadTimeOut(true)使核心线程数内的线程也可以被回收 - workQueue
任务队列,BlockingQueue 接口的某个实现(常使用 ArrayBlockingQueue 和LinkedBlockingQueue) - threadFactory
用于生成线程,一般我们可以用默认的就可以了。通常,我们可以通过它将我们的线程的名字设置得比较可读一些,如 Order-Thread-1, Product-Thread-2 类似这样。 - handler
当线程池已经满了,但是又有新的任务提交的时候,该采取什么策略由这个来指定。有几种方式可供选择,像抛出异常、直接拒绝然后返回等,也可以自己实现相应的接口实现自己的逻辑
其他属性
用32位整数来保存线程池运行状态(高3位)和线程池线程数量(低29位)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//计数位:一个int整数是4个字节,一个字节占用8位,所以共占用32位 32-3=29
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程池容量1左移29位,为1*2^29-1=536,870,911
//00000000 00000000 00000000 00000001->001 0000 00000000 00000000 00000000
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池运行状态保存在32位中的高3位
//-1在Java底层是由32个1表示的,左移29位的话,即111 00000 00000000 00000000 00000000
//也就是低29位全部为0,高3位全部为1的话,表示RUNNING状态,即-536870912;
private static final int RUNNING = -1 << COUNT_BITS;
//0在Java底层是由32个0表示的,无论左移多少位,还是32个0,即000 00000 00000000 00000000 00000000
//也就是低29位全部为0,高3位全部为0的话,表示SHUTDOWN状态,即0
private static final int SHUTDOWN = 0 << COUNT_BITS;
//1在Java底层是由前面的31个0和1个1组成的,左移29位的话,即001 00000 00000000 00000000 00000000
//也就是低29位全部为0,高3位为001的话,表示STOP状态,即536870912
private static final int STOP = 1 << COUNT_BITS;
//2在Java底层是由前面的30个0和1个10组成的,左移29位的话,即010 00000 00000000 00000000 00000000
//也就是低29位全部为0,高3位为010的话,表示TIDYING状态,即1073741824
private static final int TIDYING = 2 << COUNT_BITS;
//3在Java底层是由前面的30个0和1个11组成的,左移29位的话,即011 00000 00000000 00000000 00000000
//也就是低29位全部为0,高3位为011的话,表示TERMINATED状态,即1610612736
private static final int TERMINATED = 3 << COUNT_BITS;
//~是按位取反的意思,CAPACITY表示的是高位的3个0,和低位的29个1
//而~CAPACITY则表示高位的3个1,29个低位的0,然后再与入参c执行按位与操作,即高3位保持原样
//低29位全部设置为0,也就获取了线程池的运行状态runState
private static int runStateOf(int c) { return c & ~CAPACITY; }
//也就是与000 11111 11111111 11111111 11111111进行与操作,c的前三位通过与000进行与操作
//无论c前三位为何值,最终都会变成000,也就是舍弃前三位的值,而c的低29位与29个1进行与操作
//c的低29位还是会保持原值,这样就从AtomicInteger ctl中解析出了workerCount的值
private static int workerCountOf(int c) { return c & CAPACITY; }
//或操作:规则:即 0 | 0= 0 , 1 | 0= 1 , 0 | 1= 1 , 1 | 1= 1
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
线程池的各个状态和状态变化转换过程
- running:正常状态,接受新的任务,处理等待队列中的任务
- shutdow:不接受新任务提交,但是会继续处理等待队列中任务
- stop:不接受新的任务提交,不再处理等待队列中的任务,中断正在执行的任务
- tidying:所有任务都销毁了,workCount=0,线程池状态在转换为这个状态的时候,会执行terminated()
- terminated:terminated()方法结束后,线程池的状态变为这个
源码解读
内部类worker
Doug Lea 把线程池中的线程包装成了一个个 Worker(工人),就是线程池中做任务的线程。所以到这里,我们知道任务是 Runnable(内部变量名叫 task 或 command),线程是 Worker
注意:worker构造方法创建线程传入的参数是自身对象
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
//执行任务的线程
final Thread thread;
//runnable是任务,之所以叫firstTask是因为线程池中线程可以执行多个任务
//初始化如果传了这里有,如果没传就在任务队列中取任务getTask()
Runnable firstTask;
//存放此线程完成的任务数
volatile long completedTasks;
//初始化这个worker,传入参数为第一个任务,也可以为空
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//创建当前worker所需要的线程
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
//...下面省略,就是用 AQS 操作,来获取这个线程的执行权,用了独占锁
}
AbstractExecutorService
public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
//我们这里调用submit方法,传入一个期望的返回值
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
//将执行结果包装为RunnableFuture
RunnableFuture<T> ftask = newTaskFor(task, result);
//具体的实现在子类ThreadPoolExecutor中
//又因为runnableFuture实现了runnable接口,因此可以传入
execute(ftask);
return ftask;
}
/**下面方法来自FutureTask类*/
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
/**下面方法来自Excutors工具类,即将result和runnable包装为callable并返回*/
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
execute
下面开始分析ThreadPoolExecutor实现的execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//获取的时候-536870912
int c = ctl.get();
//workerCountOf(c)=0
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
//外层retry循环
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//时刻牢记,双与意思是执行右边的前提是左边必须成立!
/*
* 判断线程池是否已经关闭
*
* 当线程池运行状态为shutdown,stop,tidying或者terminated的时候
* (运行状态=shutdown,同时第一个任务为空,同时任务队列不为空)不成立的话
*/
if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))
//直接返回创建失败
return false;
//内层循环
for (;;) {
int wc = workerCountOf(c);
//如果当前线程池中工作线程数 ≥ 临界值了
// 当前要创建的是核心线程:当前线程池工作线程数 ≥ 核心线程数
// 当前要创建的不是核心线程:当前线程池工作线程数 ≥ 最大线程数
if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))
//直接返回创建失败
return false;
//如果CAS增加工作线程数目失败,即有其他线程已经更改过工作线程数目
if (compareAndIncrementWorkerCount(c))
//退出外层循环
break retry;
//重新获取一下线程运行状态,如果两个不一致就退出外层循环
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
上面调用t.start以后开始执行下面,因为
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
所以实际执行的是内部类worker的run()
public void run() {
runWorker(this);
}
runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//获取当前工人的第一个任务,如果有的话
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 适当w工人当前持有的锁
boolean completedAbruptly = true;
try {
//循环获取:当第一个任务不为空或者任务队列中获取的任务不为空的时候
while (task != null || (task = getTask()) != null) {
w.lock();
//runStateAtLeast: c1>=c2 || (Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP)
//当前线程没有被中断
if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())
//通知当前线程应当中断
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行这个worker被分配的第一个任务
task.run();
} catch (RuntimeException | Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
//执行完
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//worker进程退出
processWorkerExit(w, completedAbruptly);
}
}
getTask
//从任务队列中拉取任务
private Runnable getTask() {
//拉取任务超时标记
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//运行状态>=STOP或者 运行状态=SHUTDOWN+队列为空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//当前线程池工作线程数量
int wc = workerCountOf(c);
//是否允许超时=是否允许核心线程超时回收||当前线程池工作线程数量>核心线程数量
//如果当前线程池数量小于核心线程数量且不允许核心线程超时则false
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//只要(条件1:当前线程池中线程数量>设置的最大线程池容量)或者(条件2:超时设置为真并且之前拉取任务出现超时情况)就可以执行到右边
//如果是条件2过来的则判断当前池中线程数量是否大于1
//如果是条件1过来的则判断当前任务队列是否为空
if (
(wc > maximumPoolSize || (timed && timedOut))
&&
(wc > 1 || workQueue.isEmpty())
) {
//扣减当前工人数量
if (compareAndDecrementWorkerCount(c))
//意味着关闭线程
return null;
continue;
}
try {
//任务队列中拉取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
//拉取到,直接返回这个任务
return r;
timedOut = true; //没拉取到,超时
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
Executors
- 生成固定大小线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
最大线程数设置为与核心线程数相等,此时 keepAliveTime 设置为 0(因为这里它是没用的,即使不为 0,线程池默认也不会回收 corePoolSize 内的线程),任务队列采用 LinkedBlockingQueue,无界队列
过程分析:刚开始,每提交一个任务都创建一个 worker,当 worker 的数量达到 nThreads 后,不再创建新的线程,而是把任务提交到 LinkedBlockingQueue 中,而且之后线程数始终为 nThreads。
- 生成只有一个线程的固定线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
这个更简单,和上面的一样,只要设置线程数为 1 就可以了:
- 生成一个需要的时候就创建新的线程,同时可以复用之前创建的线程(如果这个线程当前没有任务)的线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
核心线程数为 0,最大线程数为 Integer.MAX_VALUE,keepAliveTime 为 60 秒,任务队列采用 SynchronousQueue
这种线程池对于任务可以比较快速地完成,有比较好的性能。如果线程空闲了 60 秒都没有任务,那么将关闭此线程并从线程池中移除。所以如果线程池空闲了很长时间也不会有问题,因为随着所有的线程都会被关闭,整个线程池不会占用任何的系统资源
ScheduledExecutorService
周期执行任务的线程池
常用方法:
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
创建并执行在给定延迟后启用的单次操作。
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
创建并执行在给定延迟后启用的单次操作。
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
创建并执行在给定的初始延迟之后,以给定的时间间隔执行周期性动作。即在 initialDelay 初始延迟后,initialDelay+period 执行第一次,initialDelay + 2 * period 执行第二次,依次类推。并且如果任务的任何一个执行遇到异常,则后续执行都会被取消
public class MyRunable implements Runnable {
private AtomicInteger atomicInteger;
private Random random;
public MyRunable() {
atomicInteger = new AtomicInteger(0);
random = new Random();
}
@Override
public void run() {
try {
String threadName = Thread.currentThread().getName();
System.out.println("start-任务执行开始:" + new Date() + ":" + threadName);
/**使用随机延时[0-3]秒来模拟执行任务*/
int sleepNumber = random.nextInt(5);
TimeUnit.SECONDS.sleep(sleepNumber);
System.out.println(atomicInteger.get());
if (atomicInteger.getAndAdd(1) == 3) {
int error = 10 / 0; //模拟抛出异常
}
System.out.println("end-任务执行完毕:" + new Date() + ":" + threadName);
} catch (Exception e) { //最大范围包裹异常保证后续任务正常执行
e.printStackTrace();
}
}
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor();
System.out.println("3秒后开始执行计划线程池服务..." + new Date());
/**每间隔4秒执行一次任务*/
scheduledExecutorService.scheduleAtFixedRate(new MyRunable(),
3, 4, TimeUnit.SECONDS);
}
}
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
fixedDelay它的间隔时间是根据上次的任务结束的时候开始计时的。比如一个方法上设置了fixedDelay=5*1000,那么当该方法某一次执行结束后,开始计算时间,当时间达到5秒,就开始再次执行该方法