死磕源码

死磕源码系列 - ThreadPoolExecutor

2019-09-25  本文已影响0人  sunyelw

本文是本人对线程池最常用的一种实现ThreadPoolExecutor的源码的一些理解, 边看边写, 不定时更新.


一、线程池的状态控制
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

下面列出了左移二十九位后的高三位的值及其状态说明, 注意第一位是符号位.

// 111
// 接受新任务并且处理阻塞队列里的任务
private static final int RUNNING    = -1 << COUNT_BITS;

// 000
// 拒绝新任务但是处理阻塞队列里的任务
private static final int SHUTDOWN   =  0 << COUNT_BITS;

// 001
// 拒绝新任务并且抛弃阻塞队列里的任务同时会中断正在处理的任务
private static final int STOP       =  1 << COUNT_BITS;

// 010
// 所有任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为0,将要调用terminated方法
private static final int TIDYING    =  2 << COUNT_BITS;

// 011
// 终止状态。terminated方法调用完成以后的状态
private static final int TERMINATED =  3 << COUNT_BITS;

那么如何通过一个变量(ctl)控制两个变量(workerCount/runState)呢?

/**
* rs 线程状态,即上面的五个常量
* wc 活跃线程数
* @return 控制位
*/
private static int ctlOf(int rs, int wc) { return rs | wc; }
  1. rs只影响高三位,后二十九位全0
  2. wc只影响低二十九位,高三位全0
  3. 二者相或,结果就是高位取rs,低位取wc
private static int runStateOf(int c)     { return c & ~CAPACITY; }
  1. 参数为控制位 ctl
  2. ~运算符是连符号位一起取反后以补码取原码
没有溢出的情况下。
实际也不可能溢出, 由于 -0 是负数的最小值, 负数的最小值的绝对值比最大正数大一
可以参考 [-128, 127]
0001 1111    CAPACITY 原始值 高三位全0 (低位全1, 只写了高八位)
1010 0000    `~CAPACITY`运算后值 (这里就变成负数了)
1110 0000    CAPACITY 运算值 (补码, 计算机都是补码参与运算)
  1. ~CAPACITY 参与运算是高三位全1、低位全0, 这样相与就把控制位的高三位取出来了, 也就是状态位.
private static int workerCountOf(int c)  { return c & CAPACITY; }
  1. runStateOf类似,这里只提一下不一致的地方
0001 1111  CAPACITY 原始值(低位全1, 只写了高八位)
0001 1111  CAPACITY 运算值(正数的 原码/反码/补码 一致)
  1. 所以取的是低二十九位的值, 也就是 workerCount
二、ThreadPoolExecutor的构造函数

这里就只说一个最终的构造函数, 其他的参数个数少于7个的都是调用此完整版

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

一个一个来看下:

名称 类型 描述
corePoolSize int 核心线程数, 活跃线程数先达到的数量
maximumPoolSize int 最大线程数, 活跃线程数的最大值, 为核心线程+非核心线程数(队列中的任务不属于此类)
keepAliveTime long 超时时间, max - core 的这部分活跃线程最大等待时间. 还有一个参数 allowCoreThreadTimeOut, 如果为true, 则适用于全部活跃线程(也就是哪怕是核心线程core如果没有任务处理也会被close)
unit TimeUnit keepAliveTime的单位, 是一个枚举值
workQueue BlockingQueue<Runnable> core满了以后, 再加进来的任务会进入此队列等待. 要求是一个实现了BlockingQueue接口的类, 自然的, 塞进去的任务要求实现了Runnable接口
threadFactory ThreadFactory 线程工厂类, 要求实现ThreadFactory接口, 重写newThread方法
handler RejectedExecutionHandler 任务拒绝策略, 当阻塞队列已满且活跃线程达到max之后加入进来的任务会执行拒绝策略, 需要实现RejectedExecutionHandler, 重写rejectedExecution方法

举个例子:

public class HyThreadFactory implements ThreadFactory{

    // 线程名称前缀
    private String namePrefix;
    // 线程优先级
    private int priority;
    // 守护线程
    private boolean isDaemon;

    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    public HyThreadFactory(String namePrefix){
        this(namePrefix, Thread.NORM_PRIORITY, false);
    }

    private HyThreadFactory(String namePrefix, int priority, boolean isDaemon){
        this.namePrefix = poolNumber.getAndIncrement() + "-" +namePrefix;
        this.priority = priority;
        this.isDaemon = isDaemon;
    }

    @Override
    public Thread newThread(Runnable r){

        Thread th = new Thread(r, namePrefix + threadNumber.getAndIncrement());
        th.setDaemon(isDaemon);
        th.setPriority(priority);
        return th;
    }
}
public class ThreadRejection implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor){

