线程 程序员multiThread

ThreadPoolExecutor学习笔记

2017-05-26  本文已影响125人  time_fly

Java有两个线程池类:ThreadPoolExecutor和ScheduledThreadPoolExecutor,且均继承于ExecutorService。Java API提供了Executors工厂类来帮助创建各种线程池。
Java线程池ExecutorService继承树:


ThreadPoolExecutor

ThreadPoolExecutor相关工作流程

1.ThreadPoolExecutor添加任务:


ThreadPoolExecutor 添加任务的流程

2.ThreadPoolExecutor关闭线程池:线程池的关闭分为两种:平缓关闭(shutdown)和立即关闭(shutdownNow)。

构造方法参数讲解

ThreadPoolExecutor提供4个构造函数,最终均会调用该构造函数

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

创建完线程池后,可通过submit或execute方法提交任务

线程池状态

    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
 private static final int CAPACITY   = (1 << COUNT_BITS) - 1;//线程池最大线程数=536870911(2^29-1)

事实上COUNT_BITS =29,而上面的5重线程状态实际上是使用32位中的高3位来表示,低29位存线程数,这样线程池的状态和线程数量就由一个变量存储,即:

AtomicInteger ctl

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static int ctlOf(int rs, int wc) { return rs | wc; }

初始化值为线程数0,状态为RUNNING。ctl变量是整个类的核心,AtomicInteger保证了对这个变量的操作是原子的,保证多线程同步问题,用这个变量保存了两个内容:

以下是关于ctl的一些操作:

/**
 * 这个方法用于取出runState的值 因为CAPACITY值为:00011111111111111111111111111111
 * ~为按位取反操作,则~CAPACITY值为:11100000000000000000000000000000
 * 再同参数做&操作,就将低29位置0了,而高3位还是保持原先的值,也就是runState的值
 * 
 * @param c 该参数为存储runState和workerCount的int值
 * @return runState的值
 */
private static int runStateOf(int c) {
    return c & ~CAPACITY;
}


/**
 * 这个方法用于取出workerCount的值
 * 因为CAPACITY值为:00011111111111111111111111111111,所以&操作将参数的高3位置0了
 * 保留参数的低29位,也就是workerCount的值
 * 
 * @param c  ctl, 存储runState和workerCount的int值
 * @return workerCount的值
 */
private static int workerCountOf(int c) {
    return c & CAPACITY;
}

/**
 * 将runState和workerCount存到同一个int中
 * “|”运算的意思是,假设rs的值是101000,wc的值是000111,则他们位或运算的值为101111
 * 
 * @param rs runState移位过后的值,负责填充返回值的高3位
 * @param wc  workerCount移位过后的值,负责填充返回值的低29位
 * @return 两者或运算过后的值
 */
private static int ctlOf(int rs, int wc) {
    return rs | wc;
}

// 只有RUNNING状态会小于0
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

Worker

内部类Worker是对任务的封装,实现Runnable接口,并继承AbstractQueuedSynchronizer(AQS,队列同步器)实现了一个简单的不可重入(也就是说该锁只能被一个线程获取一次)的互斥锁,因此每个线程实际上关联了一个互斥锁。当线程执行任务时,需要首先获得关联的 Worker 锁,执行完任务之后再释放该锁。

 private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;//当前worker对象关联的线程
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;// 线程创建后的初始任务
        /** Per-thread task counter */
        volatile long completedTasks;// 线程完成的任务数量

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            //禁止中断直到执行了 runWorker 方法,在shutdownNow 中断线程之前,会首先判断 state 是否大于  等于 0
        // 所以这里将 state 设为 -1,可以防止当前线程被中断
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                // 成功获得锁
                return true;
            }
            // 线程进入等待队列
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

Worker继承了AQS,使用AQS来实现独占锁的功能。与ReentrantLock区别在于,它是不允许重入的,而ReentrantLock是允许重入的:

  1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
  2. 如果正在执行任务,则不应该中断线程;
  3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
  4. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
  5. 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。
    所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。

worker中断:

execute和submit

通过 execute 或者 submit 方法都可以向线程池中添加一个任务,submit 会返回一个 Future 对象来获取线程的返回值:

 public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

