和Java源码谈恋爱

JDK源码分析之线程池

2019-07-15  本文已影响0人  一岁一枯荣啊

声明:本文只是总结记录,参考原文章,写的很好

一,创建方式

//指定参数
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3,10,5000,TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(100),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

//使用Executors
1.newCachedThreadPool
创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程, 那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。

2.newFixedThreadPool
创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

3.newSingleThreadExecutor
创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

4.newScheduledThreadPool
创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。Pool(10);

二,线程池参数解析

1.corePoolSize

核心线程池大小

默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。核心线程在allowCoreThreadTimeout被设置为true时会超时退出,默认情况下不会退出。

2.maximumPoolSize

当线程数大于或等于核心线程,且任务队列已满时,线程池会创建新的线程,直到线程数量达到maxPoolSize。如果线程数已等于maxPoolSize,且任务队列已满,则已超出线程池的处理能力,线程池会拒绝处理任务而抛出异常。

3.keepAliveTime

当线程空闲时间达到keepAliveTime,该线程会退出,直到线程数量等于corePoolSize。如果allowCoreThreadTimeout设置为true,则所有线程均会退出直到线程数量为0。

4.TimeUnit

3的时间单位

5.BlockingQueue<Runnable>

一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
PriorityBlockingQueue

ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

6.ThreadFactory threadFactory

传入一个线程工厂。通过该线程工厂可以创建线程。它创建的所有线程都是通过同样的ThreadGroup和同样的NORM_PRIORITY和non-daemon状态。通过提供的不同ThreadFactory,可以掌握修改线程名字,线程组,优先级,守护状态等。如果线程池调用线程工厂创建一个线程失败时,则返回一个null。且executor会继续,但是可能不会执行任何任务。
如果我们自己重写封装了一遍线程工厂,还有个好处就是可以通过该线程工厂实例维护所有由它创建的线程。

————————————————
原文链接:https://blog.csdn.net/wojiaolinaaa/article/details/51345789

7.RejectedExecutionHandler

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

三,执行流程

线程池按以下行为执行任务

  1. 当线程数小于核心线程数时,创建线程。
  2. 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
  3. 当线程数大于等于核心线程数,且任务队列已满
    1. 若线程数小于最大线程数,创建线程
    2. 若线程数等于最大线程数,抛出异常,拒绝任务

四,参数设置

五,源码分析

我们是通过threadPoolExecutor.execute(),方法提交的任务。那么这个方法干了什么我们打开源码

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();//获取线程池状态
        //如果工作线程小于核心线程数则调用addWorker创建一个新Worker
        if (workerCountOf(c) < corePoolSize) {
            // 线程池已经shutdown 或者其他线程提前创建的worker线程 。加入的时候会超过核心线程数会失败
            if (addWorker(command, true))
                return;
            //如果失败了重新获取状态
            c = ctl.get();
        }
        //如若状态还是运行中 任务加入队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //再次判断状态如果不是运行中,就从队列移除任务,执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //如果工作线程为0 ,就创建一个任务为空的工作线程。确保有一个线程来消费workerQueue中的任务
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //加入队列失败,说明队列已满,添加非核心线程Worker执行任务。添加失败则执行被拒策略
        else if (!addWorker(command, false))
            reject(command);
    }

我们的任务正常情况下都进入了addWorker方法。继续

private boolean addWorker(Runnable firstTask, boolean core) {
        //外层循环  有疑问参考https://www.jianshu.com/p/a92265fc70da
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
             /**
         * 1、如果线程池state已经至少是shutdown状态了
         * 2、并且以下3个条件任意一个是false
         *   rs == SHUTDOWN         (隐含:rs>=SHUTDOWN)false情况: 线程池状态已经超过shutdown,可能是stop、tidying、terminated其中一个,即线程池已经终止
         *   firstTask == null      (隐含:rs==SHUTDOWN)false情况: firstTask不为空,rs==SHUTDOWN 且 firstTask不为空,return false,场景是在线程池已经shutdown后,还要添加新的任务,拒绝
         *   ! workQueue.isEmpty()  (隐含:rs==SHUTDOWN,firstTask==null)false情况: workQueue为空,当firstTask为空时是为了创建一个没有任务的线程,再从workQueue中获取任务,如果workQueue已经为空,那么就没有添加新worker线程的必要了
         * return false,即无法addWorker()
         */
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                //worker数量
                int wc = workerCountOf(c);
                //如果worker数量超过了边界,添加则失败
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //调用CAS操作,使得worker数量+1,成功则跳出retry循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //CAS worker数量+1失败,再次读取ctl
                c = ctl.get();
               //如果状态不等于之前获取的state,跳出内层循环,继续去外层循环判断
               //CAS失败时因为workerCount改变了,继续内层循环尝试CAS对worker数量+1
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        //workerCount+1之后的操作
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //将任务封装为Worker 传入参数 当前线程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    //如果线程池在运行running<shutdown 或者 线程池已shutdown且firstTask==null(workQueue中可能仍有未执行完成的任务,创建没有初始任务的worker线程执行)
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        //如果线程已经被启动抛出异常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                            //添加任务到workers。HashSet<Worker>
                        workers.add(w);
                        //设置最大的池大小largestPoolSize,workerAdded设置为true
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //添加成功就启动
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
        //启动失败
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }


