Code多线程编程Java学习笔记

Java ThreadPoolExecutor 线程池源码分析

2017-03-14  本文已影响552人  永顺

线程池常见实现

线程池一般包含三个主要部分:

调度器并非是必须的, 例如 Java 中实现的 ThreadPoolExecutor 就没有调度器, 而是所有的线程都不断从任务队列中取出任务, 然后执行.
线程池模型可以用下图简单地表示:

线程池实现套路.png

线程池基本概念

线程池大小

ThreadPoolExecutor 有两个参数用于控制线程池中线程的个数: corePoolSizemaximumPoolSize, 根据这两个参数, ThreadPoolExecutor 会自适应地调整线程个数, 以适应不同的任务数.
当我们通过 execute(Runnable) 提交一个任务时:

  1. 如果此时线程池中线程个数小于 corePoolSize, 则此任务不会插入到任务队列中, 而是直接创建一个新的线程来执行此任务, 即使当前线程池中有空闲的线程.
  2. 如果线程数大于 corePoolSize 但是小于 maximumPoolSize:
  1. 如果线程数等于 maximumPoolSize:

线程的 Keep-Alive 时间

在创建一个线程池时, 我们可以指定线程池中的线程的最大空闲(Idle)时间, 线程池会根据我们设置的这个值来动态的减少不必要的线程, 释放系统资源.
当我们的线程池中的线程数大于 corePoolSize 时, 如果此时有线程处于空闲(Idle)状态超过指定的时间(keepAliveTime), 那么线程池会将此线程销毁.

工作队列

工作队列(WorkQueue) 是 一个 BlockingQueue, 它时用于存放那些已经提交的, 但是还没有空余线程来执行的任务. 例如我们在前面 线程池大小 一节中讨论的情况, 如果当前的线程数大于 corePoolSize 并且工作队列的还有剩余空间, 那么新提交的任务就会先放到工作队列中.

根据 Java Docs, 有三种常见的工作队列的使用场景:

任务提交失败处理

因为线程池中维护有一个工作队列, 我们自然地会想到, 当线程池中的工作队列满了, 不能再添加新的任务了, 此时线程池会怎么处理呢?
一般来说, 当我们提交一个任务到线程池中, 如果此时线程池不能再添加任务了, 那么通常会返回一个错误, 或者是调用我们预先设置的一个错误处理 handler, 例如在 Java ThreadPoolExecutor 中, 我们可以通过如下方式实例化一个带有任务提交失败 handler 的线程池:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 100, TimeUnit.SECONDS,
        new LinkedBlockingDeque<>(1),
        new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("Task " + r.toString() + " failed!");
    }
});

活跃线程数与线程池状态

ThreadPoolExecutor 中有一个名为 ctl 的字段, 它是一个 AtomicInteger 类型, ThreadPoolExecutor 复用了此字段来表示两个信息:

ctl 是一个 AtomicInteger 类型, 它的 低29位 用于存放当前的线程数, 因此一个线程池在理论上最大的线程数是 536870911; 高 3 位是用于表示当前线程池的状态, 其中高三位的值和状态对应如下:

线程池的基本使用

创建线程池

前面我们提到, 一个线程池中有 corePoolSize, maximumPoolSize, keepAliveTime, workQueue 之类的概念, 这些属性我们必须在实例化线程池时通过构造器传入. Java 线程池实现类 ThreadPoolExecutor 中提供了不少构造方法, 我们来看一下其中两个常用的构造器:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

可以看到, 在实例化一个 ThreadPoolExecutor 线程池时, 我们需要指定一些线程池的基本属性, 并且可选地, 我们还可以指定当任务提交失败时的处理 handler.

例如我们可以通过如下方式实例化一个带有任务提交失败 handler 的线程池:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 100, TimeUnit.SECONDS,
        new LinkedBlockingDeque<>(1),
        new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("Task " + r.toString() + " failed!");
    }
});

当然, 除了上述使用构造器来直接创建线程池, Java 还提供了几个简便地创建线程池的方法:

例如我们想创建一个有五个线程的线程池, 那么可以调用 Executors.newFixedThreadPool, 这个方法等效于:

new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>())

提交任务

提交任务到线程池中比较简单, 如果是 ThreadPoolExecutor 类型的线程池, 我们直接调用它的 execute 方法即可, 例如:

ExecutorService executorService = ...
executorService.execute(new Runnable() {
    @Override
    public void run() {
        System.out.println("OK, thread name: " + Thread.currentThread().getName());
    }
});