submit 中只是将 Runnable 对象包装了一下,最终还是调用了 execute 方法。 execute 方法的实现:

  public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
 
        int c = ctl.get();
        // 线程数量少于 corePoolSize,将改任务分配给一个新建的线程
        if (workerCountOf(c) < corePoolSize) {
            // true 表示将当前线程添加为核心线程
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
      //线程数量大于等于 corePoolSize,首先尝试将任务添加到任务队列
      // workQueue.offer 会将任务添加到队列尾部
        if (isRunning(c) && workQueue.offer(command)) {
            // 重新检查状态
            int recheck = ctl.get();
          // 如果发现当前线程池不是处于 Running 状态,就移除之前的任务
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
             // 1. 当前线程池处于 Running 状态,但是工作线程数量为 0,需要创建新的线程
            // 2. 移除任务失败,但是工作线程数量为 0,需要创建新的线程来完成移除失败的任务
                addWorker(null, false);
        }
          //如果workQueue满了,那么这时候可能还没到线程池的maxnum,所以尝试增加一个Worker
          //线程池不是```RUNNING```状态
        else if (!addWorker(command, false))
             // 如果Worker数量到达上限,那么就拒绝此线程
            reject(command);
    }
  1. 再次检查此时的线程池是否还处于RUNNING状态,如果不是的话,将之前插入队列的那个任务移除,然后调用reject(command)拒绝此任务的提交。
  2. 当前线程池处于 RUNNING状态,但我们插入任务到 workQueue 中的同时, 如果此时线程池中的线程都执行完毕并终止了, 在这样的情况下刚刚插入到 workQueue 中的任务就永远不会得到执行了. 为了避免这样的情况, 因此我们由再次检查一下线程池中的线程数, 如果为零, 则调用 addWorker(null, false) 来添加一个线程。

addWorker

private boolean addWorker(Runnable firstTask, boolean core) {
    // 这里一大段的 for 语句, 其实就是判断和处理 core 参数的.
    // 当经过判断, 如果当前的线程大于 corePoolSize 或 maximumPoolSize 时(根据 core 的值来判断), 
    // 则表示不能新建新的 Worker 线程, 此时返回 false.
   // retry 类似于 goto,continue retry 跳转到 retry 定义, 而 break retry 跳出 retry
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 当 core 为真, 那么就判断当前线程是否大于 corePoolSize
            // 当 core 为假, 那么就判断当前线程数是否大于 maximumPoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : ma))
                return false;
            // 使用 CAS 方式将线程数量增加,如果成功就跳出 retry,如果失败,证明有竞争,那么重新到retry。
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
             // 如果线程池运行状态发生了改变就从 retry(外层循环)处重新开始,
            if (runStateOf(c) != rs)
                continue retry;

            // 程序执行到这里说 CAS 没有成功,那么就再次执行 CAS
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 程序用一个 HashSet 存储线程,而 HashSet 不是线程的安全的, 所以将线程加入 HashSet 的过程需要加锁。
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                // 1. rs < SHUTDOWN 说明程序在运行状态
                // 2. rs == SHUTDOWN 说明当前线程处于平缓关闭状态,而 firstTask == null
                // 说明当前创建的线程是为了处理任务队列中剩余的任务(故意传入 null)
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
// 等价于
if(rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()))

runWorker

当创建了线程并成功启动之后,会执行 Worker 的 run 方法,而该方法最终调用了 ThreadPoolExecutor 的 runWorker 方法,并且将自身作为参数传进去了,下面是 runWorker 方法的实现:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 这里将 Worker 中的 state 设为 0,以便其他线程可以获得锁
    // 从而可以中断当前线程
    w.unlock(); // allow interrupts
    // 用来标记线程是正常退出循环还是异常退出
    boolean completedAbruptly = true;
    try {
        // 如果任务不为空,说明是刚创建线程,如果任务为空,则从队列中取任务
        // 如果队列没有任务,线程就会阻塞在这里
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();
            try {
                // 任务执行之前做一些处理,空函数,需要用户定义处理逻辑
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    thrown = x;
                    // 因为 runnable 方法不能抛出 checkedException ,所以这里
                    // 将异常包装成 Error 抛出
                    throw new Error(x);
                } finally {
                    // 任务执行完之后做一些处理,默认空函数
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //处理worker退出的逻辑
        processWorkerExit(w, completedAbruptly);
    }
}

w.unlock();

woker在创建时调用setState(-1),将state设为-1,抑制工作线程的 interrupt 信号, 直到此工作线程正是开始执行 task. 那么在 addWorker() ``中的`` w.unlock() 就是允许 Worker 的 interrupt 信号。unlock()方法最终会调用setState(0)将状态设为0

private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}

if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted())

