Android开发探索工作相关

Java多线程(三)线程池

2018-04-01  本文已影响512人  闽越布衣

为什么使用线程池

    当我们在使用线程时,如果每次需要一个线程时都去创建一个线程,这样实现起来很简单,但是会有一个问题:当并发线程数过多时,并且每个线程都是执行一个时间很短的任务就结束时,这样创建和销毁线程的时间要比花在实际处理任务的时间要长的多,在一个JVM里创建太多的线程可能会导致由于系统过度消耗内存或切换过度导致系统资源不足而导致OOM问题;
    线程池为线程生命周期开销问题和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。

使用线程池的优势与存在的风险

优势

风险

死锁

    线程池引入了一种死锁可能,当所有池线程都在等待已阻塞的等待队列中另一任务的执行结果,但这一任务却因为没有未被占用的线程而不能运行,这种情况就导致了死锁。如:当线程池被用来实现涉及许多交互对象的模拟,被模拟的对象可以相互发送查询,这些查询接下来作为排队的任务执行,查询对象又同步等待着响应时,会发生这种情况。

资源不足

    如果线程池太大,那么被这些线程消耗的资源可能严重地影响系统性能。在线程之间进行切换将会浪费时间,而且使用超出比实际需要的线程可能会引起资源匮乏问题,因为池线程正在消耗一些资源,而这些资源可能会被其它任务更有效地利用。
    线程池的大小需要根据系统运行的软硬件环境以及应用本身的特点来决定。 一般来说, 如果代码结构合理的话, 线程数目与 CPU数量相适合即可。 如果线程运行时可能出现阻塞现象, 可相应增加池的大小; 如有必要可采用自适应算法来动态调整线程池的大小, 以提高 CPU 的有效利用率和系统的整体性能。

并发错误

    线程池和其它排队机制依靠使用 wait() 和 notify() 方法,要特别注意并发错误, 要从逻辑上保证程序的正确性, 注意避免死锁现象的发生。

线程泄漏

    线程池中一个严重的风险是线程泄漏,当从池中除去一个线程以执行一项任务,而在任务完成后该线程却没有返回池时,会发生这种情况。发生线程泄漏的一种情形出现在任务抛出一个 RuntimeException 或一个 Error 时。如果线程池类没有捕捉到它们,那么线程只会退出而线程池的大小将会永久减少一个。当这种情况发生的次数足够多时,线程池最终为空,系统将停止,因为没有可用的线程来处理任务。
    有些任务可能会永远等待某些资源或来自用户的输入,而这些资源又不能保证一定可用或着用户一定有输入,诸如此类的任务会永久停止,而这些停止的任务也会引起和线程泄漏同样的问题。如果某个线程被这样一个任务永久地消耗着,那么它实际上就被从池除去了。对于这样的任务,要么只给予它们自己的线程,要么让它们等待有限的时间。

请求过载

    请求过多压垮了服务器,这种情况是可能的。在这种情形下,我们可能不想将每个到来的请求都排队到我们的工作队列,因为排在队列中等待执行的任务可能会消耗太多的系统资源并引起资源缺乏。在这种情形下决定如何做取决于你;在某些情况下,可以简单地抛弃请求,依靠更高级别的协议稍后重试请求,也可以用一个指出服务器暂时很忙的响应来拒绝请求。

如何正确的使用线程池

线程池大小的分配

    调整线程池的大小基本上就是避免两类错误:线程太少或线程太多。幸运的是,对于大多数应用程序来说,太多和太少之间的余地相当宽。
    线程池的最佳大小取决于可用处理器的数目以及工作队列中的任务的性质。若在一个具有 N 个处理器的系统上只有一个工作队列,其中全部是计算性质的任务,在线程池具有 N 或 N+1 个线程时一般会获得最大的 CPU 利用率。
    对于那些可能需要等待 I/O 完成的任务,需要让池的大小超过可用处理器的数目,因为并不是所有线程都一直在工作。通过使用概要分析,可以估计某个典型请求的等待时间(WT)与服务时间(ST)之间的比例。如果我们将这一比例称之为 WT/ST,那么对于一个具有 N 个处理器的系统,需要设置大约 N*(1+WT/ST) 个线程来保持处理器得到充分利用。
    处理器利用率不是调整线程池大小过程中的唯一考虑事项。随着线程池的增长,你可能会碰到调度程序、可用内存方面的限制,或者其它系统资源方面的限制,例如套接字、打开的文件句柄或数据库连接等的数目。

