程序员java进阶干货

ThreadPoolExecutor详解

2017-12-24  本文已影响79人  charming_coder

       ThreadPoolExecutor顾名思义,是一个线程池管理工具类,该类主要提供了任务管理,线程的调度和相关的hook方法来控制线程池的状态。

1.方法说明

任务管理主要方法如下:

public void execute(Runnable command);
public <T> Future<T> submit(Callable<T> task);
public <T> Future<T> submit(Runnable task, T result);
public Future<?> submit(Runnable task);
public void shutdown();
public List<Runnable> shutdownNow();

       上述方法中,execute()和submit()方法在有空闲线程存在的情况下会立即调用该线程执行任务,区别在于execute()方法是忽略任务执行结果的,而submit()方法则可以获取结果。除此之外,ThreadPoolExecutor还提供了shutdown()和shutdownNow()方法用于关闭线程池,区别在于shutdown()方法在调用之后会将任务队列中的任务都执行完毕之后再关闭线程池,而shutdownNow()方法则会直接关闭线程池,并且将任务队列中的任务导出到一个列表中返回。

       除上述用于执行任务的方法外,ThreadPoolExecutor还提供了如下几个hook(钩子)方法:

protected void beforeExecute(Thread t, Runnable r);
protected void afterExecute(Runnable r, Throwable t);
protected void terminated();

       在ThreadPoolExecutor中这几个方法默认都是空方法,beforeExecute()会在每次任务执行之前调用,afterExecute()会在每次任务结束之后调用,terminated()方法则会在线程池被终止时调用。使用这几个方法的方式就是声明一个子类继承ThreadPoolExecutor,并且在子类中重写需要定制的钩子方法,最后在创建线程池时使用该子类实例即可。

2.任务调度

a.相关参数

       对于ThreadPoolExecutor的实例化,其主要有如下几个重要的参数:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
                          TimeUnit unit, BlockingQueue<Runnable> workQueue, 
                          ThreadFactory threadFactory, RejectedExecutionHandler handler);
b.调度策略

       当初始化一个线程池之后,池中是没有任何用户执行任务的活跃线程的,当新的任务到来时,根据配置的参数其主要的执行任务如下:

c.调度策略注意点

3.源码讲解

a.主要属性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 32
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// 00011111 11111111 11111111 11111111

private static final int RUNNING    = -1 << COUNT_BITS; // 11100000 00000000 00000000 00000000
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 00000000 00000000 00000000 00000000
private static final int STOP       =  1 << COUNT_BITS; // 00100000 00000000 00000000 00000000
private static final int TIDYING    =  2 << COUNT_BITS; // 01000000 00000000 00000000 00000000
private static final int TERMINATED =  3 << COUNT_BITS; // 01100000 00000000 00000000 00000000

       由于ThreadPoolExecutor需要管理多种状态,并且还要记录当前执行任务的线程的数量,如果使用多个变量,并发更新时管理将会非常复杂,这里ThreadPoolExecutor则主要使用一个AtomicInteger类型的变量ctl存储所有主要的信息。ctl是一个32位的整形数字,初始值为0,其最高的三位用于存储当前线程池的状态信息,主要有RUNNING,SHUTDOWN,STOP,TIDING和TERMINATED,分别表示运行状态,关闭状态,终止状态,整理状态和结束状态。这几种状态对应的具体数值信息如上述代码所示,这里需要注意的一点是,在ThreadPoolExecutor中,这几种状态在数值上是从小到大依次增大的,并且状态流转也是依次往下的,这就为其判断状态信息提供了比较便利的方式,如当需要判断线程池状态是否处于SHUTDOWN状态时,只需要判断其代表状态位部分的值是否等于SHUTDOWN即可。在ctl中,除了最高三位用于表示状态外,其余位所代表的数值则指定了当前线程池中正在执行任务的线程数。如下是操作ctl属性的相关方法:

private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

private static boolean runStateLessThan(int c, int s) {
  return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
  return c >= s;
}

private static boolean isRunning(int c) {
  return c < SHUTDOWN;
}

private boolean compareAndIncrementWorkerCount(int expect) {
  return ctl.compareAndSet(expect, expect + 1);
}

private boolean compareAndDecrementWorkerCount(int expect) {
  return ctl.compareAndSet(expect, expect - 1);
}
b.主要方法

       对于线程池的execute()和submit()方法,其实在底层submit()方法会将传入的任务封装为一个FutureTask对象,由于FutureTask对象是实现了Runnable接口的,因而其也可以当做一个任务执行,这里就是将封装后的FutureTask对象传递给execute()方法执行的。我们这里则主要讲解execute()方法的实现方式,如下是execute()方法的代码:

public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();

  int c = ctl.get();    // 获取当前线程池状态
  if (workerCountOf(c) < corePoolSize) {
    // 当工作线程数小于核心线程数时,则调用addWorker()方法创建线程并执行任务
    if (addWorker(command, true))
      return;
    c = ctl.get();  // 若添加失败,则更新当前线程池状态
  }
  
  // 执行到此处,则说明线程池中的工作线程要么大于等于核心线程数,要么当前线程池已经被命令关闭了(addWorker方法添加失败的原因),因而这里判断线程池是否为RUNNING状态,是则将任务添加到任务队列中
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    // 添加队列成功后双重验证,确保线程池处于正确状态
    if (! isRunning(recheck) && remove(command))
      reject(command);
    else if (workerCountOf(recheck) == 0)
      addWorker(null, false);   // 若线程池中没有线程,则创建一个新线程执行添加的任务
  } else if (!addWorker(command, false))
    reject(command);    // 线程池至少处于SHUTDOWN状态,拒绝当前任务的执行
}

       在execute()方法中,其首先判断线程池工作线程数是否小于核心线程数,是则创建核心线程执行任务,添加失败或者工作线程数大于等于核心线程数时,则将任务添加到任务队列中,添加成功后会进行双重验证确保当前线程池处于正确的状态,并且确保当前有可用的线程执行新添加的任务。由此可见对于execute()方法的实现,其比较核心的方法是addWorker()方法,如下是addWorker()方法的实现方式:

private boolean addWorker(Runnable firstTask, boolean core) {
  retry:
  for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c); // 获取当前运行状态

    // 判断当前线程池是否至少为SHUTDOWN状态,并且firstTask和任务队列中没有任务,是则直接返回
    if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
      return false;

    for (;;) {
      int wc = workerCountOf(c);
      // 判断是否工作线程数大于可记录的最大线程数,或者工作线程超过了指定的核心线程或者最大线程数
      if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
      // 走到这一步说明当前线程池处于RUNNING状态,或者任务队列存在任务,并且工作线程数不超过
      // 指定的线程数量,那么就增加工作线程数量,成功则继续往下执行,失败则重复上述添加步骤
      if (compareAndIncrementWorkerCount(c))
        break retry;
      c = ctl.get();
      if (runStateOf(c) != rs)
        continue retry;
    }
  }

  // 记录工作线程数的变量已经更新,接下来创建线程执行任务
  boolean workerStarted = false;
  boolean workerAdded = false;
  Worker w = null;
  try {
    w = new Worker(firstTask);  // 创建一个工作者对象
    final Thread t = w.thread;
    if (t != null) {
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
        int rs = runStateOf(ctl.get());

        // 重新检查线程池状态,或者是判断当前是SHUTDOWN状态,而firstTask为空,这说明任务队列此时不为空
        if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
          if (t.isAlive())
            throw new IllegalThreadStateException();
          workers.add(w);   // 将创建的工作者添加到工作者集合中
          int s = workers.size();
          if (s > largestPoolSize)
            largestPoolSize = s;    // 更新已使用的最大线程数
          workerAdded = true;
        }
      } finally {
        mainLock.unlock();
      }
      if (workerAdded) {
        t.start();  // 工作者对象成功创建之后,调用该工作者执行任务
        workerStarted = true;
      }
    }
  } finally {
    if (!workerStarted)
      addWorkerFailed(w);
  }
  return workerStarted;
}

       在addWorker()方法中,其首先检查当前线程池是否处于RUNNING状态,或者处于SHUTDOWN状态,但是任务队列中还存在有任务,那么其就会创建一个新的Worker对象,并且将其添加到工作者对象集合中,然后调用工作者对象所维护的线程执行任务,如下是工作者对象的实现代码:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
  private static final long serialVersionUID = 6138294804551838833L;
  final Thread thread;  // 当前工作者中执行任务的线程
  Runnable firstTask;   // 第一个需要执行的任务
  volatile long completedTasks; // 当前工作者完成的任务数

  Worker(Runnable firstTask) {
    // 默认设置为-1,那么如果不调用当前工作者的run()方法,那么其状态是不会改变的,
    // 其他的线程也无法使用当前工作者执行任务,在run()方法调用的runWorker()方法中会
    // 调用unlock()方法使当前工作者处于正常状态
    setState(-1);
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);   // 使用线程工厂创建线程
  }

  public void run() {
    runWorker(this);    // 使用当前工作者执行任务
  }

  protected boolean isHeldExclusively() {
    return getState() != 0;
  }

  protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
      setExclusiveOwnerThread(Thread.currentThread());
      return true;
    }
    return false;
  }

  protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
  }

  public void lock()        { acquire(1); }
  public boolean tryLock()  { return tryAcquire(1); }
  public void unlock()      { release(1); }
  public boolean isLocked() { return isHeldExclusively(); }

  // 如果当前线程已经在执行任务,那么将其标记为打断状态,待其任务执行完毕则终止任务的执行
  void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
      try {
        t.interrupt();
      } catch (SecurityException ignore) {
      }
    }
  }
}

       在工作者对象中,其主要维护了一个工作者线程,用于执行任务。该工作者对象继承了AbstractQueuedSynchronizer,用于控制当前工作者工作状态的获取,并且其也实现了Runnable接口,将主要任务的执行封装到run()方法中。如下是runWorker()方法的具体实现:

final void runWorker(Worker w) {
  Thread wt = Thread.currentThread();
  Runnable task = w.firstTask;
  w.firstTask = null;
  w.unlock();   // 重置Worker对象的状态
  boolean completedAbruptly = true;
  try {
    // 首先执行工作者线程中的任务,然后循环从任务队列中获取任务执行
    while (task != null || (task = getTask()) != null) {
      w.lock();
      // 检查当前线程池的状态,如果线程池被终止或者线程池终止并且当前线程已被打断
      if ((runStateAtLeast(ctl.get(), STOP) ||
           (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
        wt.interrupt();
      try {
        beforeExecute(wt, task);    // 调用钩子方法进行预处理
        Throwable thrown = null;
        try {
          task.run();   // 执行任务
        } catch (RuntimeException x) {
          thrown = x; throw x;
        } catch (Error x) {
          thrown = x; throw x;
        } catch (Throwable x) {
          thrown = x; throw new Error(x);
        } finally {
          afterExecute(task, thrown);   // 调用钩子方法进行任务完成后的处理工作
        }
      } finally {
        task = null;    // 重置工作者的初始任务
        w.completedTasks++;
        w.unlock();
      }
    }
    completedAbruptly = false;
  } finally {
    processWorkerExit(w, completedAbruptly);
  }
}

       可以看到,在runWorker()方法中,其首先会执行工作者对象的初始化任务,当执行完毕后会通过一个无限循环不断在任务队列中获取任务执行。如下是getTask()方法的源码:

private Runnable getTask() {
  boolean timedOut = false;

  for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // 判断当前线程是否处于STOP状态,或者处于SHUTDOWN状态,并且工作队列是空的,是则不返回任务
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
      decrementWorkerCount();
      return null;
    }

    int wc = workerCountOf(c);
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;    // 是否允许空闲线程过期

    // 工作线程数大于最大允许线程数,或者线程在指定时间内无法从工作队列中获取到新任务,则销毁当前线程
    if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
      if (compareAndDecrementWorkerCount(c))
        return null;
      continue;
    }

    try {
      // 允许核心线程过期或者工作线程数大于corePoolSize时,从任务队列获取任务时会指定等待时间,
      // 否则会一直等待任务队列中新的任务
      Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
      if (r != null)
        return r;
      timedOut = true;
    } catch (InterruptedException retry) {
      timedOut = false;
    }
  }
}

       可以看到,getTask方法首先会判断当前线程池状态是否为STOP状态,或者是SHUTDOWN状态,并且任务队列是空的,是则不返回任务,否则会根据相关参数从任务队列中获取任务执行。

       以上execute()方法的主要实现步骤,在ThreadPoolExecutor中另一个至关重要的方法则是shutdown()方法,以下是shutdown()方法的主要代码:

public void shutdown() {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    checkShutdownAccess();  // 检查对线程状态的控制权限
    advanceRunState(SHUTDOWN);  // 更新当前线程池状态为SHUTDOWN
    interruptIdleWorkers(); // 打断空闲的工作者
    onShutdown();   // 钩子方法,但是没有对外公开,因为该方法只有包访问权限
  } finally {
    mainLock.unlock();
  }
  tryTerminate();   
}

       在shutdown()方法中,其首先检查当前线程是否有修改线程状态的权限,然后将当前线程池的状态修改为SHUTDOWN,接着调用interruptIdleWorkers()方法中断所有处于空闲状态的线程,最后则是调用tryTerminate()方法尝试将当前线程池的状态由SHUTDOWN修改为TERMINATED,这里interruptIdleWorkers()方法最终会调用其重载方法interruptIdleWorkers(boolean)方法,该方法代码如下:

private void interruptIdleWorkers(boolean onlyOne) {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    for (Worker w : workers) {
      Thread t = w.thread;
      if (!t.isInterrupted() && w.tryLock()) {
        try {
          t.interrupt();
        } catch (SecurityException ignore) {
        } finally {
          w.unlock();
        }
      }
      if (onlyOne)
        break;
    }
  } finally {
    mainLock.unlock();
  }
}

       可以看到,该方法会遍历所有的工作者对象,如果其处于空闲状态,则将其终止。对于处于工作状态的线程,由于在shutdown()方法中已经将当前线程池的状态设置为SHUTDOWN,那么工作状态的线程会将任务队列中的任务都执行完毕之后自动销毁。

       本文主要讲解了ThreadPoolExecutor的主要方法,线程池的调度方式,以及其核心功能的实现原理,如本文有任何不当之处,敬请指正,谢谢!

上一篇 下一篇

猜你喜欢

热点阅读