总结一下runWorker方法的执行过程:

  1. while循环不断地通过getTask()方法获取任务;
  2. getTask()方法从阻塞队列中取任务;
  3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
  4. 调用task.run()执行任务;
  5. 如果task为null则跳出循环,执行processWorkerExit()方法;
  6. runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。

这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给子类来实现。
completedAbruptly变量来表示在执行任务过程中是否出现了异常,在processWorkerExit方法中会对该变量的值进行判断。

getTask

getTask方法用来从阻塞队列中取任务,代码如下:

private Runnable getTask() {
      //主要是判断后面的poll是否要超时
        boolean timedOut = false; // Did the last poll() time out?

        retry: for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 1.rs > SHUTDOWN 所以rs至少等于STOP,这时不再处理队列中的任务
            // 2.rs = SHUTDOWN 所以rs>=STOP肯定不成立,这时还需要处理队列中的任务除非队列为空
            // 这两种情况都会返回null让runWoker退出while循环也就是当前线程结束了
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // 递减workerCount值
                decrementWorkerCount();
                return null;
            }

            // timed 用于判断是否需要对线程进行超时控制
            boolean timed;

            // 1.RUNING状态
            // 2.SHUTDOWN状态,但队列中还有任务需要执行
            for (;;) {
                int wc = workerCountOf(c);

                 // 1. allowCoreThreadTimeOut: 为 true 说明可以对 core 线程进行超时控制
                // 2. wc > corePoolSize: 说明线程池中有非 core 线程
                timed = allowCoreThreadTimeOut || wc > corePoolSize;

                //线程数量不大于 maximumPoolSize 并且没有超时,则退出循环,否则workerCount递减,返回null,结束当前thread
                if (wc <= maximumPoolSize && !(timedOut && timed))
                    break;

                // workerCount递减,结束当前thread
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get(); // Re-read ctl
                // 需要重新检查线程池状态,因为上述操作过程中线程池可能被SHUTDOWN
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
                  // 如果允许超时控制,则执行 poll 方法,该方法响应超时,当 keepAliveTime 时间内
                  // 仍然没有获取到任务,就返回 null。take 方法不响应超时操作,当获取不到任务时会一直等待。
                  // 另外不管 poll 还是 take 方法都会响应中断,如果没有新的任务添加到队列中,会直接抛出 InterruptedException
                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;// 超时
            } catch (InterruptedException retry) {
                timedOut = false;// 线程被中断重试
            }
        }
    }

*什么时候会销毁?当然是runWorker方法执行完之后,也就是Worker中的run方法执行完,由JVM自动回收。

runWorker()getTask()这两个方法完成了任务的获取和阻塞线程的工作,大致流程:

  1. 通过 while 循环不断的从任务队列中获取任务,如果当前任务队列中没有任务,就阻塞线程。
  2. 如果 getTask 返回 null,表明当前线程应该被回收,执行回收线程的逻辑。
  3. 如果成功获取任务,首先判断线程池的状态,根据线程池状态设置当前线程的中断状态
  4. 在执行任务之前做一些预处理(用户实现)
  5. 执行任务
  6. 在执行任务之后做一些后处理(用户实现)

processWorkerExit

getTask方法返回null时,在runWorker方法中会跳出while循环,然后会执行processWorkerExit方法。

   private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1;
    // 如果线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操作,这里就不必再减了。 
        if (completedAbruptly) 
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //统计完成的任务数
            completedTaskCount += w.completedTasks;
            // 从workers中移除,也就表示着从线程池中移除了一个工作线程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        // 根据线程池状态进行判断是否结束线程池
        tryTerminate();

        int c = ctl.get();
        //当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,直接添加一个线程;
        // 线程正常结束, 如果允许对 core 线程进行超时控制,并且任务队列中有任务, 则保证线程数量大于等于 1
        // 如果不允许对 core 进行超时控制,则保证线程数量大于等于 corePoolSize
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

至此,processWorkerExit执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束,如图所示:

tryTerminate

