迈向Java高级程序员(二)-----先从并发开始(线程池篇)

2018-04-01  本文已影响0人  小码工

在写线程池之前,我们先写个神奇的代码

这段代码我每次执行结果都是10000,按道理说,我启动了多个线程对i进行自增操作,而且没有进行相关的同步操作,值应当是小于10000的,怎么就能得到10000呢?难道我以前学的很多知识都是假的?

其实不然,这是因为每个线程的执行的时间比创建线程所耗费的时间要短,根本就没触发并发。第一个线程都执行完了,第二个线程才创建完。啥?你告诉我调1000次加法居然不及线程创建所花费的时间?但事实真的如此。

既然线程的创建如此的昂贵,我们有时候有需要启动线程来为我们高效的做事怎么办?这种问题Java当然提供了相关的解决方案,那就是线程池ThreadPoolExecutor。线程池的设计大大提升了线程的复用,线程不再是游荡的孤魂野鬼,再也不会朝闻道而夕死。说的这么吊,怎么用呢?下面一起来看看吧:

我们先看一下线程池的类关系图:

ThreadPoolExecutor

我们可以看到最顶层是Executor接口,此接口很简单就定义一个方法:用于声明线程池最核心的功能--------执行任务。

Executor核心方法

我们再来看看另外一个接口:ExecutorService

ExecutorService继承了Executor,保证了最核心的功能,同时扩展了如下功能:

1、void shutdown();        停止线程池(此方法不是立马停止,已经在执行的任务肯定会执行掉)

2、List<Runnable> shutdownNow(); 立刻停止线程池(正在执行任务也会停止掉,同时返回未执行的任务)

3、boolean isShutdown();  线程池是不是停止状态。此方法呼应方法1

4、boolean isTerminated(); 线程池是不是终止状态,此方法呼应方法1的最终状态和方法2

5、boolean awaitTermination(long timeout, TimeUnit unit) 等待线程池终止。

6、<T> Future<T> submit(Callable<T> task)  提交一个Callable任务,返回一个将来执行结果的约定

7、<T> Future<T> submit(Runnable task, T result);  提交一个任务,返回一个将来执行结果的约定

8、 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 提交一批Callable任务,并全部执行,返回一批对应的执行结果的约定

9、<T> T invokeAny(Collection<? extends Callable<T>> tasks) 提交一批Callable任务,任意执行一个,返回一个队形执行结果的约定

10、<T>  List<Future<T>>  invokeAll(Collection<?  extends Callable<T>> tasks, long timeout, TimeUnit unit) 类似于方法8,但是具有超时限制。

11、<T> T invokeAny(Collection< ? extends Callable<T>> tasks, long timeout, TimeUnit unit)

类似于方法9,但具有超时限制。

其中1~5你可以认为是声明生命周期方法。6~11是为其增强功能。

我们再来看看抽象类AbstractExecutorService的实现:

AbstractExecutorService你可以认为它是一个模版类,它只针对性的对6~11的增强功能做了处理,并未实现相关的声明周期方法和Executor定义的核心方法。源码非常简单,这里我就不解析类。

下面我们重点来解析ThreadPoolExecutor类:

很多看源码不知道从哪看,其实开始我也不知道,就是点到哪是哪,但是自从知道IDEA可以看类的关系图之后(Mac下 command+option+u),我就知道怎么看了:当然是从其最核心的方法看啦。

我们先来看看ThreadPoolExecutor对execute的实现。

ThreadPoolExecutor execute方法源码

方法不长,但是有几个关键变量我们需要先来解释下:

1、private final AtomicInteger  ctl   一个原子操作变量,用于获取当前线程池的状态(刚刚已经说了,声明周期方法也是在ThreadPoolExecutor里实现的,当然要记下线程池的状态啦)

2、private static final int COUNT_BITS  辅助常量,用于协助计算线程池状态的。

3、private static final int CAPACITY       辅助常量,用于协助计算线程池状态的。

4、private static final int RUNNING       表示线程池是运行状态,可正常执行任务

5、private static final int SHUTDOWN  表示线程池正在被停止,此时不接收新任务,但能处理已添加的任务

6、private static final int STOP              表示线程池正在被停止。不接收新任务,不处理已添加的任务,并且会中断正在处理的任务

7、private static final int TIDYING         表示线程池所有任务已终止

8、private static final int TERMINATED  表示线程池被停止

