线程池源码剖析

2020-10-24  本文已影响0人  竺旭东

## 前置说明

```

所有的源码基于JDK11.0.2

```

## 如何使用线程池呢?

```java

public class WeChatBlogDemos {

@Test

  public void useThreadPool() throws InterruptedException {

      // 创建线程池

      ExecutorService executorService = Executors.newFixedThreadPool(4);

      // 往线程池提交任务,等待异步执行

      executorService.submit(() -> System.out.println("hello world"));

      // 关闭线程池

      executorService.shutdown();

      // 阻塞 2 秒等待线程池终止完成

      executorService.awaitTermination(2,TimeUnit.SECONDS);

  }

}

```

## 为什么使用线程池呢?

```

1)每个Java线程都会对应一个操作系统的工作线程,频繁的创建和销毁线程会有很大的开销,线程池能提高线程的复用率,避免线程频繁的创建和销毁,

2)线程池空闲时能自动缩小容量,防止消耗过多的系统资源,避免资源浪费。

```

## 当我往线程池里提交一个任务时,发生了什么【ThreadPoolExecutor.execute】?

```

1)外部系统通过 shutdown 或 shutdownNow 显式触发了线程池的关闭流程,任务提交失败

2)线程池处于 RUNNING 状态

    1)线程池的工作线程数 < 核心线程数,则新增工作者线程进行任务处理

    2)线程池的工作线程数 = 核心线程数,往任务队列里提交成功

    3)核心线程数 <= 线程池的工作线程数 < 最大线程数 && 往任务队列里提交失败,则新增工作者线程进行任务处理

    4)线程池的工作线程数 = 最大线程数 && 往任务队列里提交失败,通过拒绝策略处理任务

```

