多线程并发编程17-线程池ThreadPoolExecutor源

2020-03-28  本文已影响0人  Demo_zfs

    今天来说一说线程池ThreadPoolExecutor,线程池主要解决两个问题:一是当执行大量异步任务时线程池能够提供较好的性能。在不使用线程池时,每当需要执行异步任务时直接new一个线程来运行,而线程的创建和销毁都需要开销。线程池中的线程是可以复用的,不需要每次执行异步任务都进行创建线程,从而减少了开销。二是线程池提供了一种资源限制和管理的手段,例如限制线程的个数、动态增加线程的个数、缓存异步任务等。

    ThreadPoolExecutor内部有一个原子性变量ctl,用来存储线程池的状态以及线程的个数,变量ctl高3位存储线程池状态,低29位存储线程个数。其中线程池状态有如下5中:

RUNNING:接受新任务并且处理阻塞队列里的任务。

SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务。

STOP:拒绝新任务并抛弃阻塞队列中的任务,并同时中断正在处理的任务。

TIDYING:所有任务执行完(包括阻塞队列里的任务)后当前线程池活动线程数位0,线程状态变为次TIDYING,之后将要调用terminated方法。

TERMINATED:调用terminated()方法后的状态。

线程池之间状态变化如下所示:

RUNNING -> SHUTDOWN:显式调用了shutdown()方法,或者隐式调用finalize()方法里面的shutdown()。

(RUNNING or SHUTDOWN) -> STOP:显式调用shutdownNow()方法。

SHUTDOWN -> TIDYING:当阻塞队列和线程池都为空时。

STOP -> TIDYING:当线程池为空时。

TIDYING -> TERMINATED:当terminated()方法执行完成时。

    下面通过线程池构造函数的参数来说说线程池的执行步骤。构造函数如下

public ThreadPoolExecutor(int corePoolSize,

                              int maximumPoolSize,

                              long keepAliveTime,

                              TimeUnit unit,

                              BlockingQueue<Runnable> workQueue,

                              ThreadFactory threadFactory,

                              RejectedExecutionHandler handler)

corePoolSize:线程池核心线程个数。

maximumPoolSize:线程池最大线程个数。

keepAliveTime:存活时间,当非核心线程处于闲置状态,这些闲置的非核心线程能存活的最大时间。

unit:存活时间的时间单位。

workQueue:任务队列,用来保存等待执行任务的阻塞队列。

threadFactory:创建线程的工厂。

handler:饱和策略。当线程个数已达到最大线程个数,并且任务队列也已满,继续添加任务则会指定该饱和策略,比如:AbortPolicy(抛出异常)、CallerRunsPolicy(使用调用则所在线程来运行任务)、DiscardOldestPolicy(调用poll丢弃一个任务,执行当前任务)和DiscardPolicy(默默丢弃,不抛异常)。

    当调用线程池的executor方法执行任务的时候都会创建一个core线程(即使之前创建的core线程处于空闲状态),直到core线程数达到corePoolSize,当core线程数达到corePoolSize并且所有的core线程都在执行任务,这时再添加任务就会往BlockingQueue阻塞队列中放,当BlockingQueue阻塞队列也达到设置的容量之后,就会再创建线程来执行新添加的任务,直到创建的线程达到maximumPoolSize,如果线程达到maximumPoolSize还在添加任务,这时就会执行饱和策略(抛弃、报错或当前执行executor方法的线程去执行该任务)。

    创建ThreadPoolExecutor 时会指定一个keepAliveTime的参数,用来指定非core线程在空闲keepAliveTime的时候之后就会被回收,这个非core线程keepAliveTime超时之后被回收是通过在获取BlockingQueue阻塞队列中任务时添加一个超时时间(BlockingQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)),从而实现了非core线程 空闲时间超过keepAliveTime之后被回收。 

    下面对ThreadPoolExecutor内部主要方法源码进行讲解。

execute(Runnable command)

    提交任务。

public void execute(Runnable command) {

//(1)检查提交的任务是否为null,是则抛出 NullPointerException异常。

    if (command == null)

        throw new NullPointerException();

//(2)获取原子变量ctl,高3位表示线程池状态, 低29位表示线程个数

    int c = ctl.get();

//(3)如果当前线程池的线程小于核心线程数,则尝试添加一个核心work线程到线程池中,并将当前任务作为新创建线程的第一个任务。

    if (workerCountOf(c) < corePoolSize) {

        if (addWorker(command, true))

            return;

        c = ctl.get();

    }

//(4)如果线程池处于RUNNING状态,则添加任务到阻塞队列。

    if (isRunning(c) && workQueue.offer(command)) {

//(5)因为是向阻塞队列中插入元素,这个操作有可能阻塞,在阻塞的这段时间有可能线程池状态发生了变化,所以这里需要再进行一次状态检查。

        int recheck = ctl.get();

//(6)如果当前线程状态不是RUNNING则从队列中删除任务,并执行拒绝策略。

        if (! isRunning(recheck) && remove(command))

            reject(command);

//(7)如果当前线程为空,则尝试添加一个非核心线程,代码走到这个分支有可能就是执行的核心线程为0。

        else if (workerCountOf(recheck) == 0)

            addWorker(null, false);

    }

//(8)如果阻塞队列已满,则尝试创建一个非核心线程,并将当前任务作为该非核心线程的第一个任务。创建失败则执行拒绝策略。

    else if (!addWorker(command, false))

        reject(command);

}

