ThreadPoolExecutor 源码分析

2020-12-06  本文已影响0人  想起个帅气的头像

前言

本文重点分析了ThreadPoolExecutor两个方法execute() 和 submit() 的执行原理,并说明Future如何实现阻塞返回。

继承关系图

关键方法介绍

构造方法

    /**
     * @param corePoolSize   核心线程数
     * @param maximumPoolSize  最大线程数
     * @param keepAliveTime 临时线程保留时间
     * @param unit  临时线程保留时间单位
     * @param workQueue 阻塞队列
     * @param threadFactory  线程工程
     * @param handler  拒绝策略
     */
 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

为了方便区分,本文会将超过核心线程数创建的线程叫临时线程,本质上这两类线程没有任何区别,到期回收哪个线程完全是跟当时线程池哪个线程先被空闲有关,跟创建时间的先后无关

execute(Runnable command)

默认参数

先介绍主要方法实现前,先说明一些静态变量的含义和值。

ctl 官方给出的注释是The main pool control state,这个值包含了两部分,workerCount和runState。

int COUNT_BITS = Integer.SIZE - 3 = 29; 一共32位,高3位表示线程池的运行状态,低29位表示线程池中的线程数量。是一种高低位的实现。

用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况

int CAPACITY = (1 << COUNT_BITS) - 1 = 536870912;也就是从的线程容量是536870912个。

RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED 都是用高3位表示不同的含义。低29位都是0

具体值参考下表:

   private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 初始值 RUNNING | 0 = -536870912 , 1110 0000 + 24位0 
    private static final int COUNT_BITS = Integer.SIZE - 3;   //29   高3位表示状态  低29表示线程数量
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;  //536870912  0001 1111 + 24位1

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;  // -536870912  1110 0000 + 24位0
    private static final int SHUTDOWN   =  0 << COUNT_BITS; // 0            0000 0000 + 24位0 
    private static final int STOP       =  1 << COUNT_BITS;  // 536870912   0010 0000 + 24位0
    private static final int TIDYING    =  2 << COUNT_BITS;  // 1073741824  0100 0000 + 24位0
    private static final int TERMINATED =  3 << COUNT_BITS; // 1610612736   0110 0000 + 24位0

    // Packing and unpacking ctl
    // 如果c是默认值-536870912, 
    // runStateOf = (-536870912 & ~29) = -536870912, 
    // workerCountOf = (-536870912 & 29) = 0
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }  
源码分析
 /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //c = -536870912
        int c = ctl.get();
        // workerCountOf(c) = 0
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

首先,这个execute有三个主要的if判断:

      //判断当前线程池中的线程数量有没有到核心线程数,没有就创建新的worker来处理任务。
      if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
      //执行到此处,说明此时线程池的线程数已经超过了coolPoolSize。先判断线程池状态,且尝试将任务添加到阻塞队列里。
      if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 最后意味着此时阻塞队列已满,尝试创建新的worker来处理,不能创建则执行拒绝策略。
        else if (!addWorker(command, false))
            reject(command);
addWorker()

很长的一个方法,注释就不贴了,两个参数分别是当前要执行的任务和core(表示要创建的是核心线程还是临时线程)。
这里的worker是真正负责处理任务的对象,worker内部封装了所属线程和待执行的任务.

 private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        // ...
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
       
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

