深入理解线程池源码

2022-03-24  本文已影响0人  天还下着毛毛雨
image

核心属性

  1. corePoolSize :核心线程数,一般情况下,该数量的核心线程创建好之后,会常驻在线程池中,不会应空闲而关闭,可以设置allowCoreThreadTimeOut=true使核心线程空闲关闭
  2. maximumPoolSize :最大线程数,>核心线程数。
  3. keepAliveTime : 空闲时间,当线程获取任务时,超过keepAliveTime仍然获取不到任务,那么线程执行完所有逻辑后,自动消亡,workerSet也会移除该worker对象
  4. unit : 空闲事件keepAliveTime 的单位
  5. BlockingQueue<Runnable> workQueue : 任务的阻塞队列,当前提交任务时,工作线程已经>= 核心线程数, 则会将任务 推入阻塞队列中。如果阻塞队列达到最大长度,则会在工作线程数 不超过最大线程数maximumPoolSize的情况下,继续创建空闲线程来处理任务。
  6. ThreadFactory threadFactory: 创建线程的工厂
  7. RejectedExecutionHandler handler :任务的拒绝策略。当线程数任务阻塞队列满了,且工作线程数 大于等于 最大线程数了, 则 线程池无法调度线程则处理任务,调用构造方法传入的RejectedExecutionHandler实例的rejectedExecution()方法来拒绝任务。
/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

线程池状态

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量

状态 value 说明
RUNNING(当线程池创建出来的初始状态) 111 能接受任务,能执行阻塞任务
SHUTDOWN(调用shutdown方法) 000 不接受新任务,能执行阻塞任务 肯定可以 執行正在執行的任務
STOP(调用shutDownNow) 001 不接受新任务,打断正在执行的任务,丢弃阻塞任务
TIDYING(中间状态) 010 任务全部执行完,活动线程也没了
TERMINATED(终结状态) 011 线程池终结

常用api

execute(Runable)

执行任务,无返回值

public void execute(Runnable command);

submit(Runable)

会返回Future对象,调用Future对象的get(),会阻塞,直到拿到返回值返回

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

doInvokeAny

返回最快执行完的任务的结果,集合中其他正在执行的线程会被关闭

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                          boolean timed, long nanos)

invokeAll

执行所有任务,返回List<Future<T>>

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

shutdown()

不会接收新的任务,但是已经运行和在队列中的任务会执行完,然后在关闭线程

线程状态变成SHUTDOWN

 */
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

awaitTermination(long timeout, TimeUnit unit)

等待线程池关闭,会提前也会超时

public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

shutdownNow

打断所有所有正在执行的任务,返回队列中的任务

线程状态变成STOP

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  This implementation
     * cancels tasks via {@link Thread#interrupt}, so any task that
     * fails to respond to interrupts may never terminate.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            // 打断正在工作的线程
            interruptWorkers();
            // 从队列中取出等待的任务
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        // 返回队列中等待的任务
        return tasks;
    }

源码解析

流程图

image

一 、任务的执行以及线程的创建 : execute(Runnable task):