ThreadPoolExecutor详解

几个重要字段和方法

(这块内容引用了ideabuffer
博主的https://www.jianshu.com/p/d2729853c4da文章,并做了稍微修改,有兴趣的同学可以查看这篇文章,写的很详细)

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
 public void execute(Runnable command) {
      if (command == null)
          throw new NullPointerException();
      /*
       * Proceed in 3 steps:
       *
       * 1. If fewer than corePoolSize threads are running, try to
       * start a new thread with the given command as its first
       * task.  The call to addWorker atomically checks runState and
       * workerCount, and so prevents false alarms that would add
       * threads when it shouldn't, by returning false.
       *
       * 2. If a task can be successfully queued, then we still need
       * to double-check whether we should have added a thread
       * (because existing ones died since last checking) or that
       * the pool shut down since entry into this method. So we
       * recheck state and if necessary roll back the enqueuing if
       * stopped, or start a new thread if there are none.
       *
       * 3. If we cannot queue task, then we try to add a new
       * thread.  If it fails, we know we are shut down or saturated
       * and so reject the task.
       */
      /*
       * 获取线程池状态以及有效线程数即runState以及workerCount
       */
      int c = ctl.get();
      /*
       * 如果当前线程活动数小于corePoolSize,则新建一个线程放入线程池中,并作为线程池的第一个任务
       */
      if (workerCountOf(c) < corePoolSize) {
          /*
           * 第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断;
           * 如果为true,根据corePoolSize来判断;
           * 如果为false,则根据maximumPoolSize来判断
           */
          if (addWorker(command, true))
              return;
          /*
           * 如果添加失败,则重新获取ctl值
           */
          c = ctl.get();
      }
      /*
       * 如果当前线程是运行状态,并且添加任务到队列中成功
       */
      if (isRunning(c) && workQueue.offer(command)) {
          /*
           * 重新获取ctl值
           */
          int recheck = ctl.get();
          /*
           * 再次判断线程池的运行状态,如果不是运行状态,由于之前已经把command添加到workQueue中了,这时需要移除该command,执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回
           */
          if (! isRunning(recheck) && remove(command))
              reject(command);
          /*
           * 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法
           * 这里传入的参数表示:第一个参数为null,表示在线程池中创建一个线程,但不去启动;第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;
           * 如果判断workerCount大于0,则直接返回,在workQueue中新增的command会在将来的某个时刻被执行。
           */
          else if (workerCountOf(recheck) == 0)
              addWorker(null, false);
      }
      /*
       * 如果执行到这里,有两种情况:
       * 1. 线程池已经不是RUNNING状态;
       * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
       * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
       * 如果失败则拒绝该任务
       */
      else if (!addWorker(command, false))
          reject(command);
}
/**
 * Checks if a new worker can be added with respect to current
 * pool state and the given bound (either core or maximum). If so,
 * the worker count is adjusted accordingly, and, if possible, a
 * new worker is created and started, running firstTask as its
 * first task. This method returns false if the pool is stopped or
 * eligible to shut down. It also returns false if the thread
 * factory fails to create a thread when asked.  If the thread
 * creation fails, either due to the thread factory returning
 * null, or due to an exception (typically OutOfMemoryError in
 * Thread.start()), we roll back cleanly.
 *
 * @param firstTask the task the new thread should run first (or
 * null if none). Workers are created with an initial first task
 * (in method execute()) to bypass queuing when there are fewer
 * than corePoolSize threads (in which case we always start one),
 * or when the queue is full (in which case we must bypass queue).
 * Initially idle threads are usually created via
 * prestartCoreThread or to replace other dying workers.
 *
 * @param core if true use corePoolSize as bound, else
 * maximumPoolSize. (A boolean indicator is used here rather than a
 * value to ensure reads of fresh values after checking other pool
 * state).
 * @return true if successful
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        /*
         * 获取ctl值
         */
        int c = ctl.get();
        /*
         * 获取线程池运行状态
         */
        int rs = runStateOf(c);
        /*
         * 如果rs >= SHUTDOWN,即线程池处于SHUTDOWN,STOP,TIDYING,TERMINATED中的一个状态,此时线程池不再接收新任务;
         * 接着判断以下3个条件,只要有1个不满足,则返回false:
         * 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
         * 2. firsTask为空
         * 3. 阻塞队列不为空
         * 
         * 首先考虑rs == SHUTDOWN的情况
         * 这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回false;
         * 然后,如果firstTask为空,并且workQueue也为空,则返回false,
         * 因为队列中已经没有任务了,不需要再添加线程了
         */
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
               return false;

        for (;;) {
            /*
             * 获取线程数
             */
            int wc = workerCountOf(c);
            /*
             * 如果wc超过或等于CAPACITY,或者wc超过或等于corePoolSize当core为true时或wc超过或等于maximumPoolSize当core为false时,返回false
             */
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            /*
             * 尝试增加workerCount,如果成功,则跳出第一个for循环
             */
            if (compareAndIncrementWorkerCount(c))
                    break retry;
            /*
             * 如果增加workerCount失败,则重新获取ctl的值
             */
            c = ctl.get();  // Re-read ctl
            /*
             * 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
             */
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        /*
         * 根据firstTask来创建Worker对象
         */
        w = new Worker(firstTask);
        /*
         * 每一个Worker对象都会创建一个线程
         */
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                /*
                 * 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
                 */
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    /*
                     * 更新线程池中出现的最大线程数largestPoolSize
                     */
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
/**
 * Class Worker mainly maintains interrupt control state for
 * threads running tasks, along with other minor bookkeeping.
 * This class opportunistically extends AbstractQueuedSynchronizer
 * to simplify acquiring and releasing a lock surrounding each
 * task execution.  This protects against interrupts that are
 * intended to wake up a worker thread waiting for a task from
 * instead interrupting a task being run.  We implement a simple
 * non-reentrant mutual exclusion lock rather than use
 * ReentrantLock because we do not want worker tasks to be able to
 * reacquire the lock when they invoke pool control methods like
 * setCorePoolSize.  Additionally, to suppress interrupts until
 * the thread actually starts running tasks, we initialize lock
 * state to a negative value, and clear it upon start (in
 * runWorker).
 */
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

    Worker类继承了AbstractQueuedSynchronizer,并实现了Runnable接口,其中firstTask用来保存传入的任务;thread是用来处理任务的线程,是通过ThreadFactory来创建的。newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。
    Worker继承了AbstractQueuedSynchronizer,使用AbstractQueuedSynchronizer来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:

  1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
  2. 如果正在执行任务,则不应该中断线程;
  3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
  4. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
  5. 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。

    所以,Worker继承自AbstractQueuedSynchronizer,用于判断线程是否空闲以及是否可以被中断。
    此外,在构造方法中执行了setState(-1);,把state变量设置为-1,为什么这么做呢?是因为AbstractQueuedSynchronizer中默认的state是0,如果刚创建了一个Worker对象,还没有执行任务时,这时就不应该被中断,看一下tryAquire方法:

protected boolean tryAcquire(int unused) {
       if (compareAndSetState(0, 1)) {
           setExclusiveOwnerThread(Thread.currentThread());
           return true;
       }
       return false;
}

    tryAcquire方法是根据state是否是0来判断的,所以,setState(-1);将state设置为-1是为了禁止在执行任务前对线程进行中断。
    正因为如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为0.

/**
 * Main worker run loop.  Repeatedly gets tasks from queue and
 * executes them, while coping with a number of issues:
 *
 * 1. We may start out with an initial task, in which case we
 * don't need to get the first one. Otherwise, as long as pool is
 * running, we get tasks from getTask. If it returns null then the
 * worker exits due to changed pool state or configuration
 * parameters.  Other exits result from exception throws in
 * external code, in which case completedAbruptly holds, which
 * usually leads processWorkerExit to replace this thread.
 *
 * 2. Before running any task, the lock is acquired to prevent
 * other pool interrupts while the task is executing, and then we
 * ensure that unless pool is stopping, this thread does not have
 * its interrupt set.
 *
 * 3. Each task run is preceded by a call to beforeExecute, which
 * might throw an exception, in which case we cause thread to die
 * (breaking loop with completedAbruptly true) without processing
 * the task.
 *
 * 4. Assuming beforeExecute completes normally, we run the task,
 * gathering any of its thrown exceptions to send to afterExecute.
 * We separately handle RuntimeException, Error (both of which the
 * specs guarantee that we trap) and arbitrary Throwables.
 * Because we cannot rethrow Throwables within Runnable.run, we
 * wrap them within Errors on the way out (to the thread's
 * UncaughtExceptionHandler).  Any thrown exception also
 * conservatively causes thread to die.
 *
 * 5. After task.run completes, we call afterExecute, which may
 * also throw an exception, which will also cause thread to
 * die. According to JLS Sec 14.20, this exception is the one that
 * will be in effect even if task.run throws.
 *
 * The net effect of the exception mechanics is that afterExecute
 * and the thread's UncaughtExceptionHandler have as accurate
 * information as we can provide about any problems encountered by
 * user code.
 *
 * @param w the worker
 */
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            /*
             * 如果线程池正在停止,那么要保证当前线程是中断状态;
             * 如果不是的话,则要保证当前线程不是中断状态;
             * 这里要考虑在执行该if语句期间可能也执行了shutdownNow方法,shutdownNow方法会把状态设置为STOP
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

    STOP状态要中断线程池中的所有线程,而这里使用Thread.interrupted()来判断是否中断是为了确保在RUNNING或者SHUTDOWN状态时线程是非中断状态的,因为Thread.interrupted()方法会复位中断的状态。

runWorker的执行过程:

  1. while循环不断地通过getTask()方法获取任务;
  2. getTask()方法从阻塞队列中取任务;
  3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
  4. 调用task.run()执行任务;
  5. 如果task为null则跳出循环,执行processWorkerExit()方法;
  6. runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。
/**
 * Performs blocking or timed wait for a task, depending on
 * current configuration settings, or returns null if this worker
 * must exit because of any of:
 * 1. There are more than maximumPoolSize workers (due to
 *    a call to setMaximumPoolSize).
 * 2. The pool is stopped.
 * 3. The pool is shutdown and the queue is empty.
 * 4. This worker timed out waiting for a task, and timed-out
 *    workers are subject to termination (that is,
 *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
 *    both before and after the timed wait, and if the queue is
 *    non-empty, this worker is not the last thread in the pool.
 *
 * @return task, or null if the worker must exit, in which case
 *         workerCount is decremented
 */
private Runnable getTask() {
    /*
     * 表示上次从阻塞队列中取任务时是否超时
     */
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
       /*
        * 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断:
       * 1. rs >= STOP,线程池是否正在stop;
       * 2. 阻塞队列是否为空。
       * 如果以上条件满足,则将workerCount减1并返回null。
       * 因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。
       */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        /*
         * timed变量用于判断是否需要进行超时控制。
         * allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
         * wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
         * 对于超过核心线程数量的这些线程,需要进行超时控制
         */
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        /*
         * wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
         * timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
         * 接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1;
         * 如果减1失败,则返回重试。
         * 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。
         */
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            /*
             * 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null;
             * 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。
             */
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            /*
             * 如果 r == null,说明已经超时,timedOut设置为true
             */
            timedOut = true;
        } catch (InterruptedException retry) {
            /*
             * 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试
             */
            timedOut = false;
        }
    }
}

    这里重要的地方是第二个if判断,目的是控制线程池的有效线程数量。由上文中的分析可以知道,在执行execute方法时,如果当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,并且workQueue已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize即可。
    什么时候会销毁?当然是runWorker方法执行完之后,也就是Worker中的run方法执行完,由JVM自动回收。
    getTask方法返回null时,在runWorker方法中会跳出while循环,然后会执行processWorkerExit方法。

