Android开发经验谈Java 杂谈Java

深度剖析 ThreadPoolExecutor 的使用和原理

2019-06-12  本文已影响6人  RunAlgorithm

目录

  1. 概述
  2. 继承关系
    2.1. Executor
    2.2. ExecutorService
    2.3. AbstractExecutorService
  3. 生命周期
  4. 如何使用
    4.1. 核心配置
    4.2. 常用线程池
  5. 源码解析
    5.1. 核心组件
    5.2. 线程池状态记录
    5.3. 执行任务
    5.4. 获取线程池运行情况
    5.5. 关闭线程池

1. 概述

为什么要使用线程池?

线程的创建和销毁涉及到系统调用等等,是比较消耗 CPU 资源的。如果线程本身执行的任务时间短,创建和销毁线程所耗费的资源就会占很大比例,对系统起源和运行时间上是一个瓶颈。

这时候线程池就派上了用场,作为线程的容器,实现对线程的复用。如果线程长时间没有任务可以执行,也是很消耗系统资源的,所以也有对线程的销毁机制。

有了这个中心化的容器,我们不需要关心线程的创建和销毁,由线程池进行合理地调度,还可以对线程的执行情况进行监控。

线程池的优势可以归纳如下:

ThreadPoolExecutor 是大神 Doug Leajava.util.concurrent 包提供的线程池实现,很好地满足了各个应用场景对于线程池的需求。

这里对 ThreadPoolExecutor 的使用和原理进行详细的分析。

2. 继承关系

首先了解一下 ThreadPoolExecutor 的继承关系。如下:

继承关系

2.1. Executor

执行器,只声明了一个execute() 方法用来执行 Runnable 任务。

public interface Executor {
    void execute(Runnable command);
}

执行器子类的实现,可以对任务的执行制定各种策略,比如:

ThreadPoolExecutor 作为它的实现类,实现了具体的执行策略。比如在内部有工作线程队列和任务队列,实现了多线程并发执行任务。

2.2. ExecutorService

增加了对执行器生命周期的管理,并扩展对 Callable 任务的支持,返回 Future 来提供给调用者。

涉及到的生命周期的处理方法有:

可以看到主要是对执行器对一些关闭处理。ThreadPoolExecutor 实现了这些方法,比如使用 shutdown() 来关闭线程池。但是线程池不会马上关闭,所以还可以继续调用 awaitTermination() 方法来阻塞等待线程池正式关闭。

2.3. AbstractExecutorService

抽象类,配置了多任务执行的工具方法。

ExecutorService.submit() 提供默认实现,可供子类直接使用。

3. 生命周期

这里了解一下线程池的生命周期,状态有这 5 种:

这个生命周期如下:

生命周期

4. 如何使用

这里介绍线程池的一些常规配置和使用。

线程池的构造函数如下:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler);

4.1. 核心配置

这里再补充一个配置:

4.2. 常用线程池

Executors 工具类提供了多种比较常规的线程池配置,具体使用哪一种要看实际的业务场景。主要有这几种:

4.2.1. 固定线程数线程池

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

配置如下:

线程一旦创建会一直存在于线程池中,直到达到核心线程数。如果中途有某个线程发生异常退出了,线程池会马上新起一个线程补上,直到使用 shutdown() 关闭线程池。

因为任务队列是无界队列,在核心线程数满了后,新任务会不断地加入到队列中,不存在任务队列满了的情况。我们知道 maximumPoolSize 生效的前提是任务队列满负荷,所以在该线程池中,maximumPoolSize 不生效

超时时间设置 0,如果设置 allowCoreThreadTimeout 的话会对核心线程生效。否则的话这个配置也没有效果。

适用于每个时间段执行的任务数相对稳定,数量不大,执行时间又比较长的场景

使用该线程池的风险在于任务队列没有指定长度,在短时间高密度任务同时并发执行的情况下,可能会发生内存溢出。

4.2.2. 单一线程数线程池

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}

配置如下:

和固定线程数相似,只是线程数为固定为 1。

和固定线程池不一样,这个线程池不允许重新配置 maximumPoolSize。为了满足这个需求,内部做了一个代理线程池 FinalizableDelegatedExecutorService,不再提供 setMaxmiumPoolSize 等方法来重新配置线程池。

