java源码学习Java并发

TreadPoolExecutor源码学习

2020-07-13  本文已影响0人  慕北人

Java中的线程池在各种开源库中频频出现,是程序员必须掌握的一个知识点,今日终于下定决心学习一下其源代码。

首先大致说一下线程池的工作原理:

  1. 如果Worker线程的数量小于核心池的容量corePoolSize,那么新建一个Worker线程,并加入到线程池
  2. 如果Worker线程的数量大于等于corePoolSize,且等待队列workerQueue没有满的情况,那么会将该任务放入到等待队列中
  3. 如果Worker线程数大于等于corePoolSize,但小于最大线程容量maxPoolSize,而且等待队里已经满了,那么会新建一个Worker线程放入到线程池中
  4. 如果Worker线程数大于等于maxPoolSize,那么会执行rejecet拒绝策略

相信上面的工作原理,大家从很多地方都看到过,接下来我们就一步一步按照这个思路来看看线程池是如何实现这种思路的。

一、基础开篇

在进行学习时,这里现需要了解几个重要的成员,提前理解这些成员在工作中的作用很有帮助。

1. 状态变量

下面定义的一系列变量用来表明当前线程池所处于的状态

    private static final int COUNT_BITS = Integer.SIZE - 3;  // 其值为29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;   // 高三位为000,剩余位为1
    private static final int RUNNING    = -1 << COUNT_BITS;  // 高三位:111,剩余位为0
    private static final int SHUTDOWN   =  0 << COUNT_BITS;  // 高三位:000,剩余位为0
    private static final int STOP       =  1 << COUNT_BITS;  // 高三位:001,剩余位为0
    private static final int TIDYING    =  2 << COUNT_BITS;  // 高三位:010,剩余位为0
    private static final int TERMINATED =  3 << COUNT_BITS;  // 高三位:011,剩余位为0

    // 从c中解析处高三位
    private static int runStateOf(int c)     { return c & ~CAPACITY; }  // 得到c的高三位
    private static int workerCountOf(int c)  { return c & CAPACITY; }   // 得到c的低29位
    private static int ctlOf(int rs, int wc) { return rs | wc; }        // rs代表runState、wc代表workCount,该方法将他们包装成ctl

关于线程池的不同状态,有如下几个说明需要了解:

关于上面的结论,我们在分析源码的过程中一一进行说明。

需要说明的是,一个int的前三位被用来区分线程池不同的状态,而后面的29位用来保存worker线程的数量,注意,因为用来表示worker线程数量的bit位只有29位了,所以线程池可以持有worker线程理论上最大的数量就是CAPACITY。而后面的三个方法就很明显可以看出其功能分别为:从一个int值中解析出当前的state状态、从一个int值中解析出当前worker线程的数量、将一个表示state的int和一个表示worker数量的int合成一个int。

说了这么多,没错,在线程池中就定义了这么一个int值一上面所说的形式保存了state和worker数量:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

而且可以发现,其初始化就是调用的ctlOf()方法,其初始状态下state为RUNNING、worker线程数量为0,很符合场景。

2. 关键成员

上面一直提到worker线程,这里有必要介绍一下,实际上Worker是一个内部类,并不是一个线程。说来惭愧,直到现在才明白了线程与Runnable的区别。线程就是一个线程,而Runnable(不如说是Task)就是一个“任务”,这个任务我们通常是将它放到线程中去执行,从而营造一种“异步”的错觉;说白了,线程就是程序向进程申请的一块资源,他可以用这个线程去做很多很多的、不同的任务,二者并不能用一个等号相连。在线程池中,二者这种分离的关系非常明显。

在线程池中,Worker的定义如下:

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;

   
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            runWorker(this);
        }
    }

首先,Worker实现了两个接口:AbstractQueuedSynchronizer、Runnable。这就意味着,Worker有两种角色,首先由于AQS这个接口,Worker自身是一把锁,其可以实现加解锁的功能(Java中的可重入锁、读写锁等都是这个接口的实现类);其次,Worker也是一个Runnable的实现类,这就说其可以作为一个参数传递给Thread的构造函数,从而将其run方法作为Thread的run方法实现。