/**
 * Performs cleanup and bookkeeping for a dying worker. Called
 * only from worker threads. Unless completedAbruptly is set,
 * assumes that workerCount has already been adjusted to account
 * for exit.  This method removes thread from worker set, and
 * possibly terminates the pool or replaces the worker if either
 * it exited due to user task exception or if fewer than
 * corePoolSize workers are running or queue is non-empty but
 * there are no workers.
 *
 * @param w the worker
 * @param completedAbruptly if the worker died due to user exception
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    /*
     * 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1;
     * 如果线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操作,这里就不必再减了。
     */
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        /*
         * 统计完成的任务数
         */
        completedTaskCount += w.completedTasks;
        /*
         * 从workers中移除,也就表示着从线程池中移除了一个工作线程
         */
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    /*
     * 根据线程池状态进行判断是否结束线程池
     */
    tryTerminate();

    int c = ctl.get();
    /*
     * 当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker;
     * 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;
     * 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
     */
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

    processWorkerExit执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束,如图所示:


image.png
/**
 * Transitions to TERMINATED state if either (SHUTDOWN and pool
 * and queue empty) or (STOP and pool empty).  If otherwise
 * eligible to terminate but workerCount is nonzero, interrupts an
 * idle worker to ensure that shutdown signals propagate. This
 * method must be called following any action that might make
 * termination possible -- reducing worker count or removing tasks
 * from the queue during shutdown. The method is non-private to
 * allow access from ScheduledThreadPoolExecutor.
 */
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        /*
         * 当前线程池的状态为以下几种情况时,直接返回:
         * 1. RUNNING,因为还在运行中,不能停止;
         * 2. TIDYING或TERMINATED,因为线程池中已经没有正在运行的线程了;
         * 3. SHUTDOWN并且等待队列非空,这时要执行完workQueue中的task;
         */
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        /*
         * 如果线程数量不为0,则中断一个空闲的工作线程,并返回
         */
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            /*
             * 这里尝试设置状态为TIDYING,如果设置成功,则调用terminated方法
             */
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    /*
                     * terminated方法默认什么都不做,留给子类实现
                     */
                    terminated();
                } finally {
                    /*
                     * 设置状态为TERMINATED
                     */
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
/**
 * Initiates an orderly shutdown in which previously submitted
 * tasks are executed, but no new tasks will be accepted.
 * Invocation has no additional effect if already shut down.
 *
 * <p>This method does not wait for previously submitted tasks to
 * complete execution.  Use {@link #awaitTermination awaitTermination}
 * to do that.
 *
 * @throws SecurityException {@inheritDoc}
 */
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        /*
         * 安全策略判断
         */
        checkShutdownAccess();
        /*
         * 切换状态为SHUTDOWN
         */
        advanceRunState(SHUTDOWN);
        /* 
         * 中断空闲线程
         */
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    /*
     * 尝试结束线程池
     */
    tryTerminate();
}

    这里思考一个问题:在runWorker方法中,执行任务时对Worker对象w进行了lock操作,为什么要在执行任务的时候对每个工作线程都加锁呢?