processWorkerExit 中调用了 tryTerminate 方法,该方法根据线程池状态进行判断是否结束线程池。

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
         /*
         * 当前线程池的状态为以下几种情况时,直接返回:
         * 1. RUNNING,因为还在运行中,不能停止;
         * 2. TIDYING或TERMINATED,因为线程池中已经没有正在运行的线程了;
         * 3. SHUTDOWN并且等待队列非空,这时要执行完workQueue中的task;
         */
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;


            // 只能是以下情形会继续下面的逻辑:结束线程池。
            // 1.SHUTDOWN状态,这时不再接受新任务而且任务队列也空了
            // 2.STOP状态,当调用了shutdownNow方法

            if (workerCountOf(c) != 0) { 
              // 如果工作线程数量不为 0,中断线程池中第一个线程
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
            // 将线程状态设为 TIDYING,如果设置不成功说明线程池的状态发生了变化,需要重试
            //如果设置成功,则调用terminated方法
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                      // terminated方法默认什么都不做,留给子类实现
                        terminated();
                    } finally {
                        // 将线程状态设为 TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

tryTerminate ()方法中,如果满足下面两个条件,就将线程池的状态设为TIDYING,然后执行完terminated()后,线程池状态变为TERMINATED:、

tryTerminate ()方法中,当工作线程不为0的时候,回去尝试中断线程池中的一个线程,这样做的主要目的在于防止shutdown ()方法的中断信号丢失。

shutdown ()方法被调用时,会执行interruptIdleWorkers(),此方法会先检查线程是否是空闲状态,如果发现线程不是空闲状态,才会中断线程,中断线程让在任务队列中阻塞的线程醒过来。但是如果在执行interruptIdleWorkers()方法时,线程正在运行,此时并没有被中断;如果线程执行完任务后,然后又去调用了getTask(),这时如果workQueue中没有任务了,调用workQueue.take()时就会一直阻塞。这时该线程便错过了shutdown() 的中断信号,若没有额外的操作,线程会一直处于阻塞的状态。所以每次在工作线程结束时调用tryTerminate方法来尝试中断一个空闲工作线程,避免在队列为空时取任务一直阻塞的情况,弥补了shutdown() 中丢失的信号。

interruptIdleWorkers

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // 首先看当前线程是否已经中断,如果没有中断,就看线程是否处于空闲状态
            // 如果能获得线程关联的 Worker 锁,说明线程处于空闲状态,可以中断
            // 否则说明线程不能中断
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            // 如果 onlyOne 为 true,只尝试中断第一个线程
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

interruptIdleWorkers遍历workers中所有的工作线程,若线程没有被中断tryLock成功,就中断该线程。

shutdown

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 检查当前线程是否有关闭线程池的权限
        checkShutdownAccess();
        // 将线程池状态设为 SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 中断线程,这里最终调用 interruptIdleWorkers(false);
        interruptIdleWorkers();
        // hook 方法,默认为空,让用户在线程池关闭时可以做一些操作
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

shutdownNow

public List < Runnable > shutdownNow() {
    List < Runnable > tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 检查线程是否具有关闭线程池的权限
        checkShutdownAccess();
        // 更改线程状态
        advanceRunState(STOP);
        // 中断线程
        interruptWorkers();
        // 清除任务队列,并将任务返回
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

interruptWorkers

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 不管线程是否空闲都执行中断
        for (Worker w: workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

interruptWorkers() 方法中,只要线程开始了,就对线程执行中断,所以 shutdownNow() 的中断信号不会丢失。drainQueue ()主要作用是清空任务队列,并将队列中剩余的任务返回。

drainQueue

private List <Runnable> drainQueue() {
    BlockingQueue <Runnable> q = workQueue;
    ArrayList <Runnable> taskList = new ArrayList < Runnable > ();
    // 该方法会将阻塞队列中的所有项添加到 taskList 中
    // 然后清空任务队列,该方法是线程安全的
    q.drainTo(taskList);
    if (!q.isEmpty()) {
        // 将 List 转换为 数组,传入的 Runnable[0] 用来说明是转为 Runnable 数组
        for (Runnable r: q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}

通过上面的分析,shutdownNow方法与shutdown方法类似,不同的地方在于:

  1. 设置状态为STOP;
  2. 中断所有工作线程,无论是否是空闲的;
  3. 取出阻塞队列中没有被执行的任务并返回。

shutdownNow()方法执行完之后调用tryTerminate()方法,目的就是使线程池的状态设置为TERMINATED。


以上都是学习一些大神相关源码分析和对照源码阅读的相关学习记录,ThreadPoolExecutor源码有些还是比较晦涩难懂的,一些地方还是理解的不是很透彻。这也是自己第一次尝试写学习笔记,也希望对正在学习了解ThreadPoolExecutor的同学提供一点帮助。

参考文章

【Java 并发】详解 ThreadPoolExecutor
深入理解Java线程池:ThreadPoolExecutor

上一篇下一篇

猜你喜欢

热点阅读