JavaJavaConcurrent

ThreadPoolExecutor(2) —— 线程池源码分析

2019-11-30  本文已影响0人  若琳丶

一、前言

分析源码就像旦总的生活,朴实无华且枯燥。
                                                                                   —— 若琳

分析源码虽然是枯燥的,但是我们带着一颗求知的好奇心,是不会感到特别枯燥。就像我们去慢慢挖掘底层的真相,会别有一番收获,别有一番欣喜。

二、ThreadPoolExecutor 的 execute

在向线程池提交任务时,会通过两个方法:execute和submit。
本文着重讲解execute方法。submit方法放在下次和Future、Callable一起分析。

  public void execute(Runnable command) {
      if (command == null)
          throw new NullPointerException();
      // clt记录着runState和workerCount
      int c = ctl.get();
      //workerCountOf方法取出低29位的值,表示当前活动的线程数
      //然后拿线程数和 核心线程数做比较
      if (workerCountOf(c) < corePoolSize) {
          // 如果活动线程数<核心线程数,则添加一个Worker
         //addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断
         if (addWorker(command, true))
             // 如果成功则返回
             return;
         // 如果失败则重新获取 runState和 workerCount
         c = ctl.get();
     }
     // 如果当前线程池是运行状态并且任务添加到队列成功
     // 线程池状态不是RUNNING状态,说明执行过shutdown命令,需要对新加入的任务执行reject()操作。
     // 这儿为什么需要recheck,是因为任务入队列前后,线程池的状态可能会发生变化。
     if (isRunning(c) && workQueue.offer(command)) {
         // 重新获取 runState和 workerCount
         int recheck = ctl.get();
         // 如果不是运行状态并且移除任务失败
         if (!isRunning(recheck) && remove(command))
             reject(command);
         else if (workerCountOf(recheck) == 0)
             //第一个参数为null,表示在线程池中创建一个线程,但不去启动
             // 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize
             addWorker(null, false);
     }
     //再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize
     else if (!addWorker(command, false))
         //如果失败则拒绝该任务
         reject(command);
 }

总结一下它的工作流程:

  1. 当workerCount < corePoolSize,创建线程执行任务。
  2. 当workerCount >= corePoolSize&&阻塞队列workQueue未满,把新的任务放入阻塞队列。
  3. 当workQueue已满,并且workerCount >= corePoolSize,并且workerCount < maximumPoolSize,创建线程执行任务。
  4. 当workQueue已满,workerCount >= maximumPoolSize,采取拒绝策略,默认拒绝策略是直接抛异常。

对照上一篇文章的图来理解:


线程池执行

三、addWorker方法

该方法的核心作用是 —— 主要工作是在线程池中创建一个新的线程并执行

参数定义
firstTask: the task the new thread should run first (or null if none). (指定新增线程执行的第一个任务或者不执行任务)
core: if true use corePoolSize as bound, else maximumPoolSize.(core如果为true则使用corePoolSize绑定,否则为maximumPoolSize。 (此处使用布尔指示符而不是值,以确保在检查其他状态后读取新值))

3.1 源码

runWorker 方法可以分为两部分。

第一部分 —— 是线程池安全的(通过CAS保证线程安全)增加工作线程数
 private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // 外层自旋
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 这个条件写得比较难懂,我对其进行了调整,和下面的条件等价
        // (rs > SHUTDOWN) || 
        // (rs == SHUTDOWN && firstTask != null) || 
        // (rs == SHUTDOWN && workQueue.isEmpty())
        // 1. 线程池状态大于SHUTDOWN时,直接返回false
        // 2. 线程池状态等于SHUTDOWN,且firstTask不为null,直接返回false
        // 3. 线程池状态等于SHUTDOWN,且队列为空,直接返回false
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        // 内层自旋
        for (;;) {
            int wc = workerCountOf(c);
            // worker数量超过容量,直接返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 使用CAS的方式增加worker数量。
            // 若增加成功,则直接跳出外层循环进入到第二部分
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // 线程池状态发生变化,对外层循环进行自旋
            if (runStateOf(c) != rs)
                continue retry;
            // 其他情况,直接内层循环进行自旋即可
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // ...
}
第二部分 —— 是线程池安全的(通过ReetrantLock)创建并添加Worker实例,然后启动Worker去工作
private boolean addWorker(Runnable firstTask, boolean core) {
    // ...
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // worker的添加必须是串行的,因此需要加锁
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                // 这儿需要重新检查线程池状态
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // worker已经调用过了start()方法,则不再创建worker
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // worker创建并添加到workers成功
                    workers.add(w);
                    // 更新`largestPoolSize`变量
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 启动worker线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // worker线程启动失败,说明线程池状态发生了变化(关闭操作被执行),需要进行shutdown相关操作
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

这里做一下插曲:Worker 我们可以把它简单理解为执行任务的工人,它本身也持有线程对象,我们之后再细讲它。

 private final class Worker
       extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;

3.2 addWorker 方法中的几个关键节点

3.2.1 线程池内是如何保存线程的
  1. 在 addWorker 方法第二部分的第 7 行代码,先是创建了一个 Worker,并把 Runnable 对象交给它。
  2. 如果状态条件正确,则在第二部分的第 26 行将当前 Worker 添加到线程池的线程集合中,也就是 workers(HashSet<Worker>)
  3. 成功添加 Worker 之后,在第二部分的第 38 行启动 Worker 中的 thread(为什么这里需要启动 Worker 中的 thread,后面的文章再讲)

3.2.2 for 循环

方法中有两个 for 循环,唯一能继续进行下去的出口就是第一部分的第 32 行,成功增加了线程池内工作线程数。

四、总结

这里大体写了一下线程池中上层调度的源码,有些地方写的不是特别详细,接下来会慢慢进行完善。
如果有哪些地方理解的不对,或者缺漏,还望留言,大家一起交流,一起学习,一起进步。

参考:
https://www.jianshu.com/p/389b58856894
https://www.cnblogs.com/javazhiyin/p/10605511.html

下一节:ThreadPoolExecutor(三) —— 干活的人 Worker

上一篇 下一篇

猜你喜欢

热点阅读