/**
 * Interrupts threads that might be waiting for tasks (as
 * indicated by not being locked) so they can check for
 * termination or configuration changes. Ignores
 * SecurityExceptions (in which case some threads may remain
 * uninterrupted).
 *
 * @param onlyOne If true, interrupt at most one worker. This is
 * called only from tryTerminate when termination is otherwise
 * enabled but there are still other workers.  In this case, at
 * most one waiting worker is interrupted to propagate shutdown
 * signals in case all threads are currently waiting.
 * Interrupting any arbitrary thread ensures that newly arriving
 * workers since shutdown began will also eventually exit.
 * To guarantee eventual termination, it suffices to always
 * interrupt only one idle worker, but shutdown() interrupts all
 * idle workers so that redundant workers exit promptly, not
 * waiting for a straggler task to finish.
 */
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

    为什么需要持有mainLock?因为workers是HashSet类型的,不能保证线程安全。

/**
 * Attempts to stop all actively executing tasks, halts the
 * processing of waiting tasks, and returns a list of the tasks
 * that were awaiting execution. These tasks are drained (removed)
 * from the task queue upon return from this method.
 *
 * <p>This method does not wait for actively executing tasks to
 * terminate.  Use {@link #awaitTermination awaitTermination} to
 * do that.
 *
 * <p>There are no guarantees beyond best-effort attempts to stop
 * processing actively executing tasks.  This implementation
 * cancels tasks via {@link Thread#interrupt}, so any task that
 * fails to respond to interrupts may never terminate.
 *
 * @throws SecurityException {@inheritDoc}
 */
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        /*
         * 中断所有工作线程,无论是否空闲
         */
        interruptWorkers();
        /*
         * 取出队列中没有被执行的任务
         */
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