```java

public class ThreadPoolExecutor extends AbstractExecutorService {

/**

    * 用于创建工作者线程的线程工厂

*/

private volatile ThreadFactory threadFactory;

/**

    * 线程池饱和或关闭时,用于拒绝任务的拒绝执行处理器

*/

private volatile RejectedExecutionHandler handler;

/**

    * 空闲工作线程等待任务的最大纳秒数

*/

private volatile long keepAliveTime;

/**

    * true:核心工作者线程空闲时也会推出

    * false:核心工作者线程空闲时不退出

*/

private volatile boolean allowCoreThreadTimeOut;

/**

    * 核心工作线程数,限制数量为2^29-1

*/

private volatile int corePoolSize;

/**

    * 最大工作线程数,限制数量为2^29-1

*/

private volatile int maximumPoolSize;

/**

    * 默认的拒绝执行处理器

*/

private static final RejectedExecutionHandler defaultHandler =

        new AbortPolicy();

/**

    * 核心工作者线程数

*/

private volatile int corePoolSize;

/**

    * 最大工作者线程数

*/

private volatile int maximumPoolSize;

/**

    * 任务队列,

    * 1)如果工作者线程允许过期,则使用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 读取任务

    * 2)否则使用 workQueue.take() 读取任务

*/

  private final BlockingQueue<Runnable>workQueue;

/**

    * 添加工作者线程、关闭线程池、读取统计数据等操作中使用的互斥锁

*/

  private final ReentrantLock mainLock = new ReentrantLock();

/**

    * 线程池中的工作者线程集合,只有在持有 mainLock 锁时才能访问

*/

  private final HashSet<Worker> workers = new HashSet<>();

/**

    * 跟踪线程池同时存在的最大工作线程数

* Accessed only under mainLock.

*/

private int largestPoolSize;

/**

    * 往线程池提交一个 Runnable 任务,

    * 如果线程池已满或线程池关闭则,该任务会交给拒绝处理器处理。

*/

@Override

    public void execute(Runnable command) {

        if (command == null) {

            throw new NullPointerException();

        }

        // 读取控制变量

        int c = ctl.get();

        // 1)线程池工作线程数 < 核心线程数

        if (ThreadPoolExecutor.workerCountOf(c) < corePoolSize) {

            // 尝试创建一个新的工作者线程来处理这个任务

            if (addWorker(command, true)) {

                // 创建成功则直接返回

return;

            }

            // 创建失败,则重新读取控制变量

            c = ctl.get();

        }

/**

        * 大前提:

        * 1)外部触发了线程池停止

        * 2)工作者线程本身创建失败了

        * 3)当前工作者线程数 >= 核心工作者线程

*

        * 线程池处于 RUNNING 状态 && 尝试向任务队列中提交任务

*/

        if (ThreadPoolExecutor.isRunning(c) && workQueue.offer(command)) {

            final int recheck = ctl.get();

/**

            * 任务成功提交到任务队列

            * 1)如果线程池正在停止,则尝试帮助终止线程池,并将任务从工作队列中移除

            * 2)线程池处于 RUNNING 状态,但是没有可用的工作者线程了,则尝试添加一个新的工作者线程

*/

            if (!ThreadPoolExecutor.isRunning(recheck) && remove(command)) {

                // 执行拒绝处理器

                reject(command);

            } else if (ThreadPoolExecutor.workerCountOf(recheck) == 0) {

                // 尝试添加一个新的工作者线程

                addWorker(null, false);

            }

}

/**

        * 大前提:

        * 1)外部触发了线程池停止

        * 2)线程池处于 RUNNING 状态 && 当前工作者线程数 >= 核心工作者线程 && 任务往队列中提交失败了【如队列已满】

*

        * 尝试新增一个工作者来处理任务

*/

        else if (!addWorker(command, false)) {

/**

            * 最大可能性

            * 1)外部触发了线程池停止

            * 2)线程池处于 RUNNING 状态 && 工作者线程数>=  maximumPoolSize

            * 执行拒绝策略

*/

            reject(command);

        }

}

/**

    * 读取线程池的工作线程数

*/

    private static int workerCountOf(int c)  { return c & ThreadPoolExecutor.COUNT_MASK; }

/**

    * 尝试增加一个核心工作者线程来处理这个任务

    * 什么时候会新增失败?

    * 1)线程池状态在 STOP 及以上

    * 2)线程池处于 SHUTDOWN 状态并且提交的任务不为空

    * 3)线程池处于 SHUTDOWN,提交的任务为空,并且工作队列也为空

    * 4)core=true:工作者线程数 >= 核心线程数【corePoolSize】

    * 5)core=false: 工作者线程数 >= 最大线程数【maximumPoolSize】

    * 6)线程池本身原因工作者线程启动失败

*/

  private boolean addWorker(Runnable firstTask, boolean core) {

retry:

      for (int c = ctl.get(); ; ) {

/**

          * 外部系统通过 shutdown 或 shutdownNow 显式触发了线程池的关闭流程

          * 1)线程池状态在 STOP 及以上

            * 2)线程池处于 SHUTDOWN 状态并且提交的任务不为空

            * 3)线程池处于 SHUTDOWN,提交的任务为空,并且工作队列也为空

*/

        if (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.SHUTDOWN)

              && (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.STOP)

|| firstTask != null

              || workQueue.isEmpty())) {

            // 不允许创建新的工作者线程

return false;

        }

        for (; ; ) {

/**

            * 1)工作者线程数已经 >= 核心线程数【任务队列未满时】

            * 2)工作者线程数已经 >= 最大线程数【任务队列已满时】

*/

            if (ThreadPoolExecutor.workerCountOf(c)

                  >= ((core ? corePoolSize : maximumPoolSize) & ThreadPoolExecutor.COUNT_MASK)) {

                    // 不允许创建新的工作者线程

return false;

            }

            // 尝试递增工作者线程数

            if (compareAndIncrementWorkerCount(c)) {

              // 工作者线程数递增成功,退出循环

break retry;

            }

            // 由于并发问题,其他线程优先递增了计数值,则重新读取计数值并重试

            c = ctl.get();

            // 线程池正在关闭,则重新进入循环后将直接退出

            if (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.SHUTDOWN)) {

continue retry;

// else CAS failed due to workerCount change; retry inner loop

            }

}

}

      // 1)阶段一:工作者线程是否已经添加到 workers 集合中

boolean workerAdded = false;

      // 2)阶段二:工作者线程是否成功启动

boolean workerStarted = false;

Worker w = null;

      try {

        // 创建工作者线程

        w = new Worker(firstTask);

        // 读取线程对象

final Thread t = w.thread;

        if (t != null) {

final ReentrantLock mainLock = this.mainLock;

            // 占用锁

            mainLock.lock();

            try {

/**

* Recheck while holding lock. Back out on ThreadFactory failure or if shut down before lock acquired.

                * 读取控制变量再次进行校验

*/

              final int c = ctl.get();

/**

                * 1)线程池处于 RUNNING 状态

                * 2)线程池处于 SHUTDOWN 状态 && 提交任务为null

*/

              if (ThreadPoolExecutor.isRunning(c)||

                    ThreadPoolExecutor.runStateLessThan(c, ThreadPoolExecutor.STOP) && firstTask == null) {

                  // 检测工作者线程是否异常启动了

                  if (t.isAlive()) {

                    throw new IllegalThreadStateException();

                  }

                  // 将工作者线程添加到集合中

                  workers.add(w);

                  // 尝试记录最大并发工作者线程数

                  final int s = workers.size();

                  if (s > largestPoolSize) {

largestPoolSize = s;

                  }

                  // 工作者线程添加到 workers 成功

workerAdded = true;

              }

            } finally {

              mainLock.unlock();

            }

            // 如果添加成功,则启动工作者线程

            if (workerAdded) {

              t.start();

              // 工作者线程启动成功

workerStarted = true;

            }

}

      } finally {

        // 如果工作者线程启动失败,则进行回退和清理

        if (!workerStarted) {

            addWorkerFailed(w);

        }

}

return workerStarted;

  }

    // 运行状态 c 大于等于指定状态s

    private static boolean runStateAtLeast(int c, int s) {

return c >= s;

    }

/**

    * 尝试原子的将工作者线程数+1

*/

  private boolean compareAndIncrementWorkerCount(int expect) {

      return ctl.compareAndSet(expect, expect + 1);

  }

/**

    * 线程池是否在运行

*/

  private static boolean isRunning(int c) {

return c < ThreadPoolExecutor.SHUTDOWN;

  }

  // 运行状态 c 小于指定状态s

  private static boolean runStateLessThan(int c, int s) {

return c < s;

  }

  private void addWorkerFailed(Worker w) {

final ReentrantLock mainLock = this.mainLock;

      mainLock.lock();

      try {

        // 1)从 workers 集合中移除工作者w

        if (w != null) {

            workers.remove(w);

        }

        // 递减总工作者线程数

        decrementWorkerCount();

        // 尝试停止线程池

        tryTerminate();

      } finally {

        mainLock.unlock();

      }

}

/**

    * 将工作者线程总数递减1

*/

  private void decrementWorkerCount() {

      ctl.addAndGet(-1);

  }

/**

    * 将目标任务从队列中移除,并返回移除结果

*/

  public boolean remove(Runnable task) {

      final boolean removed = workQueue.remove(task);

      // 尝试终止线程池

      tryTerminate(); // In case SHUTDOWN and now empty

      // 返回移除结果

return removed;

  }

/**

    * 使用指定的拒绝执行处理器来处理该任务

*/

  final void reject(Runnable command) {

      handler.rejectedExecution(command, this);

  }

}

```