传入Runnable 对象作为要执行的任务。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // 1110000000000000000000000 | 0 =  111000000000000000000
    int c = ctl.get();
    // 第一种情况 :计算当前工作线程数是否小于 所配置的核心线程数
    //  workerCountOf(c) : c & CAPACITY
    //  CAPACITY : (1 << COUNT_BITS)-1 : 先左移29位 0010 0000 0000.....   然后-1 0001 1111 1111 1111 1111....后面29位全是1
    //  这时候&c,那么 c的 低29位 与 000 11111...(29个1)  做 & 运算,只有 为1的才保留下来, 结果为高三位 为0,标识线程数量的29位,可以很好的保留下来。
    if (workerCountOf(c) < corePoolSize) {
        // 则创建新增核心线程,并执行task
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 第二种情况 :当前工作线程数已经 大于等于核心线程数了,尝试往阻塞队列中添加task
    // 如果 阻塞队列还没有满,则是添加成功的
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 如果线程非运行状态,并且移除任务成功,执行线程池决拒绝任务的策略
        // 啥时候移除不成功?可能并发的时候,被别的线程 取出来了 执行了。
        if (! isRunning(recheck) && remove(command))
            // 拒绝任务
            reject(command);
        // // 判断工作线程是否 = 0,这种情况是 核心线程数为0的时候,需要创建空闲线程来处理队列中的任务,比如CachedThreadPool,第一次进来
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 第三种情况 :如果阻塞队列满了,阻塞队列添加task失败,就会尝试创建空闲线程
    // 会判断 当前工作线程数是否 < 最大线程数 maximumPoolSize, 如果小于就可以创建空闲线程
    else if (!addWorker(command, false))
        // 第四 : 种情况如果 >= maximumPoolSize,执行拒绝策略
        reject(command);
}

1. 当前工作线程小于核心线程数,则创建Worker对象,加入到workerSet中

image
addWorker(command, true)

创建Worker对象(持有线程),并调用持有线程的start方法,在run方法中执行runnable

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 获取当前工作线程数
            int wc = workerCountOf(c);
            //  如果当前工作线程数 大于最大线程数 2 ^ 29 -1 ,或者大于 (根据当前添加工作线程的类型) 核心线程数 还是 线程池最大线程数
            // 核心线程 判断是否 > corePoolSize,空闲线程,判断 maximumPoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // cas,工作线程数+1,退出循环,因为并发问题只有对工作线程数量加成功了,才能在下面开启线程。
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // 重新校验线程池 运行状态
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // 下面走创建线程的逻辑
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 线程一个worker, Worker 是Thread的子类,传入runnable对象
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // 加锁
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // 不是关闭状态
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 加到工作线程集合中
                    workers.add(w);
                    // 当前工作线程集合大小
                    int s = workers.size();
                    // 更新 线程池 线程数量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 执行task,启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
Worker.Run方法
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    // 执行任务
    public void run() {
        runWorker(this);
    }
}

在run方法中调用runWorker方法,传入当前对象

  1. 该worker对象第一次执行任务时,w.firstTask是!= null的,所以可以进入while的循环体, 执行Runnable的run方法
  2. 第二次进来则从阻塞队列中拿任务。
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // task第一次被创建时,构造方法传入了Runnable对象,所以现在是!= null的
    Runnable task = w.firstTask;
    // 之后清空,第二次进来是null
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 无限循环,当前有任务未执行 或者 阻塞队列中有任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行runnable的run方法执行业务逻辑
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 允许核心线程超时关闭 或者 当前工作线程数 > 核心线程数
        // 线程关闭前,从worderSet中移除worker对象
        processWorkerExit(w, completedAbruptly);
    }
}

2. 当前工作线程数已达到核心线程数了,但是阻塞队列还没满

则会往workQueue 存入Runnable对象

如果 队列长度还没达到上限,则offer方法会成功存入Runnable对象,返回true