addWorker(Runnable firstTask, boolean core)

    添加工作线程,可以是核心线程也可以是非核心线程。

private boolean addWorker(Runnable firstTask, boolean core) {

    retry:

    for (;;) {

//(1)获取ctl变量的值,并获取线程池状态。

        int c = ctl.get();

        int rs = runStateOf(c);

//(2)检查阻塞队列是否只在必要时为空。

        if (rs >= SHUTDOWN &&

            ! (rs == SHUTDOWN &&

              firstTask == null &&

              ! workQueue.isEmpty()))

            return false;

//(3)循环CAS增加线程个数。  

        for (;;) {

//(4)如果线程个数超限则返回false,下面3中情况都会返回false

//一:线程个数大于最大容量数。

//二:如果创建的是核心线程,线程池的核心线程个数已大于corePoolSize指定的核心线程数大小。

//三:如果创建的是非核心线程,线程池的线程数已大于 maximumPoolSize指定的线程数大小。

            int wc = workerCountOf(c);

            if (wc >= CAPACITY ||

                wc >= (core ? corePoolSize : maximumPoolSize))

                return false;

//(5)增加线程数,使用CAS算法为ctl变量的低29位表示的数字加1。

            if (compareAndIncrementWorkerCount(c))

                break retry;

//(6)CAS失败,则看线程池状态是否变化了,变化则跳到外层循环重新尝试获取线程池状态,否则在内层循环重新CAS。

            c = ctl.get();  // Re-read ctl

            if (runStateOf(c) != rs)

                continue retry;

            // else CAS failed due to workerCount change; retry inner loop

        }

    }

    boolean workerStarted = false;

    boolean workerAdded = false;

    Worker w = null;

    try {

//(7)创建worker,在获取独占锁之前进行创建,减少锁占用的时间。

        w = new Worker(firstTask);

        final Thread t = w.thread;

        if (t != null) {

//(8)尝试获取独占锁。

            final ReentrantLock mainLock = this.mainLock;

            mainLock.lock();

            try {

//(9)再次检查线程池状态,有可能在获取锁前有别的线程调用了线程池的shutdown接口。

                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||

                    (rs == SHUTDOWN && firstTask == null)) {

                    if (t.isAlive()) // precheck that t is startable

                        throw new IllegalThreadStateException();

                    workers.add(w);

                    int s = workers.size();

                    if (s > largestPoolSize)

                        largestPoolSize = s;

                    workerAdded = true;

                }

            } finally {

//(10)释放获取的独占锁。

                mainLock.unlock();

            }

//(11)添加worker成功后,则调用该新增worker的start()方法开始执行任务。

            if (workerAdded) {

                t.start();

                workerStarted = true;

            }

        }

    } finally {

//(12)如果添加worker失败,则通过CAS算法将ctl变量的低29位表示的数字减1,将该创建的worker移除,并检查线程池状态。

        if (! workerStarted)

            addWorkerFailed(w);

    }

    return workerStarted;

}

    提交任务之后worker是如何执行任务的呢?下面来看一下worker是如何处理任务的。

runWorker(Worker w)

    运行worker,从阻塞队列中持续获取任务进行执行。

final void runWorker(Worker w) {

    Thread wt = Thread.currentThread();

    Runnable task = w.firstTask;

    w.firstTask = null;

    w.unlock(); // allow interrupts

    boolean completedAbruptly = true;

    try {

// (1)调用getTask()方法从阻塞队列中持续获取任务进行执行。

        while (task != null || (task = getTask()) != null) {

//(2)在执行任务之前获取worker中的锁,防止任务执行的时候被中断,除非线程池正在stop。

            w.lock();

            // If pool is stopping, ensure thread is interrupted;

            // if not, ensure thread is not interrupted.  This

            // requires a recheck in second case to deal with

            // shutdownNow race while clearing interrupt

//(3)如果线程池正在stop,保证worker中的线程是中断状态,否则保证worker中的线程为非中断状态。

            if ((runStateAtLeast(ctl.get(), STOP) ||

                (Thread.interrupted() &&

                  runStateAtLeast(ctl.get(), STOP))) &&

                !wt.isInterrupted())

                wt.interrupt();

            try {

                beforeExecute(wt, task);

                Throwable thrown = null;

                try {

//(4)执行任务。

                    task.run();

                } catch (RuntimeException x) {

                    thrown = x; throw x;

                } catch (Error x) {

                    thrown = x; throw x;

                } catch (Throwable x) {

                    thrown = x; throw new Error(x);

                } finally {

                    afterExecute(task, thrown);

                }

            } finally {

                task = null;

                w.completedTasks++;

//(5)释放worker的锁。

                w.unlock();

            }

        }

        completedAbruptly = false;

    } finally {

//(6)worker结束前执行清理任务。

        processWorkerExit(w, completedAbruptly);

    }

}