## 工作者线程是如何工作的呢?

```

1)如果初始化任务不为空,则先执行它

2)从任务队列中循环拉取任务

    1)允许当前工作者线程超时退出:则通过 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 方式尝试在 keepAliveTime 纳秒内获取任务

    2)当前工作者线程数 <= corePoolSize 并且不允许超时退出:通过 workQueue.take() 阻塞读取任务

3)如果拉取到了任务,则执行它,并循环步骤2

4)任务拉取失败、阻塞时被中断、任务执行一次,则执行工作者退出流程。

```

```java

public class ThreadPoolExecutor extends AbstractExecutorService {

/**

    * 工作者线程的核心循环,重复的从任务队列中读取任务并执行。

*/

  final void runWorker(Worker w) {

      // 读取当前线程

      final Thread wt = Thread.currentThread();

      // 读取第一个任务

Runnable task = w.firstTask;

      // 清理

w.firstTask = null;

      w.unlock(); // 允许中断

/**

      * 是否异常退出

      * 1)前置钩子函数抛出异常

      * 2)任务执行时抛出异常

      * 3)后置钩子函数抛出异常

*/

boolean completedAbruptly = true;

      try {

        // 1)尝试从工作队列中读取任务

        while (task != null || (task = getTask()) != null) {

            w.lock();

/**

* If pool is stopping, ensure thread is interrupted;

* if not, ensure thread is not interrupted.

* This requires a recheck in second case to deal with shutdownNow race while clearing interrupt

*/

            if ((ThreadPoolExecutor.runStateAtLeast(ctl.get(), ThreadPoolExecutor.STOP)||

                  Thread.interrupted()&&

                        ThreadPoolExecutor.runStateAtLeast(ctl.get(), ThreadPoolExecutor.STOP))&&

                  !wt.isInterrupted()) {

              wt.interrupt();

            }

            try {

/**

                * 线程池钩子函数,在每个任务执行之前触发

*/

              beforeExecute(wt, task);

              try {

                  task.run();

/**

                  * 线程池钩子函数,在每个任务执行之后或执行异常时触发

*/

                  afterExecute(task, null);

              } catch (final Throwable ex) {

                  afterExecute(task, ex);

throw ex;

              }

            } finally {

              // 将当前任务置空

task = null;

              // 递增累积完成任务数,包括正常完成和异常完成

w.completedTasks++;

              w.unlock();

            }

}

        // 标记是正常完成任务

completedAbruptly = false;

      } finally {

/**

          * 1)completedAbruptly=false:工作线程拉取不到任务正常退出

          * 2)completedAbruptly=true:工作线程执行任务时异常退出,包括前置钩子、核心 run 方法、后置钩子

*/

        processWorkerExit(w, completedAbruptly);

      }

}

  private Runnable getTask() {

      // 上次拉取任务超时了吗?

boolean timedOut = false;

      for (; ; ) {

        // 读取控制变量

        final int c = ctl.get();

/**

          * 1)线程池正在停止,状态>= STOP

          * 2)线程池状态为 SHUTDOWN,并且任务队列为空

*/

        if (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.SHUTDOWN)

              && (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.STOP) || workQueue.isEmpty())) {

            decrementWorkerCount();

return null;

        }

/**

          * 1)线程池处于 RUNNING 状态

          * 2)线程池处于 SHUTDOWN 状态,但是 workQueue 还未排空

*/

        // 计算当前工作者线程数

        final int wc = ThreadPoolExecutor.workerCountOf(c);

/**

          * 是否允许当前工作者线程退出

          * 1)allowCoreThreadTimeOut=true:允许核心工作者线程退出

          * 2)allowCoreThreadTimeOut=false:当前工作者线程数 > 核心工作者线程数

*/

final boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

/**

          * 1)外部系统通过 setMaximumPoolSize 调小了最大线程数 && 当前工作线程数溢出了

          * 2)允许当前线程过期 && 上次拉取未得到任务

*    &&

          * 1)工作者线程数> 1

          * 2)工作者线程数=1 && 任务队列为空

*

          * 什么情况下线程池的所有工作者线程都会退出?

          * 1)allowCoreThreadTimeOut=true && workQueue 为空

*/

        if ((wc > maximumPoolSize || timed && timedOut)

              && (wc > 1 || workQueue.isEmpty())) {

            // 拉取任务失败就直接递减工作者线程数了

            if (compareAndDecrementWorkerCount(c)) {

              // 返回 null 以终止该工作者线程

return null;

            }

            // 出现竞争,重新拉取任务

continue;

        }

        try {

/**

            * 1)允许当前工作者线程退出:则通过 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 方式尝试在 keepAliveTime 纳秒内获取任务

            * 2)当前工作者线程数 <= corePoolSize:通过 workQueue.take() 阻塞读取任务

*/

final Runnable r = timed ?

                  workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS):

                  workQueue.take();

            if (r != null) {

              // 成功获取到一个任务

return r;

            }

            // 拉取超时了

timedOut = true;

        } catch (final InterruptedException retry) {

            // 当前线程被中断,则继续循环拉取任务

timedOut = false;

        }

}

}

  protected void beforeExecute(Thread t, Runnable r) {

}

  protected void afterExecute(Runnable r, Throwable t) {

}

  private void processWorkerExit(Worker w, boolean completedAbruptly) {

      // 如果是异常退出,则递减工作者线程数

      if (completedAbruptly) {

        decrementWorkerCount();

      }

final ReentrantLock mainLock = this.mainLock;

      mainLock.lock();

      try {

        // 1)将当前工作者 w 完成的任务数累加到线程池已完成任务数中

completedTaskCount += w.completedTasks;

        // 2)从工作者集合中删除该工作者

        workers.remove(w);

      } finally {

        mainLock.unlock();

      }

      // 尝试终止线程池

      tryTerminate();

      final int c = ctl.get();

      // 线程池处于 RUNNING 或 SHUTDOWN 状态

      if (ThreadPoolExecutor.runStateLessThan(c, ThreadPoolExecutor.STOP)) {

        // 1)如果不是异常退出

        if (!completedAbruptly) {

/**

            * 计算需要保留的最小工作者线程数,

            * 1)如果允许核心工作者线程退出则为 0;

            * 2)否则为corePoolSize

*/

int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

            // 任务队列不为空,则至少保留一个工作者线程

            if (min == 0 && !workQueue.isEmpty()) {

min = 1;

            }

            // 已有工作者线程 > 期望工作者线程数,则直接返回

            if (ThreadPoolExecutor.workerCountOf(c) >= min) {

return; // replacement not needed

            }

}

        // 2)异常退出则尝试新增工作者线程

        addWorker(null, false);

      }

}

private final class Worker

extends AbstractQueuedSynchronizer

        implements Runnable {

private static final long serialVersionUID = 6138294804551838833L;

      /** Worker 驻留线程,创建失败时为null */

final Thread thread;

      /** 第一个运行的任务,可能为null */

Runnable firstTask;

      /** 每个驻留线程完成的任务数,在线程退出时会累加到线程池中*/

volatile long completedTasks;

/**

      * 基于指定的初始任务和线程工厂创建工作者线程

*/

      Worker(Runnable firstTask) {

        // 禁止中断,直到工作者线程运行为止

        setState(-1); // inhibit interrupts until runWorker

this.firstTask = firstTask;

/**

          * Worker 本身实现了 Runnable 并且重写了 run 方法,

          * 基于 Worker 创建驻留线程,并启动运行。

*/

        thread = getThreadFactory().newThread(this);

      }

      /** 运行工作者线程*/

@Override

      public void run() {

        runWorker(this);

      }

}

}

```