更关键的是,Worker有一个Thread成员,没错,之所以叫他Worker线程就是因为线程池中的线程就是封装在了一个个Worker中,在线程池自始至终,Worker和线程的数量是一一对应的,所以在理解上可以将二者画上等号。可以看到,在Worker的构造函数中,thread在初始化的时候传递的就是Worker本身,上面刚说过,Worker是Runnable的实现类,所以这个thread的run方法实现就是Worker的方法实现,所以后面可以看到,启动一个Worker只需要简单的调用该Worker内部的thread.start()方法即可,而Worker的run方法实现中的runWorker方法蕴含了线程池复用线程的奥秘

3. 构造函数

ThreadPoolExecutor有多种构造函数,但是归根结底是调用的如下的:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

其中的threadFactory和handler分别可以不显式指定,采用默认值,threadFactory的默认值为:Executors.defaultThreadFactory(),该成员用来创建Worker内部的thread,如果你留心的话会发现上面在介绍Worker时,其构造函数中对thread的初始化就是使用的threadFactory实现的;handler的默认值为:new AbortPolicy(),该类的拒绝策略是直接抛弃对应的task,不做任何处理。

二、工作方法

线程池有一系列public方法供我们使用,接下来就一步一步分析。

1. execute方法

线程池的execute方法用来接收一个Runnable对象,之后针对当前线程池的状态对Runnable对象采取不同的处理:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();

            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                // 注意:这里第一个参数为null
                addWorker(null, false);
        }
        
        else if (!addWorker(command, false))
            reject(command);

        // 综上可以发现,如果线程池的状态runState >= SHUTDOWN;那么其总会执行拒绝策略
        // 因此只有RUNNING状态才可以addWorker
    }

接下来我们就来分析一下这个方法中所蕴含着的、我们开篇就说到的线程池的工作原理。首先要说明的一点,该方法开始就对command进行了非空校验,所以后面的command都是非空的,这一点对于addWorker()方法执行成功与否有一定影响。

原理一:如果Worker线程的数量小于核心池的容量corePoolSize,那么新建一个Worker线程,并加入到线程池


可以发现第一个if语句:

        if (workerCountOf(c) < corePoolSize) {
            /**
             * add失败原因有两个:
             * 1. 由于其他线程的并发操作,突然使得workCount到达了corePoolSize
             * 2. 由于其他线程的并发操作,突然使得线程池的state >= SHUTDOWN
             */
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

从ctl中得到当前的worker线程的数量,判断如果此时小于corePoolSize,那么会直接进行addWorker新建一个线程并且直接承载该command任务。

注意:这里只是说明线程池的工作思想,大家不要拘泥于execute每一步的结果。因为,在判断worker线程数量小于corePoolSize后,执行的addWorker是有失败的可能的,即返回false。但是后面你就会明白,除了其他地方并发调用了线程池的shutdown()方法,导致线程池不再接收新的task而拒绝添加worker之外;另外的情况是其他地方也是并发调用了addWorker方法并成功添加了,导致此时worker线程数量大于等于corePoolSize了,从而使得本次addWorker调用失败。但是,那个并发调用addWorker成功的场景同样是workerCount < corePoolSize,与线程池的原理一并不矛盾。

(关于addWorker失败的情况我们下文会分析)

原理二:如果Worker线程的数量大于等于corePoolSize,且等待队列workerQueue没有满的情况,那么会将该任务放入到等待队列中


这块儿的内容位于第二个if语句,首先,能够执行到这里的前提是第一个if语句条件不成立,或者第一个if语句addWorker失败,返回了false;不管是哪种条件,都蕴含着当前线程池的状态说明了:workerCount >= corePoolSize


        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();

            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                
                addWorker(null, false);
        }

首先能看到首先对线程池的状态进行判断,如果不是处于RUNNING状态,那么会拒绝接受新的任务,这里正说明了:只有RUNNING状态的线程池才可以接受添加新的worker、接受新的task。如果处于RUNNING状态,那么会首先将其加入任务队列。注意,该if代码块中,虽然有采取addWorker的情况,但是可以发现该addWorker在调用的时候,传递的Runnable参数为null,并不是command,所以线程池的原理二中没说添加一个worker线程是准确的、成立的

读者可能会对这个代码块里面的处理感觉有些迷惑,接下来我们就来详细分析一下。首先来看第一个if语句:

if (! isRunning(recheck) && remove(command))
                reject(command);

首先解释一下为何还要recheck一下线程池的状态,同原理一的时候一样,就怕其他地方并发调用了线程池的shutdown()方法,所以这里的if语句第一个判断就是判断是否还是RUNNING状态,如果此时线程池不是RUNNING了,那么if的第二个条件负责将已经加入到等待队列中的任务移除,这也符合处于SHUTDOWN状态的线程池拒绝接受新的任务的设定