shutdownNow方法与shutdown方法类似,不同的地方在于:

  1. 设置状态为STOP;
  2. 中断所有工作线程,无论是否是空闲的;
  3. 取出阻塞队列中没有被执行的任务并返回。

线程池的监控

ThreadPoolExecutor的构造函数

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
}

参数解释

    从ThreadPoolExecutor的源码我们可以看到,ThreadPoolExecutor类继承了抽象类AbstractExecutorService,而抽象类AbstractExecutorService实现了ExecutorService接口,ExecutorService接口又继承了Executor接口。
    Executor是最顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,用来执行传进去的任务的;

线程池的五种状态

线程池执行流程

线程池执行流程图.png

各种任务队列(BlockingQueue)的区别

RejectedExecutionHandler饱和策略

提交任务

线程池关闭

    shutdown()和shutdownNow()是用来关闭线程池的,都是调用了interruptIdleWorkers()方法去遍历线程池中的工作线程,然后去打断它们。
    shutdown原理:将线程池状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。
    shutdownNow原理:将线程池的状态设置成STOP状态,然后中断所有任务(包括正在执行的)的线程,并返回等待执行任务的列表。
    中断采用interrupt方法,所以无法响应中断的任务可能永远无法终止。但调用上述的两个关闭之一,isShutdown()方法返回值为true,当所有任务都已关闭,表示线程池关闭完成,则isTerminated()方法返回值为true。当需要立刻中断所有的线程,不一定需要执行完任务,可直接调用shutdownNow()方法。