## 如何停止线程池呢?

```

触发线程池关闭之后,提交到线程池的任务会被直接拒绝

1)通过 shutdown 停止线程池时,线程池的状态会递进到 SHUTDOWN,并且活跃工作者线程还能处理剩余任务。

2)通过 shutdownNow 停止线程池时,线程池的状态会递进到 STOP,并且活跃工作者线程不能处理剩余任务,拉取到的任务是 null。

```

```java

public class ThreadPoolExecutor extends AbstractExecutorService {

@Override

  public void shutdown() {

final ReentrantLock mainLock = this.mainLock;

      mainLock.lock();

      try {

        // 当前线程是否允许关闭线程池

        checkShutdownAccess();

        // 将线程池状态更新为SHUTDOWN

        advanceRunState(ThreadPoolExecutor.SHUTDOWN);

        // 中断所有空闲工作者,正在处理任务的工作者线程可以继续运行

        interruptIdleWorkers();

        // 执行钩子函数

        onShutdown(); // hook for ScheduledThreadPoolExecutor

      } finally {

        mainLock.unlock();

      }

      // 尝试终止线程池

      tryTerminate();

  }

  private void checkShutdownAccess() {

// assert mainLock.isHeldByCurrentThread();

      final SecurityManager security = System.getSecurityManager();

      if (security != null) {

        security.checkPermission(ThreadPoolExecutor.shutdownPerm);

        for (final Worker w : workers) {

            security.checkAccess(w.thread);

        }

}

}

/**

    * 将线程池状态设置为目标状态targetState

*/

  private void advanceRunState(int targetState) {

      for (; ; ) {

        final int c = ctl.get();

        if (ThreadPoolExecutor.runStateAtLeast(c, targetState)||

              // CAS 更新线程池状态

              ctl.compareAndSet(c, ThreadPoolExecutor.ctlOf(targetState, ThreadPoolExecutor.workerCountOf(c)))) {

break;

        }

}

}

/**

    * 中断所有阻塞拉取任务的空闲线程

*/

  private void interruptIdleWorkers() {

      interruptIdleWorkers(false);

  }

  private void interruptIdleWorkers(boolean atMostOne) {

final ReentrantLock mainLock = this.mainLock;

      mainLock.lock();

      try {

        // 遍历所有的工作者

        for (final Worker w : workers) {

            // 读取工作者驻留线程

final Thread t = w.thread;

/**

            * 当前线程还未被设置中断标志,则尝试锁定此 Worker【

            * 如果此 worker 已经获取到了任务正在执行,则锁已经被占用无法获取】

*/

            if (!t.isInterrupted() && w.tryLock()) {

              try {

                  // 中断阻塞等待任务的空闲线程

                  t.interrupt();

              } catch (final SecurityException ignore) {

              } finally {

                  w.unlock();

              }

}

            // 随机获取的第一个线程正在处理任务 && atMostOne=true,此时一个线程都不会被中断退出

            if (atMostOne) {

break;

            }

}

      } finally {

        mainLock.unlock();

      }

}

  void onShutdown() {

}

  final void tryTerminate() {

      for (; ; ) {

        final int c = ctl.get();

/**

          * 1)线程池处于 RUNNING 状态

          * 2)线程池处于 TIDYING、TERMINATED 状态

          * 3)线程池处于 SHUTDOWN 状态 && 任务队列不为空

          * 不允许终止

*/

        if (ThreadPoolExecutor.isRunning(c)||

              ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.TIDYING)||

              ThreadPoolExecutor.runStateLessThan(c, ThreadPoolExecutor.STOP) && !workQueue.isEmpty()) {

return;

        }

/**

          * 工作者线程数不为 0,尝试中断最多一个空闲工作者线程

*/

        if (ThreadPoolExecutor.workerCountOf(c) != 0) {// Eligible to terminate

            interruptIdleWorkers(ThreadPoolExecutor.ONLY_ONE);

return;

        }

        // 所有的工作者线程都已退出,或最后一个工作者线程正在退出

final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            // 递进状态为TIDYING

            if (ctl.compareAndSet(c, ThreadPoolExecutor.ctlOf(ThreadPoolExecutor.TIDYING, 0))) {

              try {

                  // 执行线程池的终止钩子函数

                  terminated();

              } finally {

                  // 递进状态为TERMINATED

                  ctl.set(ThreadPoolExecutor.ctlOf(ThreadPoolExecutor.TERMINATED, 0));

                  // 唤醒通过 awaitTermination 阻塞的所有线程

                  termination.signalAll();

              }

return;

            }

        } finally {

            mainLock.unlock();

        }

// else retry on failed CAS

      }

}

  protected void terminated() {

}

@Override

  public List<Runnable> shutdownNow() {

      List<Runnable>tasks;

final ReentrantLock mainLock = this.mainLock;

      mainLock.lock();

      try {

        // 当前线程是否允许关闭线程池

        checkShutdownAccess();

        // 将线程池状态更新为STOP

        advanceRunState(ThreadPoolExecutor.STOP);

        // 强制中断所有工作者线程,包括正在执行任务的线程

        interruptWorkers();

        // 读取所有未完成的任务

        tasks = drainQueue();

      } finally {

        mainLock.unlock();

      }

      // 尝试终止线程池

      tryTerminate();

      // 返回所有未完成的任务

return tasks;

  }

  private List<Runnable> drainQueue() {

      final BlockingQueue<Runnable>q = workQueue;

      final ArrayList<Runnable> taskList = new ArrayList<>();

      q.drainTo(taskList);

      if (!q.isEmpty()) {

        for (final Runnable r : q.toArray(new Runnable[0])) {

            if (q.remove(r)) {

              taskList.add(r);

            }

}

}

return taskList;

  }

}

```