如果 队列长度已达到上限,则返回false,说明当前工作线程从队列中拿task,处理task的速度还不够,会创建非工作线程。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // 1110000000000000000000000 | 0 =  111000000000000000000
    int c = ctl.get();
    // 第一种情况 :计算当前工作线程数是否小于 所配置的核心线程数
    //  workerCountOf(c) : c & CAPACITY
    //  CAPACITY : (1 << COUNT_BITS)-1 : 先左移29位 0010 0000 0000.....   然后-1 0001 1111 1111 1111 1111....后面29位全是1
    //  这时候&c,那么 c的 低29位 与 000 11111...(29个1)  做 & 运算,只有 为1的才保留下来, 结果为高三位 为0,标识线程数量的29位,可以很好的保留下来。
    if (workerCountOf(c) < corePoolSize) {
        // 则创建新增核心线程,并执行task
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 第二种情况 :当前工作线程数已经 大于等于核心线程数了,尝试往阻塞队列中添加task
    // 如果 阻塞队列还没有满,则是添加成功的
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 如果线程非运行状态,并且移除任务成功,执行线程池决拒绝任务的策略
        // 啥时候移除不成功?可能并发的时候,被别的线程 取出来了 执行了。
        if (! isRunning(recheck) && remove(command))
            // 拒绝任务
            reject(command);
        // // 判断工作线程是否 = 0,这种情况是 核心线程数为0的时候,workQueue.offer(command)返回false,就需要创建空闲线程来处理队列中的任务,比如CachedThreadPool,第一次进来
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 第三种情况 :如果阻塞队列满了,阻塞队列添加task失败,就会尝试创建空闲线程
    // 会判断 当前工作线程数是否 < 最大线程数 maximumPoolSize, 如果小于就可以创建空闲线程
    else if (!addWorker(command, false))
        // 第四 : 种情况如果 >= maximumPoolSize,执行拒绝策略
        reject(command);
}

3. 阻塞队列已满,添加task失败,就会尝试创建空闲线程

创建空闲线程的方法和创建核心线程的方法都是addWorker(runnable,boolean core),只不过传入的core参数是false,表示是空闲线程

如果是空闲线程创建,则会判断当前工作线程数是否 > 最大线程数maximumPoolSize, 如果是创建核心线程则判断的是 核心线程数

如果大于 最大线程数maximumPoolSize 就会创建线程失败

 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 获取当前工作线程,
            int wc = workerCountOf(c);
            // 如果当前工作线程数 大于最大线程数 2 ^ 29 -1 ,或者大于 (根据当前添加工作线程的类型) 核心线程数 还是 线程池最大线程数
            // 核心线程 判断是否 > corePoolSize,空闲线程,判断 maximumPoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // 后面创建线程Worker对象和创建核心线程是一模一样的
 }

4. 任务拒绝:阻塞队列满了,并且 工作线程数已经达到最大线程数了, 则尝试创建空闲线程会失败,走任务的拒绝策略。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    // -100000000000000000000000000000 | 0 =  -100000000000000000000000000000
    int c = ctl.get();
    // 第一种情况 :计算当前工作线程数是否小于 所配置的核心线程数
    //  workerCountOf(c)  : 11111111111111111111111111111 & -100000000000000000000000000000
    if (workerCountOf(c) < corePoolSize) {
        // 则创建新增核心线程,并执行task
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 第二种情况 :当前工作线程数已经 大于等于核心线程数了,尝试往阻塞队列中添加task
    // 如果 阻塞队列还没有满,则是添加成功的
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 第三种情况 :如果满了,阻塞队列添加task失败,就会尝试创建空闲线程
    // 会判断 当前工作线程数是否 < 最大线程数 maximumPoolSize, 如果小于就可以创建空闲线程
    else if (!addWorker(command, false))
        // 如果 >= maximumPoolSize,执行拒绝策略
        reject(command);
}

jdk提供的拒绝策略类 :

image
  1. AbortPolicy 抛异常

    /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }
    
        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
    
  2. DiscardPolicy 丢弃 = 啥也不干

    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }
    
        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
    
  3. DiscardOldestPolicy 推出并忽略阻塞队列中的第一个任务,尝试执行当前任务

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }

    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
         // poll出阻塞队列中的第一个任务,并忽略掉
            e.getQueue().poll();
            // 以执行当前任务
            e.execute(r);
        }
    }
}

................................................

二 、线程的维护

线程池中的作用维护线程,避免频繁创建,销毁线程而带来系统资源的浪费。

核心线程默认(可以配置allowCoreThreadTimeOut = true 来设置 注销核心线程 )是不会在执行完某一个任务后被注销的

空闲线程 在空闲时间达到keepAliveTime 后, 会自动注销(执行完run方法)。

1. 线程的阻塞 :

Worker.runWorker(Worker w)