所有任务都顺序执行。适合那些任务执行时间短,实时性要求又不高的场景。

和固定线程数线程池一样,使用无界队列,有内存溢出风险。

4.2.3. 缓存线程数线程池

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}

配置如下:

在每个任务到达,没有线程空闲的话,会再创建一个。

线程执行完任务后,没有新任务到达,会有 60s 的保活时间,这段时间没有新任务到达后线程就会直接关闭。

适用于那些任务量大,然后耗时时间比较少的场景。比如 OkHttp 的线程池的配置就和缓存线程数线程池的配置一样。

没有使用无界队列,但是不限制最大线程数量,如果短时间高并发任务执行的话,也是有内存溢出风险。

4.2.4. 定时任务线程池

public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

直接创建一个 ScheduledThreadPoolExecutor

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE,
    DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
    new DelayedWorkQueue(), threadFactory, handler);
}

配置如下:

所以可以用来执行定时任务,或者有固定周期执行的任务。

5. 源码解析

5.1. 核心组件

这里分析一下 ThreadPoolExecutor 内部的主要部分。正是通过这些组件的结构组合和行为交互,实现了线程池的基本功能。

5.1.1. Thread

线程实现类。JVM 的线程和操作系统的线程是对应的。Thread很多方法最终都会执行到 native 方法上。

Thread 非常庞大和复杂,我们这次主要分析它的生命周期,以及正确关闭线程的方式(中断)。

5.1.1.1. 生命周期

一个线程又这几个状态:

完整的线程状态机如下:

状态机
5.1.1.2. 优先级

线程的优先级有 1-10,默认优先级是 5。有这几种类型:

线程优先级体现的是 竞争到 CPU 资源的概率,高优先级概率高。所以,并不能百分白保证高优先级的线程一定优先执行。

在线程初始化的时候就会进行优先级设置,见 Thread.init()

private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc) {
    ...
    Thread parent = currentThread();
    ...
    this.group = g;
    this.daemon = parent.isDaemon();
    this.priority = parent.getPriority();
    ...
    setPriority(priority);
    ...
}

也可以直接调用 Thread.setPriority() 方法设置:

public final void setPriority(int newPriority) {
    ThreadGroup g;
    checkAccess();
    if (newPriority > MAX_PRIORITY || newPriority < MIN_PRIORITY) {
        throw new IllegalArgumentException();
    }
    if((g = getThreadGroup()) != null) {
        if (newPriority > g.getMaxPriority()) {
            newPriority = g.getMaxPriority();
        }
        setPriority0(priority = newPriority);
    }
}
5.1.1.3. 线程中断

中断是退出线程的一种方式。也是最合理的退出方式。

中断首先要设置中断标记,比如调用 Thread.interrupt() 方法后,中断标记会设置为 true。

举个 ReentrantLock 的例子。

我们知道,ReentrantLock 内部是使用 AQS 来实现加锁的。然后有个方法 lockInterruptibly(),从字面上理解,就是在加锁的状态下,能够响应线程中断。它是如何实现的?见 AbstractQueuedSynchronizer.acquireInterruptibly

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

可以看到,这里识别到中断标记后直接抛出 InterruptedException 异常。

Thread 中两种方法来判断是否中断:

这两种方法的区别在于 interruped 会重置中断标记为 false,有清除中断标记的功能。

而 isInterrpted 仅仅只是用来测试线程是否被终止,不会清除中断标记。

比如我们上面提到的 AbstractQueuedSynchronizer.acquireInterruptibly 的方法中,调用 interrupted() 后同时清空了中断标记。

5.1.2. Worker

WorkerThreadPoolExecutor的内部类,每个 Worker 和一个工作线程绑定,提供任务执行方法和对执行状态的监控。

执行状态的监控使用了不可重入锁,后续会进行分析。

5.1.2.1. 成员变量

看 Worker 的成员变量,主要有三个:

我们可以在 ThreadPoolExecutor.getCompletedTaskCount 中看到 completedTasks 的使用:

public long getCompletedTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        long n = completedTaskCount;
        for (Worker w : workers)
            n += w.completedTasks;
            return n;
    } finally {
        mainLock.unlock();
    }
}
5.1.2.2. 不可重入锁