## 等待线程池完全退出

```

目标线程会最多阻塞 unit.toNanos(timeout) 时间来等待线程池完全销毁。

```

```java

public class ThreadPoolExecutor extends AbstractExecutorService {

@Override

  public boolean awaitTermination(long timeout, TimeUnit unit)

        throws InterruptedException {

      // 计算超时时间

      long nanos = unit.toNanos(timeout);

final ReentrantLock mainLock = this.mainLock;

      mainLock.lock();

      try {

        // 如果线程池还未递进到 TERMINATED 状态【线程池还未退出】

        while (ThreadPoolExecutor.runStateLessThan(ctl.get(), ThreadPoolExecutor.TERMINATED)) {

            if (nanos <= 0L) {

return false;

            }

            // 阻塞等待指定的纳秒数

            nanos = termination.awaitNanos(nanos);

        }

return true;

      } finally {

        mainLock.unlock();

      }

}

}

```

## 线程池的逻辑最大线程数是多少呢?

```

线程池的线程数保存在 ctl 控制变量的低 29 位中,因此线程池的逻辑最大线程数为 2^29-1。

```

```java

public class ThreadPoolExecutor extends AbstractExecutorService {

/**

    * 控制变量低 29 位为线程池的工作线程数

    * 控制变量高 3 位为线程池的生命周期状态

*/

    private final AtomicInteger ctl = new AtomicInteger(ThreadPoolExecutor.ctlOf(ThreadPoolExecutor.RUNNING, 0));

/**

    * 工作线程数所占的位数,为 29 位,最大工作线程数为 2^29-1 个

*/

private static final int COUNT_BITS = Integer.SIZE - 3;

/**

    * 工作线程数掩码,低位 29 个1

*/

    private static final int COUNT_MASK = (1 << COUNT_BITS)- 1;

/**

    * 读取线程池的工作线程数

*/

    private static int workerCountOf(int c)  { return c & ThreadPoolExecutor.COUNT_MASK; }

}

```