接下来主要看addWorker方法的实现。

    private boolean addWorker(Runnable firstTask, boolean core) {
      //第一部分
      retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        //第二部分
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

第一部分:
rs表示是线程池的状态,先校验线程池状态和队列数量。前文已经提过,RUNNING的值是负数,SHOTDOWN是0,其他值都是正数。

之后是for循环,判断容量和是否超过了预设的线程数量。
如果成功增加了workerCount的值就跳出循环,开始执行任务。
如果失败,说明有并发情况,就重新获取ctl,判断rs状态是否变了,从而决定是重新执行一遍大或小循环。

for循环结束后,说明当前可以增加worker对象。此时就真正创建对象开始执行任务。

第二部分:
在创建worker对象时,构造方法中也创建了一个Thread。并通过lock来保证原子性,校验状态之后将worker对象add到HashSet中。
private final HashSet<Worker> workers = new HashSet<Worker>();

添加后,释放锁并start线程。

如果在addWorker过程中失败,且第一阶段顺利完成,就从hashSet中移除,并减少workerCount。

/**
     * Rolls back the worker thread creation.
     * - removes worker from workers, if present
     * - decrements worker count
     * - rechecks for termination, in case the existence of this
     *   worker was holding up termination
     */
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

如果添加任务顺利,则在t.start();执行完成后,主要任务就完成了并返回true。此时线程会执行worker对象内的run方法。

worker内 run()
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

runWorker真正执行,这个this只得是worker对象,task和线程都已经封装到worker内了。


    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // 如果线程池已经是STOP或TIDYING或TERMINATED,需要将线程也主动中断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

这里说明一些最核心的逻辑
执行过程:

  1. 在while中判断当前的task和队列中的task,如果当前task != null,说明是线程是伴随着任务一起创建的,直接调用task.run来执行。
  2. 第一圈执行完成后,task=null,第二次执行while时,需要从getTask中取task来执行。
  3. 当getTask() 返回null时,while结束,设置completedAbruptly = false;表明任务时正常结束。最后调用processWorkerExit来退出线程。

这里提供了两个方法:beforeExecute 和 afterExecute,task.run()的切面,我们可以定义worker的子类,来实现扩展,比如加入一些监控等。

getTask() 返回null就代表着线程可以正常结束,那么什么情况下会返回null?

getTask()
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

getTask() 的主要任务是从阻塞队列中获取task。通过判断当前的wc 是否超过了核心线程数,来决定poll还是take来取任务。
如果超过了,说明此时已经创建过了临时线程,临时线程的有效期就是等待从队列返回的时间,超过这个时间没有取到,则设置timeOut表示已经超时,在下一次for循环的if判断中,返回null,让这个临时线程自动结束。
如果没超过,说明此时还处在核心线程的阶段,可以take长期等待。

至此,run方法的执行过程就此完成。

任务是如何添加到队列中的,还得回到execute方法。

      if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

如果已经达到核心线程数,就不能在继续addWorker,而是要offer到workQueue中,并再次检查线程池状态。

如果offer失败,说明阻塞队列已满,此时需要继续创建新的worker来完成任务。

        else if (!addWorker(command, false))
            reject(command);

这里的false代表 创建时和最大线程数进行比较,如果超过了最大线程数,则调用reject来执行拒绝策略。

reject()
/**
     * Invokes the rejected execution handler for the given command.
     * Package-protected for use by ScheduledThreadPoolExecutor.
     */
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

4种默认的拒绝策略

AbortPolicy : 直接抛出异常(默认策略)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
DiscardPolicy : 什么也不处理
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
DiscardOldestPolicy : 把当前最早在队列的任务丢弃,并将再次执行此任务(可能会直接执行,也可能被加到队列中)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
CallerRunsPolicy : 由当前线程来直接执行run,不再交给线程池。
  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }

submit 源码分析

submit()

    Future<?> future = Executors.newCachedThreadPool().submit(new Thread());
    ...
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

submit方法可用于带返回值的任务执行。可以返回Future来获取线程的执行结果,具体的实现定义在AbstractExecutorService中。

首先创建了一个FutureTask对象,传入了要执行的任务。把封装后的FutureTask交给execute来执行。

  /**
     * Returns a {@code RunnableFuture} for the given runnable and default
     * value.
     *
     * @param runnable 要执行的任务
     * @param 返回的默认值
     * @param <T> the type of the given value
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

FutureTask

继承关系图和构造方法
    /**  
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    // FutureTask 可能的状态列表
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** 执行的任务 */
    private Callable<V> callable;
    /** get() 的返回值,即最终的执行结果 */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    // 单项列表的node
    private volatile WaitNode waiters;

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

FutureTask 既然将任务封装到了callable属性中,且它自身还是一个Runnable,那么真正执行一定在run方法中。而get() 是一个阻塞方法,当执行完成后,可以获取返回值,否则就等待。

那重点看下run() 和 get() 的实现。

get()
public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

 private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