几种常用的线程池

CachedThreadPool

    创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

/**
 * Creates a thread pool that creates new threads as needed, but
 * will reuse previously constructed threads when they are
 * available.  These pools will typically improve the performance
 * of programs that execute many short-lived asynchronous tasks.
 * Calls to {@code execute} will reuse previously constructed
 * threads if available. If no existing thread is available, a new
 * thread will be created and added to the pool. Threads that have
 * not been used for sixty seconds are terminated and removed from
 * the cache. Thus, a pool that remains idle for long enough will
 * not consume any resources. Note that pools with similar
 * properties but different details (for example, timeout parameters)
 * may be created using {@link ThreadPoolExecutor} constructors.
 *
 * @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

    从java.util.concurrent.Executors的源码中我们可以发现,CachedThreadPool使用了SynchronousQueue这个没有容量的阻塞队列,线程池的coreSize为0,maxSize为无限大。

特点:

使用示例:

public class CachedThreadPool {
    public static void main(String[] args) {
        ExecutorService cacheThreadPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            cacheThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(index);
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}

    程序一次性输出所有的数字,因为可以创建多个线程同时执行任务

FixedThreadPool

    创建一个指定工作线程数量的线程池。每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数,则将提交的任务存入到阻塞队列中。

/**
 * Creates a thread pool that reuses a fixed number of threads
 * operating off a shared unbounded queue.  At any point, at most
 * {@code nThreads} threads will be active processing tasks.
 * If additional tasks are submitted when all threads are active,
 * they will wait in the queue until a thread is available.
 * If any thread terminates due to a failure during execution
 * prior to shutdown, a new one will take its place if needed to
 * execute subsequent tasks.  The threads in the pool will exist
 * until it is explicitly {@link ExecutorService#shutdown shutdown}.
 *
 * @param nThreads the number of threads in the pool
 * @return the newly created thread pool
 * @throws IllegalArgumentException if {@code nThreads <= 0}
 */
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

    从java.util.concurrent.Executors的源码中我们可以发现,FixedThreadPool线程池的corePoolSize和maximumPoolSize数量是一样的,且使用的LinkedBlockingQueue是无界阻塞队列,因此达到corePoolSize后不会继续创建线程而是阻塞在队列那了。