getTask() 

    从阻塞队列中获取任务,该方法实现了线程空闲过期策略。

private Runnable getTask() {

    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {

        int c = ctl.get();

//(1)获取线程池状态。

        int rs = runStateOf(c);

        //(2) 检查阻塞队列是否为空,空则将worker数减一并返回null,之后在 runWorker()方法中会对worker进行回收。

        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

            decrementWorkerCount();

            return null;

        }

        int wc = workerCountOf(c);

//(3)worker是否设置闲置超时 ,核心worker也是可以设置进行闲置超时回收。

        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

//(4)worker已经闲置超时,并且阻塞队列为空,则将worker数减一,并返回null,之后在 runWorker()方法中会对worker进行回收。

        if ((wc > maximumPoolSize || (timed && timedOut))

            && (wc > 1 || workQueue.isEmpty())) {

            if (compareAndDecrementWorkerCount(c))

                return null;

            continue;

        }

        try {

// (5)使用阻塞队列带超时时间的poll(long timeout, TimeUnit unit)方法来实现闲置超时,闲置超时后在下一次进到循环中会执行(4)的代码。

            Runnable r = timed ?

                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

                workQueue.take();

            if (r != null)

                return r;

            timedOut = true;

        } catch (InterruptedException retry) {

            timedOut = false;

        }

    }

}

    上面讲完了ThreadPoolExecutor如何进行提交任务以及worker如何获取任务并执行任务。下面讲一下ThreadPoolExecutor中的一些其他常用方法。

shutdown()

    调用shutdown方法后,线程池就会不再接受新的任务,但是阻塞队列中的任务还是会执行。该方法会立刻返回,并不会等待队列中任务完成后返回。

public void shutdown() {

//(1)尝试获取独占锁。

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

//(2)检查权限。

        checkShutdownAccess();

//(3)将当前线程池状态修改为SHUTDOWN状态,如果已为SHUTDOWN状态则直接返回。

        advanceRunState(SHUTDOWN);

//(4)设置worker的中断标识。

        interruptIdleWorkers();

        onShutdown(); // hook for ScheduledThreadPoolExecutor

    } finally {

//(5)释放锁。

        mainLock.unlock();

    }

//(6)尝试将线程池状态设置为TERMINATED,以下两种情况会将线程池状态设置为TERMINATED

//1.当线程池状态为SHUTDOWN并且线程池和阻塞队列都为空

//2.当线程池状态为STOP并且线程池为空。

    tryTerminate();

}

shutdownNow() 

    调用shutdownNow方法后,线程池不再接受新的任务,并且会丢弃工作队列里面的任务,中断正在执行的任务(中断是通过设置中断标识完成的,如果任务没有对中断标识进行响应则不会停止该任务),该方法会立刻返回,返回值为阻塞队列里被丢弃的任务列表。

public List<Runnable> shutdownNow() {

    List<Runnable> tasks;

//(1)尝试获取锁。

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

//(2)检查权限

        checkShutdownAccess();

//(3)设置线程池状态为stop

        advanceRunState(STOP);

//(4)设置所有worker中的线程的中断标志。

        interruptWorkers();

//(5)将阻塞队列中的任务移动到tasks中。

        tasks = drainQueue();

    } finally {

//(6)释放锁。

        mainLock.unlock();

    }

//(7)尝试将线程池状态设置为TERMINATED,以下两种情况会将线程池状态设置为TERMINATED

//1.当线程池状态为SHUTDOWN并且线程池和阻塞队列都为空

//2.当线程池状态为STOP并且线程池为空。

    tryTerminate();

    return tasks;

}

    线程池巧妙地使用了原子变量来记录线程池的状态以及线程个数。通过线程池状态来控制任务的执行,每个worker可以处理多个任务。线程池通过线程的复用减少了线程创建和销毁的开销。

    今天的分享就到这,有看不明白的地方一定是我写的不够清楚,所有欢迎提任何问题以及改善方法。

上一篇下一篇

猜你喜欢

热点阅读