Worker 使用 AQS 的方法实现了简单的锁。任务执行过程中,执行前会去获取锁,执行后会去释放锁。这里的锁可以用来表示线程是否在执行任务。执行任务中的 Worker 是不可以被中断的,这也是记录正在执行的任务的线程的意义

看下 Worker 的锁操作方法:

ReentrantLock 是不是很相似?为什么不直接使用?

那是因为 不需要 ReentrantLock 的可重入特性

为了理解这个点,我们看什么时候会去中断线程。中断意味着退出线程, ThreadPoolExecutor 关闭线程的方法:

还有修改配置的方法:

这些方法都有一个共同点,会触发 ThreadPoolExecutor.interruptIdleWorkers 方法中断空闲线程。方式就是调用 tryLock 去尝试获取锁,空闲的线程是可以获取到锁的。

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

如果我们允许可重入锁的话会发生什么?

我们正在执行的任务的线程,因为可重入的特性,可以反复获取到锁。如果这时候线程调用关闭或者配置线程池的方法,触发 interruptIdleWorkers() 方法,获取锁后,把自己给中断了,线程发生异常退出

所以 Worker 这里实现的是不可重入锁 ,避免了上述情况的发送。

我们来对比一下 WorkerReentrantLock 的对是否可重入特性的具体实现。先看 ReentrantLock.tryLock()

public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
}

sync 就是 ReentrantLockAbstractQueuedSynchronizer 的实现。nofairTryAcquire() 方法如下:

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

getState() 的值,0 表示没有锁,0 以上表示已经被锁了。然后 ReentrantLock 会继续判断持有锁的线程是否是当前线程 current == getExclusiveOwnerThread() ,如果是的话还可以继续拿到锁,并增加 state 的值,实现了可重入的特性。

再看 Worker.tryLock()

public boolean tryLock()  { return tryAcquire(1); }

也是调用 AQS 的 tryAcquire(),这里的实现很简单:

protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

可以看到没有可重入的概念。

已经持有锁的线程,到新的临界区,还是要继续阻塞等待,不能马上执行。

这样也限制了线程池中的工作线程,是不允许主动调用关闭或配置线程池的方法。

5.1.3. ThreadFactory

线程工厂,用来生产线程,定义如下:

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

Worker 的构造函数里,使用了该类来创建新线程:

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

对新创建线程的配置:

工具类 Executors 内置了一个线程工厂的默认实现 DefaultThreadFactory

static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" + 
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

这样运行起来的线程会有这样的名字

pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
...

可以看到这里的线程计数方式,来创建线程名时使用了一个 AtomicInteger 来实现序号递增的原子操作。

线程优先级为NORM_PRIORITY

5.1.4. BlockingQueue

阻塞队列,也是我们的任务队列,定义如下:

public interface BlockingQueue<E> extends Queue<E> {
    boolean add(E e);
    boolean offer(E e);
    void put(E e) throws InterruptedException;
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;
    E take() throws InterruptedException;
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;
    int remainingCapacity();
    boolean remove(Object o);
    public boolean contains(Object o);
    int drainTo(Collection<? super E> c);
    int drainTo(Collection<? super E> c, int maxElements);
}

任务队列的使用是经典的生产者消费者模型:

工作线程在阻塞等待新任务的时间被称为 空闲时间

如果受到超时限制,比如非核心线程,并且 keepAliveTime 有配置,线程会调用 BlockingQueue.poll() 方法获取新任务,阻塞时间超过了 keepAliveTime 会直接返回 null。

E poll(long timeout, TimeUnit unit)

如果不受超时限制,比如核心线程,且没有设置 allowCoreThreadTimeOut = true , 会调用 BlockingQueue.take() 方法阻塞等待直到加入新任务,或者发生中断。

E take() throws InterruptedException;

阻塞队列 JDK 提供了多种阻塞队列的实现,都可以应用到线程池中。我们比较常用的有这三种:

5.1.4.1. ArrayBlockingQueue

有界队列,FIFO,内部使用数组实现。

创建时必须指定容量 capacity。它使用的数组定义如下:

final Object[] items;

内部使用了 ReentrantLockCondition 来实现生成者消费者模型。