好了,如果线程池不处于RUNNING状态,且成功的将已经添加到等待队列中的任务吐了出来,那么就可以执行reject策略了。再来看看else if的处理:

else if (workerCountOf(recheck) == 0)
     addWorker(null, false);

在这里我们主要分析两个点:

首先回答第一个疑惑,我们回想一下上面的if语句,何时回进入到elseif的判断。答案是有以下几种情况:

对于第一种情况,我们冷静地想一想,当前线程池处于RUNNING,但是workerCount >= corePoolSize,为什么还要有workerCount==0这么一个判断啊?答案就是,线程池的corePoolSize允许被设置为0!!所以,如果corePoolSize真的被设置为了0,又是首次执行execute方法,那么当前workerCount就真的为0,所以需要添加一个worker来处理这个task。那样就会产生了问题二,为啥不将addWorker的第一个参数设置为command,而是null?答案还在于并发操作的问题,在进行完if判断之后,可能由于并发操作,线程池突然不是RUNNING状态了,这就等价于情况二了,我们接下来分析。

对于情况二产生一种后果,那就是:由于线程不处于RUNNING状态,本应该拒绝接受新的task,所以该从等待队列中移除的task没有被成功移除。对于runState > SHUTDOWN的情况我们不用担心,线程池在调用相应的改变线程池状态的方法时会处理掉等待队列中的task。唯有SHUTDOWN状态是个特殊点,如果您记忆好的话,应该会记得上面在介绍线程池的状态的时候,对于SHUTDOWN状态,虽然不会接受新的task,但是他会等待队列中待处理的任务都处理完毕,没错,这里既然有一个没有被移除的task,如果你是SHUTDOWN状态的话需要处理掉该task。如果此时workerCount不为0的话,那么该task只要等待就好,早晚会轮到他;但是如果此时workerCount为0的话,那么就需要调用addWorker来专门添加一个worker线程来处理这个漏网之鱼了。

接下来就来回答第二个问题,为啥addWorker的时候Runnable不传递command而是null?答案就在addWorker方法的实现中,在方法实现中会有判断:如果当前是SHUTDOWN状态,那么只有worker的firstTask为null才可以将worker线程添加到线程池中,才能执行该方法链:worker.thread.run()->worker.runWorker()->getTask().run();从而完成这个漏网之鱼

不仅感叹作者思维之缜密!!

原理三:如果Worker线程数大于等于corePoolSize,但小于最大线程容量maxPoolSize,而且等待队里已经满了,那么会新建一个Worker线程放入到线程池中


该原理蕴含在最后一个elseif语句中,首先同样说一下,能够执行到这里的隐含条件:“线程池状态>=SHUTDOWN || workerQueue.offer()返回false,即等待队列已满”

        /**
         * 首先,能够执行到这里的条件是:
         * 1. 线程池的状态 >= SHUTDOWN
         * 2. 线程池的状态为RUNNING,但是workQueue的offer方法为false,即队列已满
         * 
         * 其次,else if返回true的条件,也即最终执行拒绝策略的情况:
         * 1. 线程池的状态 >= SHUTDOWN
         * 2. workerCount >= maxPoolSize
         * 
         * 结合上面两块分析,可以发现如果workQueue是满的导致第二个if中offer方法失败
         * 但是workerCount < maxPoolSize的情况下,addWorker方法会成功增加一个Worker
         * 线程来处理任务
         */
        else if (!addWorker(command, false))
            reject(command);

可以很直观的看到,addWorker方法的副作用就是新创建一个Worker线程并且承载当前的command任务,所以原理三也是正确的。

实际上原理三和原理一是一模一样的,addWorker第二个参数为true说明将workerCount与corePoolSize进行比较,为false说明与maxPoolSize进行比较;同样addWorker失败的场景也是:并发导致线程池的状态>=SHUTDOWN或者并发导致workerCount>=maxPoolSize;但这些都是execute步骤的具体结果,同原理一一样,这里也同样不与原理三的思想相违背。

原理四:如果Worker线程数大于等于maxPoolSize,那么会执行rejecet拒绝策略


原理四同样蕴含在原理三中,正是原理三elseif条件失败的结果,执行reject策略。失败的结果有两种:workerCount>=maxPoolSize或者当前线程池的状态>=SHUTDOWN。

2. addWorker方法