如果我们获取到一个 ScheduledThreadPoolExecutor 类型的线程池, 那么除了调用 execute 方法外, 我们还可以通过调用 schedule 方法提交一个定时任务, 例如:

ScheduledExecutorService executorService = xxx
executorService.schedule(new Runnable() {
    @Override
    public void run() {
        System.out.println("OK, thread name: " + Thread.currentThread().getName());
    }
}, 1, TimeUnit.SECONDS);

上面代代码就会在1秒后执行我们的定时任务.

关闭线程池

Java 线程池提供了两个方法用于关闭一个线程池, 一个是 shutdownNow(), 另一个是 shutdown(). 我们可以看一下这两个方法的签名:

void shutdown();
List<Runnable> shutdownNow();

这两个方法除了名字不一样外(废话), 它们的返回值也不太一样.
那么这两个方法到底有什么区别呢? 它们的区别有:

废话了一大堆, 我们来看一下具体的例子吧:

ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
executorService.schedule(new Runnable() {
    @Override
    public void run() {
        System.out.println("OK, thread name: " + Thread.currentThread().getName());
    }
}, 1, TimeUnit.SECONDS);

// 调用此方法关闭线程时, 我们提交的定时任务不会被执行
// executorService.shutdownNow();

executorService.shutdown();

可以看到, 如果我们调用的是 executorService.shutdownNow(), 那么原先提交的未执行的定时任务并不会再被执行, 但是如果我们调用的是 executorService.shutdown(), 那么此调用会阻塞住, 直到所有提交的任务都执行完毕才会返回.

代码分析

线程池的属性字段

在开始深入了解 ThreadPoolExecutor 代码之前, 我们先来简单地看一下 ThreadPoolExecutor 类中到底有哪些重要的字段.

public class ThreadPoolExecutor extends AbstractExecutorService {
    // 这个是一个复用字段, 它复用地表示了当前线程池的状态, 当前线程数信息.
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
    // 用于存放提交到线程池中, 但是还未执行的那些任务.
    private final BlockingQueue<Runnable> workQueue;

    // 线程池内部锁, 对线程池内部操作加锁, 防止竞态条件
    private final ReentrantLock mainLock = new ReentrantLock();

    // 一个 Set 结构, 包含了当前线程池中的所有工作线程.
    // 对 workers 字段的操作前, 需要获取到这个锁.
    private final HashSet<Worker> workers = new HashSet<Worker>();

    // 条件变量, 用于支持 awaitTermination 操作
    private final Condition termination = mainLock.newCondition();

    // 记录线程池中曾经到达过的最大的线程数.
    // 这个字段在获取 mainLock 锁的前提下才能操作.
    private int largestPoolSize;

    // 记录已经完成的任务数. 仅仅当工作线程结束时才更新此字段.
    // 这个字段在获取 mainLock 锁的前提下才能操作.
    private long completedTaskCount;

    // 线程工厂. 当需要一个新的线程时, 这里生成.
    private volatile ThreadFactory threadFactory;

    // 任务提交失败后的处理 handler
    private volatile RejectedExecutionHandler handler;

    // 空闲线程的等待任务时间, 以纳秒为单位.
    // 当当前线程池中的线程数大于 corePoolSize 时, 
    // 或者 allowCoreThreadTimeOut 为真时, 线程才有 idle 等待超时时间, 
    // 如果超时则此线程会停止.; 
    // 反之线程会一直等待新任务到来.
    private volatile long keepAliveTime;

    // 默认为 false.
    // 当为 false 时, keepAliveTime 不起作用, 线程池中的 core 线程会一直存活, 
    // 即使这些线程是 idle 状态.
    // 当为 true 时, core 线程使用 keepAliveTime 作为 idle 超时
    // 时间来等待新的任务.
    private volatile boolean allowCoreThreadTimeOut;

    // 核心线程数.
    private volatile int corePoolSize;

    // 最大线程数.
    private volatile int maximumPoolSize;
}

ThreadPoolExecutor 中, 使用到 ctl 这个字段来维护线程池中当前线程数和线程池的状态. ctl 是一个 AtomicInteger 类型, 它的 低29位 用于存放当前的线程数, 因此一个线程池在理论上最大的线程数是 536870911; 高 3 位是用于表示当前线程池的状态, 其中高三位的值和状态对应如下:

提交任务到线程池

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
   
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