        System.out.println(r.toString() + " I am rejected...");

    }
}
ExecutorService es = new ThreadPoolExecutor(
                10,
                10,
                0L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(10 << 7),
                new HyThreadFactory("hy-thread-"),
                new ThreadRejection());
/**
 * The default rejected execution handler
 */
private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();

/**
 * A handler for rejected tasks that throws a
 * {@code RejectedExecutionException}.
 */
public static class AbortPolicy implements RejectedExecutionHandler {
    /**
     * Creates an {@code AbortPolicy}.
     */
    public AbortPolicy() { }

    /**
     * Always throws RejectedExecutionException.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

说到这里, 讲讲我对线程池中任务线程的理解

问题: 请问一个线程池构造成功后, 其所能同时容纳的最大任务数量是多少?

三、线程池任务执行流程 -- 啃源码~

先看下继承结构


ThreadPoolExecutor 继承

这一串/接口 中, 我们只拎出来接下来要讲的执行过程中涉及到的方法,

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

这个顶级接口中只有一个抽象的任务执行方法

public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

我们通常都是基于此接口规范进行线程池的处理, 比如下面的主角ThreadPoolExecutor.

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

submit方法就做了三件事

  1. 将传入线程Runnable转为RunnableFuture对象
  2. 将此对象作为入参调用 execute方法
  3. 返回此对象

RunnableFuture 是一个同时实现了RunnableFuture(跟Execute类似的顶级接口)的接口

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

submitexecute两个方法的区别你知道几条?

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

这三个addWorker方法的参数都不一样, 后面马上说, 这里你心里有个数就行

总结

所以一个创建好的线程池最大同时容纳任务数是 max + workQueue.size() (一个线程同时只能处理一个任务哦~)

好了, 现在的问题就剩addWorker方法了, 这个方法我们分两步来说:

/**
*  firstTask 用于执行的第一个任务, 可以为null
*  core  此次添加的是否核心线程
*/
private boolean addWorker(Runnable firstTask, boolean core){...}
retry:
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
        return false;

    for (;;) {
        int wc = workerCountOf(c);
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;
        if (compareAndIncrementWorkerCount(c))
            break retry;
        c = ctl.get();  // Re-read ctl
        if (runStateOf(c) != rs)
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }
}

这一步由1、2两层循环组成,分别看下这两层循环做了什么

  1. 获取控制位,拿到运行状态
  2. 判断是否允许入库, 上面这个判断我换了一种写法
rs >= SHUTDOWN && 
( rs != SHUTDOWN || firstTask != null || workQueue.isEmpty() )

很明显了, 不执行此方法的情况有三种
(1) rs = STOP/TIDYING/TERMINATED
(2) rs = SHUTDOWNfirstTask 不为null
(3) rs = SHUTDOWNfirstTasknull,阻塞队列workQueue 中没有任务待执行

还记得SHUTDOWN的意思吗, 回顾一下[允悲]

拒绝新任务但是处理阻塞队列里的任务

下面对应着来尝试解释下上面三种情况:
(1) 除了RUNNING之外的四种状态, 是不会允许添加新任务的
(2) firstTasknull时的addWorker方法会做什么事? 等会详细说, 这里先给出答案, 会创建一个Worker线程去处理阻塞队列中的任务(核心与否取决于 另一个参数core). 所以不为nullfirstTask是不会被SHUTDOWN状态的线程池接受的
(3) SHUTDOWN状态的线程池还是要处理队列中的任务, 所以如果队列为空也就不用再创建Worker线程去处理了

firstTasknulladdWorker方法创建的是Worker线程而不是添加新的任务进入线程池, 目的是处理队列中的任务

第二层循环, 修改活跃线程数量
(1) 拿到活跃线程数量, 判断是否超过最大值
(2) 没有超过则CAS更改活跃线程数量(+1),

private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

成功则跳出所有循环, 进入第二步
(3) CAS失败, 重新获取ctl, 判断状态位与开始是否一致
- 一致继续内层循环, 也就是重试CAS
- 不一致需要重新判断当前任务是否满足加入条件, 外层循环

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.
            int rs = runStateOf(ctl.get());

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w);
}
return workerStarted;
  1. 使用firstTask初始化Worker得到w, 拿到其内部线程t, 这个t是以Worker本身构造的
  2. 获取ReentrantLock锁, 然后做两件事
四、Worker类解析
private final class Worker 
   extends AbstractQueuedSynchronizer implements Runnable
  1. 先看看字段
** Thread this worker is running in.  Null if factory fails. */
final Thread thread;
/** Initial task to run.  Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;

(1) thread

          `getTask` 
`t` > `w` > ---- > `task`

(2) firstTask

null != `firstTask` || null != `getTask`
  1. 构造函数
/**
 * Creates with given first task and thread from ThreadFactory.
 * @param firstTask the first task (null if none)
 */
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}
  1. Runnable部分