## 线程池内置了哪些拒绝策略呢?

```

1)CallerRunsPolicy:线程池未关闭,则交给提交任务的线程自己执行

2)AbortPolicy:抛出 RejectedExecutionException 异常,默认拒绝策略。

3)DiscardPolicy:静默丢弃

4)DiscardOldestPolicy:静默丢弃最老的任务后,重新提交到线程池

```

```java

public class ThreadPoolExecutor extends AbstractExecutorService {

  public static class CallerRunsPolicy implements RejectedExecutionHandler {

        public CallerRunsPolicy() { }

@Override

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

            if (!e.isShutdown()) {

                // 线程池未关闭,则交给提交任务的线程自己执行

                r.run();

            }

}

}

    public static class AbortPolicy implements RejectedExecutionHandler {

        public AbortPolicy() { }

@Override

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

            // 抛出异常

            throw new RejectedExecutionException("Task " + r.toString()+

" rejected from " +

                    e.toString());

        }

}

    public static class DiscardPolicy implements RejectedExecutionHandler {

        public DiscardPolicy() { }

@Override

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

            // 静默丢弃

        }

}

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {

        public DiscardOldestPolicy() { }

@Override

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

            if (!e.isShutdown()) {

                // 丢弃最老的任务,再尝试提交到线程池中

                e.getQueue().poll();

                e.execute(r);

            }

}

}

}

```