线程的执行方法中,用while的方式,判断 当前是否有任务(第一次被创建出来) 或者 从阻塞队列中拿任务

  1. 判断 当前是否有任务(第一次被创建出来)
  2. 阻塞队列中有任务 :execute任务时,当工作线程数 > 大于核心线程数时且 阻塞队列没有满时, 会把任务存入阻塞队列
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // task第一次被创建时,构造方法传入了Runnable对象,所以现在是!= null的
    Runnable task = w.firstTask;
    // 之后清空,第二次进来是null
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 无限循环,当前有任务未执行 或者 阻塞队列中有任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行runnable的run方法执行业务逻辑
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 允许核心线程超时关闭 或者 当前工作线程数 > 核心线程数
        // 线程关闭前,从worderSet中移除worker对象
        processWorkerExit(w, completedAbruptly);
    }
}

如果可以不停的获取任务,处理任务,这种情况下 所有线程都不会被注销,因为无法退出while循环

2. 线程的注销

但是当没有任务提交时,也就是当前任务没有,阻塞队列里也拿不到任务,线程则处于空闲状态,空闲线程 空闲状态下的时间达到keepAliveTime ,则会退出while循环,结束线程。

而核心线程则会在getTask中(如果没配置allowCoreThreadTimeOut=true) 阻塞住, 不返回结果,直到阻塞队列中可以获取到任务, 再进入while循环体。

getTask()

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling? 
        // 是否允许核心线程超时关闭 或者 当 工作线程数 > 核心线程数了
        // 当 线程中 只剩下核心线程的时候, wc > corePoolSize 就不会返回true,则会workQueue.take()阻塞住
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 如果允许核心线程超时注销 或者 当前工作线程数 > 核心线程数, 则调阻塞队列的 poll,超时返回null
            // 否则调take()方法,一直拿不到就一直阻塞
            
            // 这就说明,只有允许核心线程超时注销,或者 当 当前工作线程数 > 核心线程数时,才会调 阻塞队列会超时的poll方法,
            // runWorker方法才会退出while循环体, 结束线程
            
            // 如果allowCoreThreadTimeOut被设置为true,则所有线程从队列中拿任务调用的都是workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)方法,所有线程在poll超时之后,仍然没获取到任务,则会返回 null ,退出循环体, 结束线程
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

从workers移除线程

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        // 从workers移除线程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            // 如果允许核心线程超时关闭,则为0,否则为corePoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 如果当前工作线程数 > 最小的线程数量
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 小于最小的线程数量,添加worker
        addWorker(null, false);
    }
}

常用的线程池配置

jdk的Executors类提供了4个创建线程池的配置方法, 通过之前的原理,我们来分析下这些线程池的不同

1. newFixedThreadPool

    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

创建一个 通过操作一个共享的无界队列来复用固定数量的线程的线程池。

首先看构造方法,核心线程数和最大线程数是一样的 ,说明不存在线程池扩容的情况

空闲有效时间为0 毫秒, 由于只存在核心线程,所以不存在 线程被注销的情况

LinkedBlockingQueue 是一个无界队列,默认大小为int的最大值,所以不会出现 队列长度不够而导致 创建空闲线程的情况,也就不会出现 拒绝策略。

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

总结 :

  1. 线程数固定,
  2. 没有多余线程线程回收,
  3. 不会出现因线程不够,队列装不下而拒绝任务的情况

2.newSingleThreadExecutor

/**
 * Creates an Executor that uses a single worker thread operating
 * off an unbounded queue. (Note however that if this single
 * thread terminates due to a failure during execution prior to
 * shutdown, a new one will take its place if needed to execute
 * subsequent tasks.)  Tasks are guaranteed to execute
 * sequentially, and no more than one task will be active at any
 * given time. Unlike the otherwise equivalent
 * {@code newFixedThreadPool(1)} the returned executor is
 * guaranteed not to be reconfigurable to use additional threads.
 *
 * @return the newly created single-threaded Executor
 */
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