addWorker方法在执行execute的时候在一些场景使用到了,该方法的作用就是创建一个Worker线程,并且交给线程池维护,返回true说明操作成功:

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);

                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;

                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {

                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();

                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

该方法的代码很长,但是可以分成两个部分:

retry双重锁检查

retry是一个嵌套的自旋代码,外层自旋负责检查线程池的状态runState,内层循环负责检查workerCount是否满足限制、并且能够在被允许的情况下更新线程池的ctl的值

retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

首先我们来看看,外层自旋是如何通过线程池的runState来进行判断的:

for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            ...
        }

这个if条件写的太过抽象,我们将它等价的转化一下:

(rs > SHUTDOWN || (rs == SHUTDOWN && firstTask != null) || rs == SHUTDOWN && workQueue.isEmpty())

很明显,如果当前线程池的状态为上面的三个中的一个,那么直接会返回false表示addWorker失败,接下来我们就挨个分析这三个条件。

第一个是rs>SHUTDOWN:这正是第一部分介绍的线程池的状态runState>SHUTDOWN都拒绝接受新的task的来源。

第二个是(rs == SHUTDOWN && firstTask != null),初看大家可能会觉得有些摸不着头脑,但是结合我们上面分析execute方法时的场景就会明白了。在上面我们说过了,SHUTDOWN状态的线程池按理说是不会接受新的task的,而通过addWorker传递的task就表明线程池准备接受该task,所以一旦addWorker最终返回true了,那么就真的表明SHUTDOWN状态下的线程池允许接受新的task;这显然就矛盾了,所以就直接返回false,并不会往下执行给你机会。

注意:对于execute方法传递的Runnable任务,该任务的归宿有三种:被rejecet策略处理;放入等待队列中等待worker处理,虽然这样会出现被空闲的Worker线程立刻拿去处理,但是我们还是将它当做是先进入等待队列中等待;被一个新的Worker线程承载直接处理

第三个就是线程池处于SHUTDOWN状态,同时当前等待队列为空,此时就更不允许addWorker成功了,直接返回false。

看完了外层的自旋锁检查,接下来就看看内层的自旋锁是干什么的:

for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  
                if (runStateOf(c) != rs)
                    continue retry;
            }

内层自旋锁的检查很简单,就是看不同情况下当前worker线程的数量是否达到了上限,值得注意的是最后的一个if代码,这里再次检查就是为了怕别的地方并发操作更改了线程池的runState。

经过了这么一趟折腾,只是知道了线程池可以准许添加新的worker线程,并且已经修改好了线程池的ctl字段,并没有一些实质性的进展;就好比盖房子之前要向有关部门办理很多证明一样,此时还未开工。

新建Worker并交给线程池维护

上面的两个自旋锁进行新建Worker的申请,接下来就是新建Worker线程并且交给线程池处理的代码了:

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {

                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();

                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;

注意,可以看到该方法最终返回的是workerStarted,这就是说明虽然两重自旋锁获取了线程池的准许,但是并不能保证一定能成功,原因还是因为其他地方并发调用了线程池的某些方法导致线程池的状态发生了改变;注意,由于上面的双自旋检查中通过CAS操作更改了线程池的workerCount,也就是本次addWorker操作已经预定了一个Worker的位置,所以不会存在其他地方并发操作导致workerCount的数量突然超出了限制,从而因此本次addWorker的失败的情况。因此,这里的情况一般是由于其他地方调用了shutdown、shutdownNow等方法改变了线程池的runState。

上述代码可以简化成下面的伪代码:

新建Worker;
lock();
    if(符合条件):
        将Worker放入线程池;
        设置标记为true;
unlock();
if(Worker放入线程池成功):
    启动Worker;
if(Worker放入线程池失败):
    恢复线程池的状态ctl中的workerCount

实际上根据上面的伪代码就很容易明白了,首先这里着重给出了加锁和解锁两个动作,这很重要,因为想要将Worker线程放入线程池中,需要检查线程池的runState状态,此时需要获取线程池的全局锁,而能够改变线程池的runState的方法都需要获取线程池的全局锁,所以这里加锁就能保证在将Worker线程添加入线程池的过程中不会发生线程池runState的突变

其次,这里的if条件为:

(rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null))

