ThreadPoolExecutor实现原理

2020-03-24  本文已影响0人  刘文琼

转载自:https://blog.csdn.net/qq_34436819/article/details/103137347?fps=1&locationNum=2

前言

无论是在工作中,还是在书本中,我们都可以听到或者看到关于线程在使用时的一些建议:不要在代码中自己直接创建线程,而是通过线程池的方式来使用线程。使用线程池的理由大致可以总结为以下几点:

  1. 降低资源消耗。线程是操作系统十分宝贵的资源,当多个人同时开发一个项目时,在互不知情的情况下,都自己在代码中创建了线程,这样就会导致线程数过多,而且线程的创建和销毁,在操作系统层面,需要由用户态切换到内核态,这是一个费时费力的过程。而使用线程池可以避免频繁的创建线程和销毁线程,线程池中线程可以重复使用。
  2. 提高响应速度。当请求到达时,由于线程池中的线程已经创建好了,使用线程池,可以省去线程创建的这段时间。
  3. 提高线程的可管理性。线程是稀缺资源,当创建过多的线程时,会造成系统性能的下降,而使用线程池,可以对线程进行统一分配、调优和监控。

线程池的使用十分简单,但是会用不代表用得好。在面试中,基本不会问线程池应该怎么用,而是问线程池在使用不当时会造成哪些问题,实际上就是考察线程池的实现原理。因此搞明白线程池的实现原理是很有必要的一件事,不仅仅对面试会有帮助,也会让我们在平时工作中避过好多坑。

实现原理

在线程池中存在几个概念:核心线程数、最大线程数、任务队列。核心线程数指的是线程池的基本大小;最大线程数指的是,同一时刻线程池中线程的数量最大不能超过该值;任务队列是当任务较多时,线程池中线程的数量已经达到了核心线程数,这时候就是用任务队列来存储我们提交的任务。
与其他池化技术不同的是,线程池是基于生产者-消费者模式来实现的,任务的提交方是生产者,线程池是消费者。当我们需要执行某个任务时,只需要把任务扔到线程池中即可。线程池中执行任务的流程如下图如下:


  1. 先判断线程池中线程的数量是否超过核心线程数,如果没有超过核心线程数,就创建新的线程去执行任务;如果超过了核心线程数,就进入到下面流程。
  2. 判断任务队列是否已经满了,如果没有满,就将任务添加到任务队列中;如果已经满了,就进入到下面的流程。
  3. 再判断如果创建一个线程后,线程数是否会超过最大线程数,如果不会超过最大线程数,就创建一个新的线程来执行任务;如果会,则进入到下面的流程。
  4. 执行拒绝策略。

在没看线程池的具体实现之前,我一直存在这样的疑惑:为什么是先判断任务队列有没有满,再判断有没有超过最大线程数?正常逻辑不是应该先尽可能的创建线程,让线程去处理任务吗?当任务实在是太多了,线程处理不过来了,再将任务添加到任务队列吗?知道我看了线程池的具体代码实现后,我才知道答案。问题答案在文末。

ThreadPoolExecutor

在JUC包下,已经提供了线程池的具体的实现:ThreadPoolExecutor。ThreadPoolExecutor提供了很多的构造方法,其中最复杂的构造方法有7个参数。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
}

在使用ThreadPoolExecutor时,可以使用execute(Runnable task)方法向线程池中提交一个任务,没有返回值。也可以使用submit()方法向线程池中添加任务,有返回值,返回值对象是Future。submit()方法有三个重载的方法。

源码分析

了解了ThreadPoolExecutor的基本用法,下面将结合源码来分析下线程池的代码实现。从上面的线程池的原理中,我们可以发现,线程池的原理相对比较简单,代码实现起来应该不难,看源码主要是为了学习他人写的优秀代码,尤其是编程大师Doug Lea写的代码。