ArrayBlockingQueue.take() 方法如下:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

ArrayBlockingQueue.put() 方法如下:

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

ArrayBlockingQueue 读写方法都使用同一个锁 lock 来实现,所以对记录队列中任务数量的 count 不存在并发问题。

为什么没有区分读写锁,因为每次都是对整个数组操作,需要获取数组的整体状态,比如 length

线程被唤醒的时候会检查 count 判断是否队列是否又任务,没有的话调用 notEmpty.await() 继续等待。

有任务的话调用 dequeue 获取任务。

5.1.4.2. LinkedBlockingQueue

可以是有界或无界,FIFO,内部使用链表实现。每个节点定义如下:

static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}

LinkedBlockingQueue.take() 方法:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

LinkedBlockingQueue.put() 方法:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

也是使用 ReentrantLockCondition 来完成生产者消费者模型。

ArrayBlockingQueue 不一样,读写分别使用了两个锁来实现 takeLockputLock ,做到了读写锁的分离。

所以 count 的计数可能会被多个线程同时触发,这里使用 AtomicInteger 来实现计数的线程安全。

工具类 Executors 创建的固定线程数线程线程池 newFixedThreadPool和单线程线程池 newSingleThreadPool使用的就是无界的 LinkedBlockingQueue

5.1.4.3. SynchronousQueue

该队列没有实际大小,capacity 始终为空,作用就是做个中转,把任务从生产者转移给消费者。

SynchronousQueue.take() 方法:

public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

内部使用了 Transferer 进行任务的传输。

工具类 Executors 创建的缓存线程池 newCacheThreadPool 使用的就是该队列。

5.1.5. RejectExecutionHandler

在调用 ThreadPoolExecutor.execute() 执行新任务时,线程池已满,队列已满,或者线程池已经进入关闭阶段,会拒绝执行该任务。

然后把任务 Runnable 传递给 RejectExecutionHandler ,根据具体的拒绝策略进行处理。

看 RejectExecutionHandler 的定义:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

JDK 提供了一些实现类,实现了具体的拒绝策略。

CallerRunsPolicy ,让调用者去执行。见 CallerRunsPolicy.rejectedExecution()

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();
    }
}

AbortPolicy,线程池默认使用该策略,直接抛出 RejectedExecutionException 异常。见 AbortPolicy.rejectedExecution()

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}

DiscardPolicy ,空实现,正如它的名字所言,任务被直接忽略。见 DiscardPolicy.rejectedExecution()

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}

DiscardOldestPolicy ,先抛弃线程池任务队列中尚未执行的任务,然后再尝试调用 execute 方法。如果线程池已经是 SHUTDOWN 状态,也不处理。见 DiscardOldestPolicy.rejectedExecution()

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        e.getQueue().poll();
        e.execute(r);
    }
}

5.2. 线程池状态记录

平时涉及到状态机的状态记录,我们一般会直接用 Integer,有多少状态就有多少 Integer。

ThreadPoolExecutor 只用了一个 Integer,使用位记录状态,通过位运算来获取状态。这个在 C&C++ 中是很常见的。同时这些在 JDK 或者 Android 源码中经常可以见到。

特点是效率高、内存小但可读性差。

这里来理解一下 ThreadPoolExecutor 对线程状态的记录方式。

5.2.1. 记录控制量

在每个 ThreadPoolExecutor 实例都会持有一个成员 ctl:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

这个 ctl 是个 AtomicInteger ,使用 CAS 的方式来实现原子操作。初始化为 RUNNING 状态,Worker 数量为 0。

ctl 的 32 位被分割成两个部分

可以这样表示:

  0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
|runState|       workerCount                                                                       |
+--------+-----------------------------------------------------------------------------------------+

所以一个 AtomicInteger 记录了两种变量,有效工作线程数量和线程池状态。

runState 有 3 位可以表示 2^3 = 8 种状态。这里使用它来表示线程池生命周期中的 5 种状态:

状态 位移计算(Int) 高三位二进制表示
RUNNING -1 << COUNT_BITS 111
SHUTDOWN 0 << COUNT_BITS 000
STOP 1 << COUNT_BITS 001
TIDYING 2 << COUNT_BITS 010
TERMINATED 3 << COUNT_BITS 100