addWorker(Runnable firstTask, boolean core)

参数:
firstTask: worker线程的初始任务,可以为空
core: true:将corePoolSize作为上限,false:将maximumPoolSize作为上限

addWorker方法有4种传参的方式:

​ 1、addWorker(command, true)

​ 2、addWorker(command, false)

​ 3、addWorker(null, false)

​ 4、addWorker(null, true)

在execute方法中就使用了前3种,结合这个核心方法进行以下分析
第一个:线程数小于corePoolSize时,放一个需要处理的task进Workers Set。如果Workers Set长度超过corePoolSize,就返回false
第二个:当队列被放满时,就尝试将这个新来的task直接放入Workers Set,而此时Workers Set的长度限制是maximumPoolSize。如果线程池也满了的话就返回false
第三个:放入一个空的task进workers Set,长度限制是maximumPoolSize。这样一个task为空的worker在线程执行的时候会去任务队列里拿任务,这样就相当于创建了一个新的线程,只是没有马上分配任务
第四个:这个方法就是放一个null的task进Workers Set,而且是在小于corePoolSize时,如果此时Set中的数量已经达到corePoolSize那就返回false,什么也不干。实际使用中是在prestartAllCoreThreads()方法,这个方法用来为线程池预先启动corePoolSize个worker等待从workQueue中获取任务执行
执行流程:
1、判断线程池当前是否为可以添加worker线程的状态,可以则继续下一步,不可以return false:
A、线程池状态>shutdown,可能为stop、tidying、terminated,不能添加worker线程
B、线程池状态==shutdown,firstTask不为空,不能添加worker线程,因为shutdown状态的线程池不接收新任务
C、线程池状态==shutdown,firstTask==null,workQueue为空,不能添加worker线程,因为firstTask为空是为了添加一个没有任务的线程再从workQueue获取task,而workQueue为空,说明添加无任务线程已经没有意义
2、线程池当前线程数量是否超过上限(corePoolSize 或 maximumPoolSize),超过了return false,没超过则对workerCount+1,继续下一步
3、在线程池的ReentrantLock保证下,向Workers Set中添加新创建的worker实例,添加完成后解锁,并启动worker线程,如果这一切都成功了,return true,如果添加worker入Set失败或启动失败,调用addWorkerFailed()逻辑

任务都被封装成了worker,为什么不直接使用线程池execute方法提交的 Runnable?