上面的代码有三个步骤, 首先第一步是检查当前线程池的线程数是否小于 corePoolSize, 如果小于, 那么由我们前面提到的规则, 线程池会创建一个新的线程来执行此任务, 因此在第一个 if 语句中, 会调用 addWorker(command, true) 来创建一个新 Worker 线程, 并执行此任务. addWorker 的第二个参数是一个 boolean 类型的, 它的作用是用于标识是否需要使用 corePoolSize 字段, 如果它为真, 则添加新任务时, 需要考虑到 corePoolSize 字段的影响. 这里至于 addWorker 内部的实现细节我们暂且不管, 先把整个提交任务的大体脉络理清了再说.

如果前面的判断不满足, 那么会将此任务插入到工作队列中, 即 workQueue.offer(command). 当然, 为了健壮性考虑, 当插入到 workQueue 后, 我们还需要再次检查一下此时线程池是否还是 RUNNING 状态, 如果不是的话就会将原来插入队列中的那个任务删除, 然后调用 reject 方法拒绝此任务的提交; 接着考虑到在我们插入任务到 workQueue 中的同时, 如果此时线程池中的线程都执行完毕并终止了, 在这样的情况下刚刚插入到 workQueue 中的任务就永远不会得到执行了. 为了避免这样的情况, 因此我们由再次检查一下线程池中的线程数, 如果为零, 则调用 addWorker(null, false) 来添加一个线程.
如果前面所分析的情况都不满足, 那么就会进入到第三个 if 判断, 在这里会调用 addWorker(command, false) 来将此任务提交到线程池中. 注意到这个方法的第二个参数是 false, 表示我们在此次调用 addWorker 时, 不考虑 corePoolSize 的影响, 即忽略 corePoolSize 字段.

关于 addWorker 方法

前面我们大体分析了一下 execute 提交任务的流程, 不过省略了一个关键步骤, 即 addWorker 方法. 现在我们就来揭开它的神秘面纱吧.
首先看一下 addWorker 方法的签名:

private boolean addWorker(Runnable firstTask, boolean core)

这个方法接收两个参数, 第一个是一个 Runnable 类型的, 一般来说是我们调用 execute 方法所传输的参数, 不过也有可能是 null 值, 这样的情况我们在前面一小节中也见到过.
那么第二个参数是做什么的呢? 第二个参数是一个 boolean 类型的变量, 它的作用是标识是否使用 corePoolSize 属性. 我们知道, ThreadPoolExecutor 中, 有一个 corePoolSize 属性, 用于动态调整线程池中的核心线程数. 那么当 core 这个参数是 true 时, 则表示在添加新任务时, 需要考虑到 corePoolSzie 的影响(例如如果此时线程数已经大于 corePoolSize 了, 那么就不能再添加新线程了); 当 core 为 false 时, 就不考虑 corePoolSize 的影响(其实代码中是以 maximumPoolSize 作为 corePoolSize 来做判断条件的), 一有新任务, 就对应地生成一个新的线程.
说了这么多, 还不如来看一下 addWorker 的源码吧:

private boolean addWorker(Runnable firstTask, boolean core) {
    // 这里一大段的 for 语句, 其实就是判断和处理 core 参数的.
    // 当经过判断, 如果当前的线程大于 corePoolSize 或 maximumPoolSize 时(根据 core 的值来判断), 
    // 则表示不能新建新的 Worker 线程, 此时返回 false.
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 当 core 为真, 那么就判断当前线程是否大于 corePoolSize
            // 当 core 为假, 那么就判断当前线程数是否大于 maximumPoolSize
            // 这里的 for 循环是一个自旋CAS(CompareAndSwap)操作, 用于确保多线程环境下的正确性
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : ma))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

首先在 addWorker 的一开始, 有一个 for 循环, 用于判断当前是否可以添加新的 Worker 线程. 它的逻辑如下:

如果条件符合, 那么在 for 循环内, 又有一个自旋CAS 更新逻辑, 用于递增当前的线程数, 即 compareAndIncrementWorkerCount(c), 这个方法会原子地更新 ctl 的值, 将当前线程数的值递增一.

addWorker 接下来有一个 try...finally 语句块, 这里就是实际上的创建线程、启动线程、添加线程到线程池中的工作了.
首先可以看到 w = new Worker(firstTask); 这里是实例化一个 Worker 对象, 这个类其实就是 ThreadPoolExecutor 中对工作线程的封装. Worker 类继承于 AbstractQueuedSynchronizer 并实现了 Runnable 接口, 我们来看一下它的构造器:

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

它会把我们提交的任务(firstTask) 设置为自己的内部属性 firstTask, 然后呢, 使用 ThreadPoolExecutor 中的 threadFactory 来创建一个新的线程, 并保存在 thread 字段中, 而且注意到, 创建线程时, 我们传递给新线程城的 Runnable 其实是 Worker 对象本身(this), 因此当这个线程启动时, 实际上运行的是 Worker.run() 中的代码.