5.2.2. 计算控制量

现在来看 ThreadPoolExecutor 是如何计算 ctl 的:

5.2.2.1. ctl 的打包与拆包

打包表示把 runState 和 workerCount 合并成 ctl 的值。拆包表示分别读出这两个值。

获取 runState 的值,使用 CAPACITY 的反做掩码对 c 进行与操作获得:

private static int runStateOf(int c) {
    return c & ~CAPACITY;
}

获取 workerCount 的值,使用 CAPACITY 做掩码对 c 进行与操作获得:

private static int workerCountOf(int c) {
    return c & CAPACITY;
}

合并 runState 和 workerCount 为 c ,采用或操作:

private static int ctlOf(int rs, int wc) {
    return rs | wc;
}

所以我们对 ctl 的初始化 ctlOf(RUNNING, 0) ,就是初始化为 RUNNING,并且线程数为 0。

5.2.2.2. runState 的状态判断

因为运行状态在高 3 位,所以后面低 29 位不会影响判断。

private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}
5.2.2.3. workerCount 的增减

直接对 ctl 进行加和减操作,允许多线程并发,AtomicInteger 使用 CAS 确保线程安全。

private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}

private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}

5.3. 执行任务

5.3.1. execute()

线程池执行任务的入口。任务都被封装在 Runnable 中。线程池 execute 方法的源码代码加上我理解后的注释如下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    
    // 获取控制量的值,控制量 ctl = runState | workerCount。
    int c = ctl.get();

    // 判断工作线程数是否小于核心线程数。
    if (workerCountOf(c) < corePoolSize) {
        
        // 创建核心工作线程,当前任务作为线程第一个任务,使用 corePoolSize 作为最大线程限制。
        if (addWorker(command, true))
            return;
            
        // 创建核心工作线程失败,重新获取控制量
        c = ctl.get();
    }
        
    // 判断线程池是否是运行中的状态,是的话将线程加入任务队列
    if (isRunning(c) && workQueue.offer(command)) {

        // 再次获取控制量的值,double-check
        int recheck = ctl.get();

        // 判断线程是否已经不是运行状态了,不是运行状态尝试从任务队列中把任务移出
        if (!isRunning(recheck) && remove(command))
            
            // 移出任务成功后,使用执行拒绝策略
            reject(command);
        else if (workerCountOf(recheck) == 0)
        
            // 工作线程数量为 0,创建工作线程
            addWorker(null, false);

    // 线程不在运行状态或任务无法加入队列,创建新工作线程
    } else if (!addWorker(command, false))
        
        // 创建工作线程失败,执行拒绝策略
        reject(command);
    }

整个执行过程中,主要面临三种情况:

所以一个任务能否被线程池执行,需要考虑到多个方面的因素,比如:

所以对传入线程池的任务,根据上面的各种因素综合判断,会有三种操作:

为了便于理解,假设线程池始终是在运行状态,就会得到这样的流程图:

execute

5.3.2. addWorker()

该方法用来创建新的 Worker。内部会根据线程池的状态和工作线程数量约束来决定是否要创建。

有两个参数:

addWorker 方法可以分成两个阶段来解读。第一阶段是通过自旋和 CAS 的方式,来增加工作线程数。第二部阶段则是创建并且启动工作线程。

5.3.2.1. 阶段一:增加 workerCount