只有一个线程,无界队列

3.newCachedThreadPool

/**
 * Creates a thread pool that creates new threads as needed, but
 * will reuse previously constructed threads when they are
 * available.  These pools will typically improve the performance
 * of programs that execute many short-lived asynchronous tasks.
 * Calls to {@code execute} will reuse previously constructed
 * threads if available. If no existing thread is available, a new
 * thread will be created and added to the pool. Threads that have
 * not been used for sixty seconds are terminated and removed from
 * the cache. Thus, a pool that remains idle for long enough will
 * not consume any resources. Note that pools with similar
 * properties but different details (for example, timeout parameters)
 * may be created using {@link ThreadPoolExecutor} constructors.
 *
 * @return the newly created thread pool
 */
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

典型的缓存策略。

说明适用于 吞吐量优先,会优先开启尽可能多的线程来保证数据处理效率,适合高并发场。如果任务不那么繁忙了,线程空闲下来,超过一分钟线程就被注销,可灵活回收空闲线程。

1. 如何灵活回收空闲线程?

从构造方法可以看到, 核心线程是0,是没有常驻线程的,所有的线程都是非核心线程。并且超时时间设置为60s,超过60没有从队列中获取到任务就会被回收。

2. 如果保证吞吐量优先?

同样从构造方法得知,非核心线程数量 最大可以达到Integer.MAX_VALUE个,也就是整形的最大数2^31-1个, 如果并发量高,会启用大量的线程来更快的处理任务,几乎不会有任务在排队的现象。

那么是有一个任务来就创建一个线程吗?

答案肯定不是的, 关键点在于 SynchronousQueue这个队列,和一般的BlockingQueue不同,这个队列的offer() 只有在 有线程阻塞在poll()方法的时候才会返回true,否则返回false,从ThreadPoolExecutor的execute(Runnable r) 可以得知, 当 workQueue.offer(command)返回false时,会调用下面那个else if里的addWorker去创建线程, 由于最大线程数 是Integer.MAX_VALUE,那么是肯定 可以新创建线程的。

image

那么 当没有任何其他线程阻塞在队列的poll()方法时,有两种情况

  1. 要么 是第一次进来,还没有产生任何线程在作业
  2. 要么 就是其他线程在正在处理任务, 整个线程池的所有线程都处于繁忙的状态,就没有任何空闲下来的线程。

所以,基于第2种情况,可以看出 为了吞吐量优先,就会新创建一个线程来处理 当前要提交的任务,保证任务的及时处理。

如果有线程空闲下来,在60s之内还未被回收,那么此时提交任务,调用workQueue.offer(command)这处代码就会返回true,将任务提交至队列,让其他线程poll到 处理即可,就不会创建新的线程, 达到线程的复用,节省了线程创建,回收的资源开销。

4.newScheduledThreadPool

    /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

定时任务线程池

/**
 * Creates a thread pool that can schedule commands to run after a
 * given delay, or to execute periodically.
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle
 * @return a newly created scheduled thread pool
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}
延迟执行
/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}
重复执行

自定义

schedule(){

        // dosomething
// 递归
        schedule();

}

api

scheduleWithFixedDelay

执行完任务再计算延迟时间

/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 * @throws IllegalArgumentException   {@inheritDoc}
 */
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}
scheduleAtFixedRate

从任务开始执行就计算延迟时间

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     * @throws IllegalArgumentException   {@inheritDoc}
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }
案例 每周三22点执行
public static void main(String[] args) {
    ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(1);
    // 计算当前时间距离目标时间还有多久
    //  初始化延迟时间 = 目标时间 - 当前时间
    // 周期  = 7天
    int period = 7;
    scheduled.scheduleAtFixedRate(() -> {
        //doSomething
    }, 初始化延迟时间, period, TimeUnit.DAYS)
}
上一篇下一篇

猜你喜欢

热点阅读