对于一个线程池,除了上面介绍的几个重要属性以外,我们还需要一个变量来表示线程池状态,线程池也需要有运行中、关闭中、已关闭等状态。如果要我们去实现一个线程池,可能第一反应就是用一个单独的变量来表示线程池的状态,再用另一个变量来表示线程池中线程的数量。这样的确可以,不过Doug Lea并不是这样实现的,他将两者用一个变量来表示(不得不感叹下,大佬就是大佬,想法果然和他人不一样)。那么问题来了,如何用一个变量来表示两个值?阅读过读写锁源码的朋友可能就能立想到另外这一中方案(关于读写锁的介绍可以阅读这一篇文章: 读写锁ReadWriteLock的实现原理)。在读写锁的实现中将一个int型的数值,按照高低位来拆分,高位表示一个数,低位再表示另一个数,在线程池的实现中,Doug Lea再次使用了按位拆分的技巧。

在线程池中,使用了一个原子类AtomicInteger的变量来表示线程池状态和线程数量,该变量在内存中会占用4个字节,也就是32bit,其中高3位用来表示线程池的状态,低29位用来表示线程的数量。线程池的状态一共有5中状态,用3bit最多可以表示8种状态,因此采用高3位来表示线程池的状态完全能满足需求。示意图如下:


在看线程池的核心实现逻辑之前,先简单看下ThreadPoolExecutor中关于各种变量和方法的定义。因为在核心逻辑中,经常会用到它们,而且这些方法和变量大量用到了位运算,看起来不是特别直观,所以提前熟悉它们的功能对于看核心逻辑有很大的帮助。相关源码和注释如下。

// 高2位表示线程池的状态,其他29位表示线程的数量,即worker的数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 计数的位数,29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池的最大大小,2^29 -1,
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// 线程池的状态,高三位表示线程的运行状态,
// RUNNING: 111
private static final int RUNNING    = -1 << COUNT_BITS;
// SHUTDOWN: 000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// STOP: 001
private static final int STOP       =  1 << COUNT_BITS;
// TIFYING(整理): 010
private static final int TIDYING    =  2 << COUNT_BITS;
// TERMINATED: 011
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
// 计算线程池的状态,计算结果的低29位全为0,因此最终结果就是线程池的状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 工作线程的数量,计算结果的高3位全是0,因此最终结果就是工作线程的数量
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 根据线程池的状态和工作线程的数量,计算ctl,实际上就是将两者合并成ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }

/*
 * Bit field accessors that don't require unpacking ctl.
 * These depend on the bit layout and on workerCount being never negative.
 */

// 线程池的状态是否小于s
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

// 线程池的状态大于等于s
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

// 判断线程池是否处于运行状态
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

有了前面的基础,接下来分析下核心实现。再使用线程池时我们可以通过execute(Runnable task)来提交一个任务到线程池,因此我们从核心入口execute(Runnable task)方法开始分析。execute()方法的源码如下。在源码中我添加了部分注释,以供参考:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    
    int c = ctl.get();
    // workerCountOf(c)方法时计算工作线程的数量,
    // 1. 工作线程数是否小于核心线程数,如果小于核心线程数,就创建新的线程去执行任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        // 任务执行失败后,重新获取ctl的值,供下面的逻辑计算
        c = ctl.get();
    }
    // 2. 当工作线程数大于等于核心线程数时,线程池是运行状态,且任务添加到队列成功
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 如果线程池状态不是RUNNING状态,就执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果线程池中线程的数量为0,就调用addWorker()方法创建新的worker线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. 线程池非运行状态,或者任务入队失败,就尝试创建新的worker线程
    else if (!addWorker(command, false))
        // 4. 如果创建新的worker线程失败,就执行拒绝策略。
        reject(command);
}

execute()方法的逻辑大致分为4部分,分别对应线程池原理部分所提到的4个步骤。

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