分析worker类

 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        
        Worker(Runnable firstTask) {
            //设置AQS的同步状态private volatile int state,是一个计数器,大于0代表锁已经被获取
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
        //会讲state更新为0 runWorker()是ThreadPoolExecutor的方法
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
 //尝试一次将state从0设置为1,即“锁定”状态,但由于每次都是state 0->1,而不是+1,那么说明不可重入
        //且state==-1时也不会获取到锁
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
 /**
     * 尝试释放锁
     * 不是state-1,而是置为0
     */
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
 /**
     * 中断(如果运行)
     * shutdownNow时会循环对worker线程执行
     * 且不需要获取worker锁,即使在worker运行时也可以中断
     */
        void interruptIfStarted() {
            Thread t;
              //如果state>=0、t!=null、且t没有被中断
        //new Worker()时state==-1,说明不能中断
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }


Worker类本身既实现了Runnable,又继承了AbstractQueuedSynchronizer(以下简称AQS),所以其既是一个可执行的任务,又可以达到锁的效果。
Worker控制中断主要有以下几方面:
1、初始AQS状态为-1,此时不允许中断interrupt(),只有在worker线程启动了,执行了runWoker(),将state置为0,才能中断
不允许中断体现在:
A、shutdown()线程池时,会对每个worker tryLock()上锁,而Worker类这个AQS的tryAcquire()方法是固定将state从0->1,故初始状态state==-1时tryLock()失败,没发interrupt()
B、shutdownNow()线程池时,不用tryLock()上锁,但调用worker.interruptIfStarted()终止worker,interruptIfStarted()也有state>0才能interrupt的逻辑
2、为了防止某种情况下,在运行中的worker被中断,runWorker()每次运行任务时都会lock()上锁,而shutdown()这类可能会终止worker的操作需要先获取worker的锁,这样就防止了中断正在运行的线程

Worker实现的AQS为不可重入锁,为了是在获得worker锁的情况下再进入其它一些需要加锁的方法

Worker和Task的区别:
Worker是线程池中的线程,而Task虽然是runnable,但是并没有真正执行,只是被Worker调用了run方法,后面会看到这部分的实现。

最后一步runWorker方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
      // new Worker()是state==-1,此处是调用Worker类的tryRelease()方法,将state置为0, 而interruptIfStarted()中只有state>=0才允许
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
   // 如果task不为null,或者从阻塞队列中getTask()不为null
        while (task != null || (task = getTask()) != null) {
       //上锁,不是为了防止并发执行任务,为了在shutdown()时不终止正在运行的worker
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // /**
             * clearInterruptsForTaskRun操作
             * 确保只有在线程stoping时,才会被设置中断标示,否则清除中断标示
             * 1、如果线程池状态>=stop,且当前线程没有设置中断状态,wt.interrupt()
             * 2、如果一开始判断线程池状态<stop,但Thread.interrupted()为true,即线程已经被中断,又清除了中断标示,再次判断线程池状态是否>=stop
             *   是,再次设置中断标示,wt.interrupt()
             *   否,不做操作,清除中断标示后进行后续步骤
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                
                wt.interrupt();
            try {
            // //执行前(子类实现)
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                   //执行后(子类实现)
                    afterExecute(task, thrown);
                }
            } finally {
                task = null; //task置为null
                w.completedTasks++; //完成任务数+1
                w.unlock(); //解锁
            }
        }
        completedAbruptly = false;
    } finally {
    // //处理worker的退出
        processWorkerExit(w, completedAbruptly);
    }
}

执行流程:
1、Worker线程启动后,通过Worker类的run()方法调用runWorker(this)
2、执行任务之前,首先worker.unlock(),将AQS的state置为0,允许中断当前worker线程
3、开始执行firstTask,调用task.run(),在执行任务前会上锁wroker.lock(),在执行完任务后会解锁,为了防止在任务运行时被线程池一些中断操作中断
4、在任务执行前后,可以根据业务场景自定义beforeExecute() 和 afterExecute()方法
5、无论在beforeExecute()、task.run()、afterExecute()发生异常上抛,都会导致worker线程终止,进入processWorkerExit()处理worker退出的流程
6、如正常执行完当前task后,会通过getTask()从阻塞队列中获取新任务,当队列中没有任务,且获取任务超时,那么当前worker也会进入退出流程

getTask() -- 获取任务

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
  /**
         * 对线程池状态的判断,两种情况会workerCount-1,并且返回null
         * 线程池状态为shutdown,且workQueue为空(反映了shutdown状态的线程池还是要执行workQueue中剩余的任务的)
         * 线程池状态为stop(shutdownNow()会导致变成STOP)(此时不用考虑workQueue的情况)
         */
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();//循环的CAS减少worker数量,直到成功
                return null;
            }

            int wc = workerCountOf(c);
            // Are workers subject to culling?
            // 是否需要定时从workQueue中获取
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            //如果allowCoreThreadTimeOut为true,说明corePoolSize和maximum都需要定时
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                
                  
            /**
             * 如果到了这一步,说明要么线程数量超过了maximumPoolSize(可能maximumPoolSize被修改了)
             * 要么既需要计时timed==true,也超时了timedOut==true
             * worker数量-1,减一执行一次就行了,然后返回null,在runWorker()中会有逻辑减少worker线程
             * 如果本次减一失败,继续内层循环再次尝试减一
             */
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
              //poll() - 使用  LockSupport.parkNanos(this, nanosTimeout) 挂起一段时间,interrupt()时不会抛异常,但会有中断响应
            //take() - 使用 LockSupport.park(this) 挂起,interrupt()时不会抛异常,但会有中断响应
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
              //如获取到了任务就返回
                if (r != null)
                    return r;
               //没有返回,说明超时,那么在下一次内层循环时会进入worker count减一的步骤
                timedOut = true;
                  /**
              * blockingQueue的take()阻塞使用LockSupport.park(this)进入wait状态的,对LockSupport.park(this)进行interrupt不会抛异常,但还是会有中断响应
              * 但AQS的ConditionObject的await()对中断状态做了判断,会报告中断状态 reportInterruptAfterWait(interruptMode)
              * 就会上抛InterruptedException,在此处捕获,重新开始循环
              * 如果是由于shutdown()等操作导致的空闲worker中断响应,在外层循环判断状态时,可能return null
              */
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