9、private final BlockingQueueworkQueue 阻塞队列,用于存放无法立即执行的任务

10、private volatile int corePoolSize   核心线程数量,线程池保持不销毁的线程不销毁的数量

11、private volatile int maximumPoolSize 最大线程数,线程池里能承载的最大线程数量

还有很多变量此时先不介绍,等用到再说。

此时我们再来分析execute方法:

先来看1335行~1354行的注释,大致意思是这样的:

1、如果提交一个任务时,此时线程池里的线程数量小于核心线程数,则直接创建一个线程用于执行本次提交的任务

2、如果提交一个任务时,线程池里已经创建了核心线程数目的线程,且线程池中所有已创建的线程均在执行任务。则判断缓存队列是否满了,如果没满则将任务加入到缓存队列,等待执行。如果此时缓存队列也满了,此时线程池中的线程不足最大线程数,则新建一个线程用于执行该任务。

3、如果线程池中线程已达到最大线程数也没有空闲的线程,且缓存队列也满了,则此时再提交任务将被拒绝。

这个套路我们来写段代码验证下:

测试代码 测试结果

从这段执行结果里不难看出,结果正如注释说的那样。

而且我们定义了线程的超时时间为5秒,继续看一个结果:

线程池也果然在指定时间内把超过线程池核心数的空闲线程干掉了。并且只保留了核心线程数。

线程池的运行从表态上我们已经简单的知道了,那我们再来看看其源码是如何实现的:

1356行,workerCountOf(c)获取当前线程池的的线程数,如果小于核心线程数,直接提交一个任务。

1357行:提交一个新任务并执行,addWorker的第二个参数为ture时,会创建一个新线程来执行任务。同时addWorker的第一个参数为这个新线程的首个任务。

1361行:到了这一行,说明线程池已经创建了核心线程池数目的线程,且没有空闲的,isRunning(c)是判断线程池状态是不是RUNNING的,如果是,则将任务插入到缓冲队列里。如果换从队列插入成功了,进入1363行。

1363行:再次判断线程状态是不是是不是停止的,因为有可能你在把任务放进队列的时候,有线程把线程池停止的情况。如果此时线程停止了,那就将任务从缓冲对接中删除,同时拒绝掉任务。

1365行:再次判断线程池核心线程是否空闲,如果有空闲,就直接让线程从缓冲队列里取任务,此时也不必创建线程,也不需提交任务,所有执行的是addWorker(null,false).如果还是没有空闲的,那就让任务在缓冲队列里躺着吧。

再来看1368行:线程池停止了或者队列满了都可能走这行,那就再试下是不是能创建超国核心线程池数目的线程来执行这个任务。如果也不行,不管是线程池状态不对也罢,还是线程池里的线程也已经达到了最大,都是拒绝任务里。

我们简单的分析了1356行~1369行的代码,发现都和一个重要的的方法addWorker有关。那我们再来好好看看addWorker有什么神奇之处吧。

先上源码:

先来分析此方法的两个参数的含义。

Runnable firstTask,顾名思义第一个任务,什么意思呢?就是如果这个参数不为空,此时肯定会创建一个线程,来执行这个任务,作为线程的第一个任务

boolean core,核心?还是补个boolean什么鬼。大致浏览下代码你就发现了,是为了让当前线程池的线程数量是和核心线程数比较,还是和最大线程数比较。

来具体分析下吧:

901行~905行:检查线程池的状态是不是非可执行状态,如果是,则继续判断;可能会出现以下几种情况。

1、线程池不是待停止状态SHUTDOWN,结合rs >=SHUTDOWN是true ,那只可能是线程池已经停止,线程池都已经进入停止,当然什么都不能干啦。

2、线程池是待停止状态SHUTDOWN,但是你提交的任务firstTask不是空,也不用继续判断了,!(rs==SHUTDOWN &&firstTask ==null &&!workQueue.isEmpty())肯定就是返回ture了,901行~904行整个表达式为true,直接什么事情都不做了。因为SHUTDOWN就是不能再接受新任务的一个状态。满足线程池的设计。

3、线程池是待停止状态SHUTDOWN,但是你提交的任务firstTask也是空。缓冲区里不是空,此时!(rs==SHUTDOWN &&firstTask ==null &&!workQueue.isEmpty())也返回true,因为SHUTDOWN也是只执行当前在进行的任务,你要我再去执行缓冲队列里的任务那也是不可能的。