第一个线程池的runState状态为RUNNING很显然符合线程池的原理;而第二个条件就很有趣了,正是我们在分析execute的原理二的时候说到的,此时线程池的状态为SHUTDOWN,但是此时addWorker调用时传递的Runnable为null,这保证了虽然会新建一个Worker线程,但是这个线程并没有直接承载task任务,而是用来处理等待队列中哪些已经存在的、处于等待状态的task,并不违背SHUTDOWN状态下的线程池不允许接受新task的原则

注意当添加Worker失败之后,finally代码块中会执行一个方法addWorkerFailed():

private void addWorkerFailed(Worker w) {

        // 该方法也需要获取全局锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

这里的decrementWorkerCount方法将双自旋检查中对workerCount的增1操作回滚了。关于该方法的其他代码这里先不做分析。

Worker线程的启动

第一部分介绍Worker这个类的时候提到过,Worker实现了Runnable接口,其被作为一个参数传递给了ThreadFactory的newThread方法,所以其run方法就是thread的run方法的实现,因此这里启动Worker的代码就是简单的一句t.start()

if (workerAdded) {
       t.start();
       workerStarted = true;
 }

3. runWorker方法

runWorker方法是线程池中Worker线程的工作方法,其不停地从等待队列中取出任务处理:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {

            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

很显眼,Worker线程不停地从等待队列中取task就是通过一个自旋实现的;值得注意的是,在自旋代码之前有一个w.unlock(),因此,在没有任务执行期间,一个Worker线程是允许被中断的;而由于得到一个任务之后,worker就加锁了,所以在一个Worker执行任务期间是无法被中断的

while循环中的getTask就是不停的从等待队列中取task,如果队列为空的话,那么该Worker线程会阻塞在getTask()处,并且成为了一个空闲的Worker线程

第一个if语句负责在线程池的runState>=STOP的时候中断Worker内部的线程。

需要说的一句话是,上面有两个方法beforeExecute和afterExecute,线程池并没有实现这两个方法,开发人员可以根据需要实现这两个方法做一些处理。

而如果该方法能从while循环中跳出去,说明getTask()方法返回了null,在后面分析getTask()方法时会知道,返回null说明该线程空闲太多时间,需要被清除掉了。而负责清除Worker线程的方法就是finally块中的processWorkerExit方法,该方法负责将参数中的Worker线程从线程池的容器中移除。

4. getTask方法

该方法负责从等待队列中取出一个task交给空闲的Worker线程处理,其返回null的情况有以下几种:

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                   Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

首先可以看出,这里又是一个无限循环。但是值得注意的一点是:getTask会阻塞并不是因为这个无限循环,而是workQueue的take方法;对于需要考虑超时的情况下workQueue.poll返回null紧接着就会导致getTask返回null从而超时的空闲Worker会被回收;但是对于那些没有超时限制的Worker,当队列为空时他们会阻塞在workQueue.take()方法处

对于大部分情况的代码很简单,唯一需要分析的只有对于空闲超时的处理,其中的timed变量的作用是表示当前是否需要处理空闲超时的问题,如果timed为false,那么即使出现了空闲超时的Worker线程那么也不会去理会。而timed为true的情况有两种:

分析了超时的情况,再来看看超时是怎么产生的。

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
 if (r != null)
    return r;
timedOut = true;

如果没能return成功,说明r为null,说明超时了。

5. shutdown方法

上面很多地方都要进行线程状态runState的判断,而SHUTDOWN状态又是一个很特殊的状态,我们这就看看线程池关闭时的动作有哪些:

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); 
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

可以看到,调用shutdown方法需要获取全局锁,因此addWorker中往线程池中添加Worker线程的步骤不可能与shutdown并发,所以保证了二者的操作是安全的。

其中chechShutdownAccess用来检查各个Worker线程的中thread是否是可访问的。

而advanceRunState方法使用自旋锁+CAS的方式将runState设置为SHUTDOWN

interruptIdleWorkers方法负责将空闲的Worker线程中的thread执行interrupt操作,有意思的是该方法内部对Worker线程是否是空闲状态的判断为执行worker的tryLock()方法,上面在分析runWorker方法是提到,对于有task需要执行的Worker线程,在执行task之前Worker会进行加锁操作,因此可以通过tryLock()方法的返回值是true判断Worker处于工作状态。

onShutdown方法是一个空方法,留给开发者根据需求自己实现。

回收空闲线程的原理

shutdown方法会调用interruptIdleWorkers方法来回收空闲线程:

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;

                // 对于处于工作状态的Worker线程,tryLock会失败
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

代码符合我们上面的描述,但是问题来了,这是怎么回收的呢?因为上述代码只是将所有空闲的Worker的线程执行了interrupt方法,只是将其中断标记为设置为了true,并不影响线程的运行啊?

答案就在getTask()这个方法中,如下代码会抛出中断异常,导致在getTask的自旋锁中重新开始进行判断

        try {
                   workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }

还记得getTask方法返回null的场景中一个是runState为SHUTDOWN而且此时等待队列中没有task,没错,由于执行了shutdown方法,此时线程池的状态已经变为了SHUTDOWN,而空闲Worker之所以空闲,不就是因为此时等待队列为空吗,所以getTask返回null的条件直接成立,而在runWorker中如果getTask返回的是null,那么会执行processWorkerExit方法将Worker移除,绕了一圈shutdown方法是通过getTask返回null来触发processWorkerExit方法移除空闲线程的。

6. shutdownNow

该方法就比较狠了,就连等待队列中的任务也会被舍弃。

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);

            // 这个方法比较狠:
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