/** Delegates main run loop to outer runWorker  */
public void run() {
    runWorker(this);
}

runWorker

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
If pool is stopping, ensure thread is interrupted;
如果线程不在RUNNABLE状态, 确保线程中断
if not, ensure thread is not interrupted.  
如果不在, 确保线程不会被中断
This requires a recheck in second case to deal with shutdownNow race while clearing interrupt
第二种情况需要做二次check 以防止在清除中断标识时调用shutdownNow方法

task是用户自定义的线程任务, 其run方法中可能会抛出非受检异常, 代码捕获到此异常后又立马往外抛出了, 这时就会跳过这个赋值 (可以多看两遍代码层次)

completedAbruptly = false;

这时completedAbruptly就是true了. 接着往下看其调用.

  1. processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

ThreadPoolExecutor 如何保证内部始终存活 core 数量的核心线程?

  1. getTask方法
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

getTask 返回 null 时都会将wc 减一, 对应processWorkerExit中的处理逻辑

两个方法的区别取决于具体的阻塞队列, 这里我想说的是, 线程池如何保证core数量的核心线程在活跃中呢? 如果允许核心线程超时那自然不用保证, 一般都是不允许也就是默认的allowCoreThreadTimeOut = false, 这种情况下 timed的值取决于wc > corePoolSize, 那很明显了, 如果大于, 说明当前wc已经是max级别了, 不需要阻塞执行, 带超时时间去取任务就好; 如果是小于, 那么wc就是core级别, 所有的worker都是核心线程, 需要阻塞执行以确保core数量的核心线程维持活跃(也就是不结束).

五、线程池的其他方法
  1. shutdown
public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
  1. shutdownNow
public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

这俩方法放一块看看

/**
 * Interrupts all threads, even if active. Ignores SecurityExceptions
 * (in which case some threads may remain uninterrupted).
 */
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

interruptIdleWorkers

/**
 * Common form of interruptIdleWorkers, to avoid having to
 * remember what the boolean argument means.
 */
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
  1. tryTerminate
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

感觉写的乱七八糟的, 自己做做笔记, 后面有新思路再来优化.


拓展1
线程池的关闭
ThreadPoolExecutor的开头注释中就写了

<dd>A pool that is no longer referenced in a program <em>AND</em>
has no remaining threads will be {@code shutdown} automatically. If
you would like to ensure that unreferenced pools are reclaimed even
if users forget to call {@link #shutdown}, then you must arrange
that unused threads eventually die, by setting appropriate
keep-alive times, using a lower bound of zero core threads and/or
setting {@link #allowCoreThreadTimeOut(boolean)}.  </dd>

这段注释告诉我们关闭线程池的两个条件 (然后还有一些建议):

我们需要做的就从第二点入手来关闭线程池, 如果一个fixed线程池创建后使用完没有调用shutdown方法会怎么样?
core数量的核心线程会一直在getTask()方法中take()而不会结束, 那么此线程池也不会自动终止, 这是原因.
官方给的建议是什么呢?

建议每次用完还是显示调用shutdown()方法


2019-10-09更新
拓展2
聊聊Worker模式控制中断的一些想法

对线程池通常的理解是:线程池中接收到任务后启动线程,由线程去执行任务。

这句话说起来非常简单,但其实内部做了不少事,其中一点就是使用AQS来控制中断。
假设你已经理解了AQS的基本使用,那么有如下建议(或者说标准):

举个例子,比如要中断Worker的线程

try (worker.tryLock()) {
  Thread th = worker.thread;
  th.interrupt();
}

记住这个例子。

Worker的构造函数中有一步操作是改变状态为-1

setState(-1); // inhibit interrupts until runWorker

runWorker之前禁止中断. 在runWorker方法中有一步w.unLock()操作将state字段置为了0.

这看起来好像是我把状态设置为-1后你就中断不了我一样?

这里有个很重要的东西要搞明白:

中断线程跟你的AQS状态没有一点关系

那么搞这么复杂还说要控制中断如何理解?

下面是我自己的一些想法,可能不对,但我目前只能这么理解。

Thread类的interrupt()方法注释最后有一句是这么说的

Interrupting a thread that is not alive need not have any effect.

中断一个尚未激活的线程不会有任何效果.

来捋一捋现在我们已经掌握的信息

按照Worker类逻辑来一遍,假如在Worker线程创建后启动前被中断了会发生什么情况呢?

那么如果Worker构造函数中没有setState(-1) 会怎么样?

结论:

感觉还有些地方是模糊的,还需要再想想。

上一篇 下一篇

猜你喜欢

热点阅读