此时你再回去看看1368行,是不是能对应其中一个线程池状态不是RUNNING场景。

从以上的分析也可以看出,线程池对其声明周期方法的诠释就在901行~905行。

我们再来继续分析线程池是可执行任务RUNNING状态下的情况:

907~918行:

907行这里进行了自旋。暂时不管为啥继续往下看

908行:获取当前正在执行任务的线程,先不管wc >=CAPACITY,先看

wc >= (core ?corePoolSize :maximumPoolSize)很明显是为了应对刚刚我们解析1356行~1369行那几个使用addWorker的场景。如果是ture,就是和核心线程池比较。如果是false,就和最大线程池比较。如果超出了最大线程池,肯定是没法干活了。正好映射1368行线程池状态正常,但是线程池却不能接受更多任务的另一个场景。

912行:如果此时线程池还能继续接受任务,使用CAS操作对当前执行的任务数+1;如果成功了,去执行任务,如果没成功,则需要重新判断线程池是不是RUNNING状态,如果线程池此时还是正常,则重复908行和尝试912行的CAS操作。如果线程池已经停止,则从901——907行终止自旋。

在看继续往下看:

921行~958行。

921行:定义了任务的启动状态

922行:定义了任务的添加状态。

923行引入了一个关键类Worker:我们来看下Worker的类图:

Worker类图

一个实现的Runable接口的类,同时继承了并行框架AQS的一个类(这里暂时不细讲AQS是干啥的,你只需要了解他是从Java层面实现锁的一个框架就好了)。

我们来看看Work类的实现:

不难看出,这个类持有了一个线程。而且核心的run方法调用的是线程池ThreadPoolExecutor的runWorker方法,并把自己传入。

我们来具体分析runWorker(Worker w) 方法。

我们做个假设:如果我们是第一次提交任务。1122行拿到的就是我们的任务。1124行的代码很让人疑惑,为什么上来先解锁。从注释来看是允许中断。允许中断?此时看是一脸懵逼的,但是如果现在告诉你线程池还提供了一个shutdownNow方法来停止线程池,而且此方法是不等正在执行的任务执行完就强制中断,你或许就清楚这句话是干嘛的了。

runWork的核心代码在1127行。如果是第一次提交任务则直接执行该任务。如果不是第一次提交任务。则是从阻塞队列中获取任务来执行。

1333行~1137行,我们一步步看:

1、假设线程池被停止了,runStateAtLeast(ctl.get(), STOP)返回的是ture,则我们就没必要看(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))了,直接看wt.isInterrupted,这个代表work的线程是不是中断的,如果wt 已经被中断则返回true,否则返回false。这样我们就可以得到这样的表达式结果,线程池已经停了,而且线程也被停止了,那就不管了。线程池停了,但是线程线程没有被中断,则需要将线程中断。

2、假设线程池状态正常,runStateAtLeast(ctl.get(), STOP)返回的是false,为什么还执行Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP)呢?这个表达式不是怎么都会返回false吗?答案就再于Thread.interrupted()其实是为了解除线程的中断状态。如果线程本来是中断的,这个表达式返回的是true,如果之前线程没有被中断,则反回的是false。&&runStateAtLeast(ctl.get(), STOP)是为了不影响结果。这样(runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP)))肯定得到的是个false,这样就保证了线程池如果不是停止状态,对应的线程也能继续的再正常的执行任务了。

如果当时线程池的运行的情况是我们的假设2。则就会执行到1141行,将任务执行。

我们再回过头来看addWorker方法:

925行,如果我们是第一次提交任务,创建一个新的Work也等同于创建一个新的线程来执行任务。

928行,获取线程池的可重入锁,并且加锁。这里是为了保证只有一个线程往workers中增加任务。

936行,如果当前线程池是可执行状态,或者线程池是要停止的状态,但是并未提交新任务,都可以让线程池继续执行任务。

938行,判断work持有的线程,预先检查该线程是否可启动,如果已经被启动,则抛出异常。否则将work加入到工作池中。同时更新当前线程的的任务数。并且启动任务执行。如果任务启动成功,则返回成功,如果失败了,执行957行的addWorkerFailed方法将任务从任务池中删除。并减少执行任务的数量。并且尝试终止线程池。

线程池的实现基本分析完了,如果哪里有不对的地方,烦请指正。

上一篇下一篇

猜你喜欢

热点阅读