特点:

使用示例:

public class FixedThreadPool {
    public static void main(String[] args) {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            final int index = i;
            newFixedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(index);
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}

    因为线程池的大小为5,每个任务输出index后sleep 2秒,所以每2秒打印5个数字。

SingleThreadExecutor

    创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。

/**
 * Creates an Executor that uses a single worker thread operating
 * off an unbounded queue. (Note however that if this single
 * thread terminates due to a failure during execution prior to
 * shutdown, a new one will take its place if needed to execute
 * subsequent tasks.)  Tasks are guaranteed to execute
 * sequentially, and no more than one task will be active at any
 * given time. Unlike the otherwise equivalent
 * {@code newFixedThreadPool(1)} the returned executor is
 * guaranteed not to be reconfigurable to use additional threads.
 *
 * @return the newly created single-threaded Executor
 */
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

    从java.util.concurrent.Executors的源码中我们可以发现,SingleThreadExecutor线程池的corePoolSize和maximumPoolSize数量都是1,即线程池只创建一个线程,且使用的LinkedBlockingQueue是无界阻塞队列,因此如果线程池中的线程处于活动的,则后面的任务只能到阻塞队列中。
特点:

使用示例:

public class SingleThreadExecutor {
    public static void main(String[] args) {
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            cacheThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(index);
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}

    程序每隔两秒依次输出一个数字。

ScheduleThreadPool

    创建一个定长的线程池,而且支持定时的以及周期性的任务执行,支持定时及周期性任务执行。

/**
 * Creates a thread pool that can schedule commands to run after a
 * given delay, or to execute periodically.
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle
 * @return a newly created scheduled thread pool
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

    从java.util.concurrent.Executors的源码即ScheduledThreadPoolExecutor的构造函数我们可以发现,ScheduleThreadPool的核心线程数是固定的,对于非核心线程几乎可以说是没有限制的,并且当非核心线程处于闲置状态的时候就会立即被回收。

特点:

使用示例:

延迟2秒执行任务
public class ScheduleThreadPool {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
        for (int i = 0; i < 10; i++) {
            final int index = i;
            scheduledExecutorService.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println(index);
                }
            }, 2, TimeUnit.SECONDS);
        }
    }
}
延迟2秒执行,每5秒周期执行任务
public class ScheduleThreadPool {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
        for (int i = 0; i < 10; i++) {
            final int index = i;
            scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    System.out.println(index);
                }
            }, 2,5, TimeUnit.SECONDS);
        }
    }
}

Executors各个方法的弊端:

总结

线程池面试

什么是线程池

    创建一组可供管理的线程,它关注的是如何缩短或调整线程的创建与销毁所消费时间的技术,从而提高服务器程序性能的。
    它把线程的创建与销毁分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段以减少服务请求是去创建和销毁线程的时间。
    它还显著减少了创建线程的数目。

为什么要使用线程池

    当我们在使用线程时,如果每次需要一个线程时都去创建一个线程,这样实现起来很简单,但是会有一个问题:当并发线程数过多时,并且每个线程都是执行一个时间很短的任务就结束时,这样创建和销毁线程的时间要比花在实际处理任务的时间要长的多,在一个JVM里创建太多的线程可能会导致由于系统过度消耗内存或切换过度导致系统资源不足而导致OOM问题。    线程池为线程生命周期开销问题和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。

如何正确的使用线程池

ThreadPoolExecutor构造函数中几个重要参数的解释

线程池的状态

线程池的执行流程

常用的几种线程池


    整理文章主要为了自己日后复习用,文章中可能会引用到别的博主的文章内容,如涉及到博主的版权问题,请博主联系我。

上一篇下一篇

猜你喜欢

热点阅读