getTask()**
执行流程:
1、首先判断是否可以满足从workQueue中获取任务的条件,不满足return null
A、线程池状态是否满足:
(a)shutdown状态 + workQueue为空 或 stop状态,都不满足,因为被shutdown后还是要执行workQueue剩余的任务,但workQueue也为空,就可以退出了
(b)stop状态,shutdownNow()操作会使线程池进入stop,此时不接受新任务,中断正在执行的任务,workQueue中的任务也不执行了,故return null返回
B、线程数量是否超过maximumPoolSize 或 获取任务是否超时
(a)线程数量超过maximumPoolSize可能是线程池在运行时被调用了setMaximumPoolSize()被改变了大小,否则已经addWorker()成功不会超过maximumPoolSize
(b)如果 当前线程数量>corePoolSize,才会检查是否获取任务超时,这也体现了当线程数量达到maximumPoolSize后,如果一直没有新任务,会逐渐终止worker线程直到corePoolSize
2、如果满足获取任务条件,根据是否需要定时获取调用不同方法:
A、workQueue.poll():如果在keepAliveTime时间内,阻塞队列还是没有任务,返回null
B、workQueue.take():如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务
3、在阻塞从workQueue中获取任务时,可以被interrupt()中断,代码中捕获了InterruptedException,重置timedOut为初始值false,再次执行第1步中的判断,满足就继续获取任务,不满足return null,会进入worker退出的流程

processWorkerExit() -- worker线程退出

private void processWorkerExit(Worker w, boolean completedAbruptly) {
  /**
     * 1、worker数量-1
     * 如果是突然终止,说明是task执行时异常情况导致,即run()方法执行时发生了异常,那么正在工作的worker线程数量需要-1
     * 如果不是突然终止,说明是worker线程没有task可执行了,不用-1,因为已经在getTask()方法中-1了
     */
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();


    /**
     * 2、从Workers Set中移除worker
     */
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;//把worker的完成任务数加到线程池的完成任务数
            workers.remove(w);//从HashSet<Worker>中移除
        } finally {
            mainLock.unlock();
        }

//**
     * 3、在对线程池有负效益的操作时,都需要“尝试终止”线程池
     * 主要是判断线程池是否满足终止的状态
     * 如果状态满足,但还有线程池还有线程,尝试对其发出中断响应,使其能进入退出流程
     * 没有线程了,更新状态为tidying->terminated
     */
        tryTerminate();

    /**
     * 4、是否需要增加worker线程
     * 线程池状态是running 或 shutdown
     * 如果当前线程是突然终止的,addWorker()
     * 如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()
     * 故如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程
     */
        int c = ctl.get();
        //如果状态是running、shutdown,即tryTerminate()没有成功终止线程池,尝试再添加一个worker
        if (runStateLessThan(c, STOP)) {
         //不是突然完成的,即没有task任务可以获取而完成的,计算min,并根据当前worker数量判断是否需要addWorker()
            if (!completedAbruptly) {
             //allowCoreThreadTimeOut默认为false,即min默认为corePoolSize
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                 //如果min为0,即不需要维持核心线程数量,且workQueue不为空,至少保持一个线程
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                     //如果线程数量大于最少数量,直接返回,否则下面至少要addWorker一个
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
              //添加一个没有firstTask的worker
        //只要worker是completedAbruptly突然终止的,或者线程数量小于要维护的数量,就新添一个worker线程,即使是shutdown状态
            addWorker(null, false);
        }
    }

processWorkerExit(Worker w, boolean completedAbruptly)**
参数:
worker: 要结束的worker
completedAbruptly: 是否突然完成(是否因为异常退出)
执行流程:
1、worker数量-1
A、如果是突然终止,说明是task执行时异常情况导致,即run()方法执行时发生了异常,那么正在工作的worker线程数量需要-1
B、如果不是突然终止,说明是worker线程没有task可执行了,不用-1,因为已经在getTask()方法中-1了
2、从Workers Set中移除worker,删除时需要上锁mainlock
3、tryTerminate():在对线程池有负效益的操作时,都需要“尝试终止”线程池,大概逻辑:
判断线程池是否满足终止的状态
A、如果状态满足,但还有线程池还有线程,尝试对其发出中断响应,使其能进入退出流程
B、没有线程了,更新状态为tidying->terminated
4、是否需要增加worker线程,如果线程池还没有完全终止,仍需要保持一定数量的线程
线程池状态是running 或 shutdown
A、如果当前线程是突然终止的,addWorker()
B、如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()
故如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程

上一篇下一篇

猜你喜欢

热点阅读