该方法的大致流程与shutdown方法一样,只不过这里是将线程池的runState设置为STOP

中断所有线程的原理

最大的不同就在于这里执行的是interruptWorkers方法,该方法负责将所有的Worker中断,即使你是active的,这样符合我们在开篇中介绍线程池不同runState含义时的描述。

    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

由于涉及到中断工作状态的Worker,所以该方法内部调用的是worker的interruptIfStarted方法,而不是调用thread.interrupt()方法。

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }

可见,结合interruptWorkers和interruptIfStarted方法实现将所有Worker线程执行interrupt操作。

而shutdownNow方法中断所有线程的原理也是利用getTask方法返回null的特点,只不过这里所有的Worker线程都被中断了,而且此时线程池的runState为STOP,所以直接满足getTask返回null的第一个条件,所以回收所有Worker的原理为:“Worker线程被中断-getTask中等待队列取出任务的代码发生中断异常-getTask自旋检查时发现runState为STOP直接返回null-触发runWorker中的processWorkerExit方法移除Worker线程”

而最后的drainQueue()方法负责将等待队列中等待的task取出,并最终返回给开发者。

7. Worker的回收

上面讲过,回收Worker是使用的processWorkerExit方法,而在我们的场景中,通常是runWorker方法中的getTask方法返回null时最终会回调processWorkerExit方法,此时传递的第二个参数为false

    final void runWorker(Worker w) {
        ...
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                ...
            }
            completedAbruptly = false;
        } finally {
            // 用来将没有剩余价值的worker从线程池中移除
            processWorkerExit(w, completedAbruptly);
        }
    }

接下来我们就看一看processWorkerExit方法是如何回收Worker线程的:

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

可见,该方法首先做的事情是将该Worker线程从线程池中移除,之后调用了tryTerminate()方法,最后判断是否需要返回;注意,这里并没有维护WorkerCount的值的处理,是因为该处理在getTask方法中return null之前做了。值得提醒的就是最后这一段判断:

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        ...
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

不管是调用shutdown方法还是日常维护Worker线程的数量,这里的min参数在一些情况下会保证Worker线程的数量在corePoolSize。这在平常的场景是必要的,但是如果我们调用了shutdown方法,那么就会产生一些问题,当队列中只剩下一个task时,还有两个Worker线程去getTask请求,都到达了等待队列的take方法处,此时会有一个一直阻塞到这里,那么也就导致被阻塞的Worker永远阻塞,同时也无法被回收了。

没错,针对该问题,解决的方案就在tryTerminate方法中:

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

首先这是一个无限循环,然后我们看看开始的两个if语句是如何判断终止terminate的:

该方法正是通过不停的中断一个空闲的Worker线程来解决上面的阻塞在队列的take方法的Worker线程无法回收的问题的。

接下来我们来看看后面的一堆代码的工作:注意,代码能够执行到这里,隐含了上述三个条件都为false的情况,此时线程池的状态为:runState是SHUTDOWN、等待队列为空、workerCount为0,这种状态明显线程池没有任何任务、也不会有新的任务、同时Worker线程都被回收了,终于到了结束线程池的时候了!!

final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }

第一个if就是更改线程池的ctl状态,然后执行terminated方法,最后更改线程池的状态为TERMINATED表明线程池已经结束了。terminated方法是一个空方法,留给开发者自己根据需求实现。

至此,线程池的生命算是结束了。

上一篇下一篇

猜你喜欢

热点阅读