## 内置线程池有哪些?

```

1)newFixedThreadPool:固定工作者线程池

2)newCachedThreadPool:一个任务一个工作者线程池,无法存储任务

3)newSingleThreadExecutor:单工作者线程池

```

```

public class Executors {

public static ExecutorService newFixedThreadPool(int nThreads) {

return new ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue());

}

public static ExecutorService newCachedThreadPool() {

return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

60L, TimeUnit.SECONDS,

new SynchronousQueue());

}

public static ExecutorService newSingleThreadExecutor() {

return new FinalizableDelegatedExecutorService

(new ThreadPoolExecutor(1, 1,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue()));

}

}

```

## 线程池有哪几种运行状态呢?

```

线程池有 5 种运行状态,状态值是顺序递增的,分别为 RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED。

RUNNING: 接受新任务,同时处理工作队列中的任务

SHUTDOWN: 不接受新任务,但是能处理工作队列中的任务

STOP: 不接受新任务,不处理工作队列中的任务,并且强制中断正在运行的工作者线程

TIDYING: 所有的工作者线程都已经停止,将运行 terminated() 钩子函数

TERMINATED: terminated() 钩子函数运行完毕,线程池退出

RUNNING -> SHUTDOWN【调用 shutdown 时触发状态流转】

RUNNING -> STOP【调用 shutdownNow 时触发状态流转】

SHUTDOWN -> TIDYING【队列和线程池都为空时】

STOP -> TIDYING【队列和线程池都为空时】

TIDYING -> TERMINATED【terminated 方法完成执行时】

```