第一阶段会根据线程池的状态,来判断是否可以创建或者启动新的线程。如果可以的话,会先增加 workerCount。源码和我的根据理解添加的注释如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (; ; ) {

        // 读取线程池状态
        int c = ctl.get();
        int rs = runStateOf(c);

        /*
         * 检查线程池状态:
         * 1. 如果已经是 TIDYING 或者 TERMINATED,不创建线程,返回失败;
         * 2. 如果是 SHUTDOWN 状态,首个任务为空,任务队列不为空,会继续创建线程;
         * 3. 如果是 SHUTDOWN 状态,首个任务不为空,不创建线程,返回失败;
         * 4. 如果是 SHUTDOWN 状态,首个任务为空,任务队列为空,不创建线程,返回失败;
         *
         * 所以在 SHUTDOWN 状态下,不会再创建线程首先去运行 firstTask,只会去创建线程把任务队列没执行完的任务执行完。
         */
        if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
            return false;

        // 自旋加 CAS 操作增加工作线程数
        for (; ; ) {
                
        // 获取有效工作线程数
        int wc = workerCountOf(c);

        /*
         * 检查有效工作线程数量是否达到边界:
         * 1. 如果有效工作线程数大于最大工作线程数 2^29-1,不创建线程,返回失败;
         * 2. 判断是否达到最大线程限制,core 为 true 的时候为核心线程数 corePoolSize,false 为最大线程数 maximumPoolSize。
         */
        if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
            return false;

        /*
         * 增加工作线程数,CAS 操作:
         * 增加失败说明已经有其他线程修改过了,进行自旋重试;
         * 增加成功,跳出自旋,进入下一个环节,去创建新线程。
         */
         if (compareAndIncrementWorkerCount(c))
            break retry;

        c = ctl.get();

        // 重新获取状态,如果运行状态改变了,从 retry 段重新开始
        if (runStateOf(c) != rs)
            continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    ...
}

addWorker 来创建线程,在不同状态下的执行情况如下:

如果添加新线程,通过 CAS 加自旋的方式,增加 workerCount 的值。失败的话说明这段时间内线程池发生了新变化,还会从头再来一次。

for (;;) {
  ...
  if (compareAndIncrementWorkerCount) {
    break retry;
  }
  ...
}
5.3.2.2. 阶段二:创建并运行新线程

第一阶段增加 workerCount,这里进入第二阶段,创建并且启动工作线程。源码和注释如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    ...
       
    // 标记值,表示已经启动 Worker
    boolean workerStarted = false;
    
    // 标记值,表示添加 Worker
    boolean workerAdded = false;
    Worker w = null;
    try {

        // 创建新的 worker,并且传入 firstTask
        w = new Worker(firstTask);
        final Thread t = w.thread;
        
        // 判断线程是否创建成功
        if (t != null) {
            
            // 接下来要访问工作线程队列,它是 HashSet 类型,非线程安全集合需要加锁访问
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 再次读取线程池状态
                int rs = runStateOf(ctl.get());

                /*
                 * 再次判断线程池状态是否满足下面的条件之一:
                 * 1. 处于 RUNNING 状态
                 * 2. 处于 SHUTDOWN 状态,但是首任务为空。这里开线程来跑任务队列的剩余任务。
                 */
                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                    
                    /*
                     * 判断新创建的线程状态,如果线程已经是启动后的状态,就无法再次启动执行任务;
                     * 这个是个不可恢复的异常,会抛出 IllegalThreadStateException 异常。
                     */
                     if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();

                        // 加入队列
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                
                // 根据标记值得知线程已经成功创建,启动线程,更新标记 workerStarted 表示线程已经启动。
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
        
            /*
             * 有这种情况,线程没有启动
             * 1. ThreadFactory 返回的线程为 null
             * 2. 线程池进入了 SHUTDOWN 状态,并且传入的任务不为空。
             * 这是因为这段代码执行期间线程状态发生了改变,比如 RUNNING 的时候进来,
             * 准备创建核心线程的时候,线程池被关闭了,这个任务就不会执行。
             * 所以即使是在创建核心线程的时候调用了 shutdown,任务也是不执行的。
             * 3. ThreadFactory 返回的线程已经被启动了,抛出 IllegalThreadStateException 异常
             */
            if (!workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

在调用调用 t.start() 启动线程时,会调用 Worker.run 方法。这是因为创建 Worker实例的时候,会把 Worker(也实现了 Runnable)传入 Thread

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

所以 t.start() 后执行:

/**
 * Delegates main run loop to outer runWorker.
 */
public void run() {
    runWorker(this);
}

调用了ThreadPoolExecutor.runWorker()ThreadPoolExecutor.runWorker() 会从任务队列中取任务给工作线程执行。

如果创建线程失败了,会执行 ThreadPoolExecutor.addWorkerFailed 方法:

5.3.3. addWorkerFailed()

做一些线程创建失败的善后工作。

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    if (w != null)
        workers.remove(w);
    decrementWorkerCount();
    tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

从队列中移出 Worker ,调用 ThreadPoolExecutor.decrementWorkerCount() 恢复 workerCount ,这里还是使用自旋锁加 CAS 的方式:

private void decrementWorkerCount() {
    do {
    } while (!compareAndDecrementWorkerCount(ctl.get()));
}

然后尝试进入到 TERMINATED 状态。如果线程池正在关闭,这个 Worker 可能会停止了线程池的终止过程,所以这里尝试再调用tryTerminate() 重新尝试终止。

5.3.4. runWorker()

addWorker() 成功创建并启动后,会执行 ThreadPoolExecutor.runWorker() 方法。

runWorker() 会启动一个无限循环,线程不断地从任务队列中取任务执行。如果线程被中断结束或者因为异常结束后,还调会用了 processWorkerExit 处理线程退出的一些逻辑。代码如下:

final void runWorker(Worker w) {
    
    // 获取线程
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    
    // 解锁 Worker,表示没有执行任务,空闲中可以被中断
    w.unlock();

    // 标记变量,记录是否因为异常终止
    boolean completedAbruptly = true;
    try {
        
        /* 循环获取任务:
         * 1. 有传入任务且还没有执行,先执行;
         * 2. 从任务队列获取任务执行。
         */
        while (task != null || (task = getTask()) != null) {

            // 加锁 Worker,表示该线程已经在执行任务了。
            w.lock();

            /*
             * 这里中断标记的处理:
             * 1. STOP 状态,设置中断。
             * 2. 不是 STOP 状态,先调用 interrupted 清除中断标记。
             * 3. 清除前如果不是中断状态,不设置中断。清除前是中断状态,有可能在这段时间内,线程池可能调用了 shutdownNow 方法,所以再判断一下运行状态。如果这时候是 STOP 状态,并且之前设置的中断已经清了,这时候要恢复设置中断。 
             */
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();
                
            try {
                
                // 钩子方法,执行任务前
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    
                    // 运行任务,
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    thrown = x;
                    throw new Error(x);
                } finally {
                    // 钩子方法,执行任务后。因为放在 finally 块中,出现异常也会执行该钩子方法。
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                
                // 解锁 Worker,表示任务执行完
                w.unlock();
            }
        }
        // 异常终止标记位设为 false,表示执行期间没有因为异常终止
        completedAbruptly = false;
    } finally {
        
        // 线程退出处理
        processWorkerExit(w, completedAbruptly);
    }
}

线程池提供了任务处理前和处理后的钩子方法:

需要监控线程池执行,或者需要在执行前或执行后做一些特殊处理的的,继承 ThreadPoolExecutor 然后实现这两个方法即可。

5.3.5. getTask()

从任务队列中取出任务,包含空闲线程的超时处理:

private Runnable getTask() {

    // 标记变量,标记 poll 方法是否超时
    boolean timedOut = false;

    for (; ; ) {
    
        // 读取线程池状态
        int c = ctl.get();
        int rs = runStateOf(c);

        /*
         * 1. 如果是 RUNNING 状态,进入下一步;
         * 1. 如果是 SHUTDOWN 状态,并且任务队列为空,返回 null,减少 workerCount;
         * 2. 如果是 STOP,TIDYING 或者 TERMINATED 状态,直接返回 false,减少 workerCount。
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        // 获取有效线程数
        int wc = workerCountOf(c);

        /*
         * 标记变量,表示当前 worker 是否要超时退出
         * 1. allowCoreThreadTimeOut 设置 true 的话,所有的线程超时都要退出;
         * 2. 否则,只有当有效线程数大于核心线程数,需要减少线程池的数量,要设置超时退出。
         */
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 根据是否超时、线程数大小、任务队列大小的情况,判断是否要退出
        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
            
            // 如果可以减少 workerCount,返回 null,否则进入自旋,进入下一个循环。
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            /*
             * 如果要超时退出,当前使用 poll 方法,并设置了超时时间,超时后会退出
             * 如果不需要设置超时,使用 take 方法,一直阻塞直到队列中有任务
             */
            Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

runWorker() 中,如果传入要执行的任务,是不会调用 getTask() 的。

如果线程取不到任务,返回 null 的同时,会调用 decrementWorkerCount 减少有效工作线程数量 workerCount。

什么时候会取不到任务返回 null 让线程关闭呢?作者的方法注释中总结如下:

我们从上面的 runWorker() 解析可以看到,当 getTask() 返回 null 的话,runWorker() 的死循环会被打破,然后进入线程退出处理方法 processWorkerExit

所以线程进入关闭流程的重要条件,就是 getTask() 返回了 null。

5.3.6. processWorkerExit()

如何处理工作线程的退出?会调用 ThreadPoolExecutor.processWorkerExit。这个方法有两个参数:

代码如下:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        
    // 如果是异常退出的话,恢复 workerCount
    if (completedAbruptly)
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        
        // 把之前 Worker 的完成任务数收集起来,然后从工作线程队列中移除。
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        /*
         * 如果还在 RUNNING 或者 SHUTDOWN 状态,这里还处理
         * 1. 如果是异常退出,再启动一个新的线程替换。
         * 2. 如果不是异常退出,确定一个最少存在的线程数:
         * 如果设置了 allowCoreThreadTimeout 的话,并且任务队列还有值,min = 1,
         * 如果设置了 allowCoreThreadTimeout 的话,min = corePoolSize,
         * 然后如果当前工作线程数比 min 小的话,会再启动一个新线程替换。
         */
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

主要如下:

5.4. 获取线程池运行情况

线程池启动后,可以通过一些接口获取整体运行情况。

5.5. 关闭线程池

现在来看看如何关闭线程池。

5.5.1. shutdown()

该方法会触发线程池关闭流程,但不会马上关闭。

什么时候才能知道线程池已经执行完关闭流程?需要使用 awaitTermination 去判断。见 ThreadPoolExecutor.shutdown()

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

执行 checkShutdownAccess(),如果没有权限的话会抛出 SecurityException。

调用 advanceRunState() 改变线程池状态为 SHUTDOWN。内部使用 CAS 加自旋。

调用 interruptIdleWorkers() 方法去中断空闲的线程。

然后留了一个钩子方法 onShutDown(),如果有需要的话,可以继承 ThreadPoolExecutor 实现该回调,做一些关闭的其他处理。

最后会调用 tryTerminate() 方法,尝试终止线程池。如果任务队列还有任务没有执行完,这个尝试是不成功的。

5.5.2. shutdownNow()

这个方法和 shutdown() 类似,但会去关闭所有线程,不会再去执行任务队列中的未执行的任务,把这些未执行的队列返回。

ThreadPoolExecutor.shutdownNow()

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

可以看到,和 shutdown() 流程基本相同。状态修改为 STOP

我们之前线程取任务执行的 ThreadPoolExecutor.getTask() 方法中,有对 STOP 状态的判断:

if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    decrementWorkerCount();
    return null;
}

可以看到,只要是大于等于 STOP 状态,直接返回 null,触发线程关闭。

如果有一些线程不是空闲线程,比如处于 sleep() 或者 wait() 的线程,会去中断这些线程。所以设计运行任务 Runnable 的时候,有 sleep 或者 wait 操作,或者内部有一个循环,需要响应中断并进行退出处理。在 runWorker() 可以看到中断这些线程的逻辑:

if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
    wt.interrupt();

调用 drainQueue() 方法把未执行的任务取出,会返回给调用者。

调用 tryTerminate() 方法,尝试终止线程池。

5.5.3. tryTerminate()

前面分析的 addWorkerFailed()processWorkerExit()shutdown()shutdownNow() 都有调用 tryTerminate() 。源码如下:

final void tryTerminate() {
    for (; ; ) {
        int c = ctl.get();
        if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

通过该方法,让线程池进入到 TIDYINGTERMINATED

当然,这里只是尝试,对照代码我们可以知道,这些情况下不会直接退出尝试。

如果上面的条件都通过了,要进入 TIDYINGTERMINATED 状态,必须工作线程都关闭,workerCount 为 0。还有线程未关闭,调用 interruptIdleWorkers(ONLY_ONE) 去关闭。

if (workerCountOf(c) != 0) { // Eligible to terminate
    interruptIdleWorkers(ONLY_ONE);
    return;
}

线程关闭完毕,状态切换为 TIDYING,线程池会再调用一次钩子方法 terminated()

最后直接设置状态为 TERMINATED

上一篇下一篇

猜你喜欢

热点阅读