从上面的分析中,可以看出,在好几处逻辑中均调用了addWorker()方法,这说明该方法十分重要。事实也是如此,该方法不仅重要,代码实现还十分复杂。该方法需要两个参数,第一个参数就是我们传入的Runnable任务,第二参数是一个boolean值,传入true表示的是当前线程池中的线程数还没有达到核心线程数,传false表示当前线程数已经大于等于核心线程数了。addWorker()方法的源码很长,这里我将其分为两个部分,下面先看前面一部分的源码。

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 1. 当线程池的状态大于SHUTDOWN时,返回false。因为线程池处于关闭状态了,就不能再接受任务了
        // 2. 当线程池的状态等于SHUTDOWN时,firstTask不为空或者任务队列为空,返回false
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 1. 线程数大于等于理论上的最大数(2^29-1),则直接返回false。(因为线程数不能再增加了)
            // 2. 根据core来决定线程数应该和谁比较。当线程数大于核心线程数或者最大线程数时,直接返回false。
            // (因为当大于核心线程数时,表示此时任务应该直接添加到队列中(如果队列满了,可能入队失败);当大于最大线程数时,肯定不能再新创建线程了,不然设置最大线程数有毛用)
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 线程数+1,如果设置成功,就跳出外层循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 再次获取线程池的状态,并将其与前面获取到的值比较,
            // 如果相同,说明线程池的状态没有发生变化,继续在内循环中进行循环
            // 如果不相同,说明在这期间,线程池的状态发生了变化,需要跳到外层循环,然后再重新进行循环
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // 省略后半部分代码
    // ......
}

在这部分代码中,我们可以看到一个很陌生的语法:retry...break retry...continue retry。这种写法真的是太少见了,少见到我第一眼看到的时候,以为是我不小心碰到键盘,把源码给改了。这种语法有点类似于C语言里面的goto(已经被遗弃),其作用就是在for循环之前定义一个retry,然后在循环中,使用break retry时,就会让代码跳出循环,并不再进入循环;当使用continue retry时,表示跳出当前循环,立马进入到下一次循环。由于这里使用了两层for循环,因此为了方便在内层循环中一下子跳出到外层循环,就使用了retry这种语法。需要说明的是,这里的retry并不是Java里面的关键字,而是随机定义的一个字符串,我们也可以写成a,b,c等,但是后面的break和continue后面的字符串需要和前面定义的这个字符串对应。
我们可以看到,前半部分的核心逻辑就是使用了两个无限for和一个CAS操作来设置线程池的线程数量。如果线程池的线程数修改成功,就中断循环,进入后半部分代码的逻辑,如果修改失败,就利用for循环再一次进行修改,这样的好处是,既实现了线程安全,也避免使用锁,提高了效率。在这一部分代码中,进行了很多判断,这些判断主要是校验线程池的状态以及线程数,个人认为不是特别重要,我们主要抓住核心逻辑即可。
当成功修改线程数量以后,就会执行addWorker()方法的后半部分代码,其源码如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    // 省略前半部分代码
    // ......
    
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 创建一个新的worker线程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                // 再次获取线程池的状态,因为在获取锁期间,线程池的状态可能改变了
                int rs = runStateOf(ctl.get());

                // 如果线程池状态时运行状态或者是关闭状态但是firstTask是空,就将worker线程添加到线程池
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 判断worker线程是否已经启动了,如果已经启动,就抛出异常
                    // 个人觉得这一步没有任何意义,因为worker线程是刚new出来的,没有在任何地方启动
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 启动线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {

        // 如果启动失败,就将worker线程从线程池移除,并将线程数减1
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

在这一部分代码中,通过new Worker(firstTask)创建了一个Worker对象,Worker对象继承了AQS,同时实现了Runnable接口,它是线程池中真正干活的人。我们提交到线程的任务,最终都是封装成Worker对象,然后由Worker对象来完成任务。先简单看下Woker的构造方法,其源码如下。在构造方法中,首先设置了同步变量state为-1,然后通过ThreadFactory创建了一个线程,注意在通过ThreadFactory创建线程时,将Worker自身也就是this,传入了进去,也就是说最后创建出来的线程对象,它里面的target属性就是指向这个Worker对象。

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable{

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 将this传入进去,最后就是使线程的target属性等于当前的worker对象
        this.thread = getThreadFactory().newThread(this);
    }
}