state 有多种状态,用来标记当前任务的执行情况,如果已经是完成状态,通过report方法直接返回outcome即可。
如果还未到达完成态,就说明当前任务还在执行,此时需要await等待,也就是awaitDone。

awaitDone()
/**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

awaitDone的两个参数分别用于表示是否有等待时间,以及等待时间的纳秒数。
如果有等待时间,deadline就是截止时间。
下面则是主要逻辑:
一般来说,这里的for循环会执行3圈,(不考虑已经执行完成和中断的情况)。

  1. 第一圈:因为WaitNode q 最初被赋值为null,在run执行完之前,state是NEW,所以for循环会执行q=null的逻辑,先创建一个WaitNode对象。
  2. 第二圈:因为q此时有值,但queued是false,此时for循环执行! queued的逻辑,如果设置成功,则queued = true。
  3. 第三圈:LockSupport.park(this); (如果有deadline,就判断是否超时了)此时线程进入阻塞状态等待唤醒。
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
    //背景:
    UNSAFE = sun.misc.Unsafe.getUnsafe();
    Class<?> k = FutureTask.class;
    waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

重点就是这一句。
这一句是做了两个事情:

  1. 构建waiters的Node单向链表
  2. 如果添加队列成功就返回true。
这里为什么要构建单向链表?

一般来说,一个task通过一个get()方法等待获取就OK了,是一个单任务。但如果,同一个FutureTask的get() 方法被多个线程调用时,多个线程(可能)会同时处于阻塞状态,这时就需要一个存储介质来存储这些等待线程,这里是通过单链表来实现
构建单向链表的过程如下:

  1. 第一次调用get():
    当前waiters = null;q.next = waiters(null); waiters = q; 即waiters的头节点是q,q.next是null。
  2. 第二次调用get(); 如果当前的任务命名为p;
    当前waiters = q; p.next = waiter(q); waiters = p; 即构建了一个 p -> q的链表结构,waiters是头节点p。
  3. 第三次调用get(); 如果当前的任务命名为r;
    最后的效果是 r -> p -> q; 可以看出来是头插法。
run()
 public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt
        // We want to clear any interrupt we may have received from
        // cancel(true).  However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // Thread.interrupted();
}

run() 比较简单,如果当前FutureTask是NEW的状态,就调用callable.call(),将执行完成的result通过set方法设置到outcome中。
且无论成功失败,都将runner线程置为null,并判断执行过程中是否被其他线程中断,如果因为中断而失败,则此线程一直交出时间片,直到状态从INTERRUPTING变成INTERRUPTED。

如果成功执行且没有被中断过,则通过set方法进行返回值的设置。

set()
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

先判断此时状态是NEW,则改成COMPLETING,设置outcome后,状态改成NORMAL(完成态),调用finishCompletion来唤醒等待中的线程。

finishCompletion()
/**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        // done是一个空方法,给子类重写用。
        done();

        callable = null;        // to reduce footprint
    }

这个方法比较简单,可以看到就是在遍历waiters单链表,依次唤醒内部的阻塞线程。(阻塞的发起点是get方法)。

总结

execute()

实现思想:
  1. task因为交由线程池来执行,线程池的线程直接调用task中的run,而不是执行task.start()。
  2. 如果当前线程池中的线程数 < corePoolSize ,就创建新的线程添加到线程池中(HashSet存储)。
  3. 如果当前的线程数 > corePoolSize 就先存放到阻塞队列里
  4. 如果阻塞队列已满,且 < maximumPoolSize,就创建新的线程添加到线程池中(HashSet存储),当keepAliveTime的时间没有处理任务,则销毁(也就是让run方法结束)。
  5. 如果已经超过maximumPoolSize,则根据拒绝策略执行。
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */

submit()

实现思想:

任务执行的思想还是execute,阻塞等待返回值的思想是通过Future完成。实现类是FutureTask。

  1. get()返回值时如果还未完成,将当前线程封装成WaiterNode,进行LockSupport.park,并将所有park的线程按照头插法构建一个单向链表。
  2. run() 执行完成后,将内部的outcome属性设置成当前FutureTask的返回值,并unpark单链表中的所有阻塞线程,这些线程的get()会直接返回outcome的值。
上一篇下一篇

猜你喜欢

热点阅读