回过头来再看一下 addWorker 方法. 当创建好 Worker 线程后, 就会将这个 worker 线程存放在 workers 这个 HashSet<Worker> 类型的字段中. 而且注意到, 正如我们在前面所提到的, mainLock 是 ThreadPoolExecutor 的内部锁, 我们对 ThreadPoolExecutor 中的字段进行操作时, 为了保证线程安全, 因此都需要在获取到 mainLock 的前提下才能操作的.

最后别忘啦, 新建了一个线程后, 需要调用它的 start() 方法后, 这个线程才真正地运行, 因此我们可以看到, 在 addWorker 方法的最后, 调用了 t.start(); 来启动这个新建的线程.

任务的分配与调度

我们已经分析了工作线程的创建和任务插入到 wokerQuque 的过程, 那么根据本文最开头的线程池工作模型可知, 光有工作线程和工作队列还不行啊, 还需要有一个调度器, 把任务和工作线程关联起来才是一个真正的线程池.

在 ThreadPoolExecutor 中, 调度器的实现很简单, 其实就是每个工作线程在执行完一个任务后, 会再次中 workQueue 中拿出下一个任务, 如果获取到了任务, 那么就再次执行.
我们来看一下具体的代码实现吧.
在前面一小节中, 我们讲到 addWorker 中会新建一个 Worker 对象来代表一个 worker 线程, 接着会调用线程的 start() 来启动这个线程, 我们也提到了当启动这个线程后, 会运行到 Worker 中的 run 方法, 那么这里我们就来看一下 Worker.run有什么玄机吧:

public void run() {
    runWorker(this);
}

Worker.run 方法很简单, 只是调用了 ThreadPoolExecutor.runWorker 方法而已.
runWorker 方法比较关键, 它是整个线程池任务分配的核心:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker 方法是整个工作线程的核心循环, 在这个循环中, 工作线程会不断的从 workerQuque 中获取新的 task, 然后执行它.
我们注意到在 runWorker 一开始, 有一个 w.unlock();, 咦, 这是为什么呢? 其实这是 Worker 类玩的一个小把戏. 回想一下, Worker 类继承于 AbstractQueuedSynchronizer 并实现了 Runnable 接口, 它的构造器如下:

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

setState(-1); 方法是 AbstractQueuedSynchronizer 提供的, 初始化 Worker 时, 会先设置 state 为 -1, 根据注释, 这样做的原因是为了抑制工作线程的 interrupt 信号, 直到此工作线程正是开始执行 task. 那么在 addWorker 中的 w.unlock(); 就是允许 Worker 的 interrupt 信号.
接着在 addWorker 中会进入一个 while 循环, 在这里此工作线程会不断地从 workQueue 中取出一个任务, 然后调用 task.run() 来执行这个任务, 因此就执行到了用户所提交的 Runnable 中的 run() 方法了.

工作线程的 idle 超时处理

工作线程的 idle 超出处理在底层依赖于 BlockingQueue 带超时的 poll 方法, 即工作线程会不断地从 workQueue 这个 BlockingQueue 中获取任务, 如果 allowCoreThreadTimeOut 字段为 true, 或者当前的工作线程数大于 corePoolSize, 那么线程的 idle 超时机制就生效了, 此时工作线程会以带超时的 poll 方式从 workQueue 中获取任务. 当超时了还没有获取到任务, 那么我们就知道此线程一个到达 idle 超时时间, 因此终止此工作线程.
具体源码如下:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

从源码中就可以看到, 一开始会判断当前的线程池状态, 如果不是 SHUTDOWNSTOP 之类的状态, 那么接着获取当前的工作线程数, 然后判断工作线程数量是否已经大于了 corePoolSize. 当 allowCoreThreadTimeOut 字段为 true, 或者当前的工作线程数大于 corePoolSize, 那么线程的 idle 超时机制就生效, 此时工作线程会以带超时的 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 方式从 workQueue 中获取任务; 反之会以 workQueue.take() 方式阻塞等待任务, 直到获取一个新的任务.
当从 workQueue 获取新任务超时时, 那么就会调用 compareAndDecrementWorkerCount 将当前的工作线程数减一, 并返回 null. getTask 方法返回 null 后, 那么 runWorker 中的 while 循环自然也就结束了, 因此也导致了 runWorker 方法的返回, 最后自然整个工作线程的 run() 方法执行完毕, 工作线程自然就终止了.

上一篇下一篇

猜你喜欢

热点阅读