当通过new Worker(firstTask)创建完worker对象后,此时线程已经被创建好了,在启动线程之前,先通过t.isAlive()判断线程是已经启动,如果没有启动,才会调用线程的start()方法来启动线程。这里有一点需要注意的是,创建完worker对象后,调用了mainLock.lock()来保证线程安全,因为这一步workers.add(w)存在并发的可能,所以需要通过获取锁来保证线程安全。
当调用线程的start()方法之后,如果线程获取到CPU的执行权,那么就会执行线程的run()方法,在线程的run()方法中,会执行线程中target属性的run()方法。这里线程的target属性就是我们创建的worker对象,因此最终会执行到Worker的run()方法。

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable{

    public void run() {
        runWorker(this);
    }
}

在Worker类的run()中,直接调用了runWorker()方法。所以Worker执行任务的核心逻辑就是在runWorker()方法中实现的。其源码如下。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 如果worker线程的firstTask不为空或者能从任务队列中获取到任务,就执行
        // 否则就会一直阻塞到getTask()方法处
        while (task != null || (task = getTask()) != null) {
            // 保证worker串行的执行线程
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // beforeExecute()是空方法,由子类具体实现
                // 该方法的目的是为了让任务执行前做一些其他操作
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 真正执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // afterExecute()空方法,由子类具体实现
                    // 该方法的目的是为了让任务执行后做一些其他操作
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker()的源码看起来也比较长,但核心逻辑就一行,即:task.run(),最终执行了我们提交的Runnable任务。在runWorker()方法中通过一个while循环来让Worker对象一直来执行任务。当传入的task对象不为空或者通过getTask()方法能从任务队列中获取到任务时,worker就会一直执行。否则将在finally语句块中调用processWorkerExit退出,让线程中断,最终销毁。
getTask()方法是一个阻塞的方法,当能从任务队列中获取到任务时,就会立即返回一个任务。如果获取不到任务,就会阻塞。它支持超时,当超过线程池初始化时指定的线程最大存活时间后,就会返回null,从而导致worker线程退出while循环,最终线程销毁。
到这儿线程池的execute()方法就分析完了。最后简单分析下线程池的shutdown()方法和shutdownNow()方法。当调用shutdown()方法时,会令线程池的状态为SHUTDOWN,然后中断空闲的线程,对于已经在执行任务的线程并不会中断。当调用shutdownNow()方法时,会令线程池的状态为STOP,然后在中断所有的线程,包括正在执行任务的线程。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 权限校验
        checkShutdownAccess();
        // 将线程池的状态设置为SHUTDOWN状态
        advanceRunState(SHUTDOWN);
        // 中断空闲的worker线程
        interruptIdleWorkers();
        // 空方法,由子类具体去实现,例如ScheduledThreadPoolExecutor
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 尝试将线程池的状态设置为TERMINATED
    tryTerminate();
}
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 线程池状态设置为STOP
        advanceRunState(STOP);
        // 中断所有线程,包括正在执行任务的线程
        interruptWorkers();
        // 清除任务队列中的所有任务
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

总结

推荐

管程:并发编程的基石
初识CAS的实现原理
Unsafe类的源码解读以及使用场景
队列同步器(AQS)的设计原理
队列同步器(AQS)源码分析
可重入锁(ReentrantLock)源码分析
公平锁与非公平锁的对比
Condition源码分析
读写锁ReadWriteLock的实现原理
Semaphore的源码分析以及使用场景
并发工具类CountDownLatch的源码分析以及使用场景
并发工具类CyclicBarrier的源码分析以及使用场景
wait()和notify()一定成对出现吗?如何解释Thread.join()

上一篇下一篇

猜你喜欢

热点阅读