```java

public class ThreadPoolExecutor extends AbstractExecutorService {

/**

    * 线程池运行状态通过 int 高 3 位进行区分

* 11100000000000000000000000000000

*/

private static final int RUNNING    = -1 << COUNT_BITS;

// 00000000000000000000000000000000

private static final int SHUTDOWN  =  0 << COUNT_BITS;

// 00100000000000000000000000000000

private static final int STOP      =  1 << COUNT_BITS;

// 01000000000000000000000000000000

private static final int TIDYING    =  2 << COUNT_BITS;

// 01100000000000000000000000000000

private static final int TERMINATED =  3 << COUNT_BITS;

}

```

## 什么情况下线程池的所有工作者线程都会退出?

```

allowCoreThreadTimeOut=true && workQueue.isEmpty()

```

## 工作者线程拉取任务时被中断了会发生什么?

```

会继续拉取任务?

```

## 线程池停止过程中会中断空闲工作者线程,如何保证不会伤及运行中的工作者线程?

```

工作者线程本身继承了 AbstractQueuedSynchronizer,是一个互斥锁,当其获取到任务时会对自己进行锁定,

线程池中断空闲线程过程中由于无法获取锁,此工作者线程不会被中断。

```

## shutdown 和 shutdownNow 的区别是什么?

```

1)shutdown 之后:线程池的工作线程能完成正在处理的任务,也能拉取到存留的任务,任务队列中的任务会被执行完毕。

2)shutdownNow 之后:线程池的工作线程只能完成正在处理的任务,但是无法拉取到存留的任务,存留任务会通过方法返回。

```

## 使用线程池的最佳实践

```

1)自定义 ThreadFactory 并在其创建线程时提供一个有效的名称,用于后续的跟踪分析。

2)限定任务队列的长度,避免突发流量导致系统 OOM。

3)线程池创建完毕可以通过预先启动工作者线程来缩短响应时间

    prestartCoreThread:预先启动一个工作者线程

    prestartAllCoreThreads:预先启动所有的核心工作者线程

```

上一篇下一篇

猜你喜欢

热点阅读