Java并发 --- 线程池解析
线程池是什么,有什么好处?简述线程池中线程复用原理?
线程是一个重资源,JVM 中的线程与操作系统的线程是一对一的关系,所以在 JVM 中每创建一个线程就需要调用操作系统提供的 API 创建线程,赋予资源,并且销毁线程同样也需要系统调用。而系统调用就意味着上下文切换等开销,并且线程也是需要占用内存的,而内存也是珍贵的资源。因此线程的创建和销毁是一个重操作(系统调用),并且线程本身也占用资源。
总结补充:
- Java中线程与操作系统线程是一比一的关系。
- 线程的创建和销毁是一个“较重”的操作。
- 多线程的主要是为了提高 CPU 的利用率。
- 线程的切换有开销,线程数的多少需要结合 CPU核心数与 I/O 等待占比。
线程池概述:
- 线程池将线程和任务进行解耦,线程是线程,任务是任务,摆脱之前通过 Thread 创建线程时一个线程必须对应一个任务的限制。
- 线程池采用生产者-消费者设计模式。线程池的使用方是生产者(任务),线程池本身是消费者,阻塞队列来存储要处理的任务。
- 线程池主要特点:线程复用;控制最大并发数;管理线程。
生产者-消费者设计模式
- 生产者和消费者之间通过缓冲区(通常是一个阻塞队列)实现通讯,,生产者将产生的数据放入缓冲区,消费者从缓冲区中获取数据。本质上和消息中间件MQ是一样的。
为什么要使用生产者消费者模式?
- 并发 (异步):生产者直接调用消费者,两者是同步(阻塞)的,如果消费者吞吐数据很慢,这时候生产者白白浪费大好时光。而使用这种模式之后,生产者将数据丢到缓冲区,继续生产,完全不依赖消费者,程序执行效率会大大提高。
- 解耦:生产者和消费者之间不直接依赖,通过缓冲区通讯,将两个类之间的耦合度降到最低。
使用线程池的好处:
- 降低资源消耗:线程复用提高利用率,避免频繁创建和销毁线程(费时且消耗内存等系统资源)
- 提高响应速度:当任务提交时,可以不用等待创建线程,能立即执行
- 提高线程的可管理性:线程是稀缺资源,使用线程池可以对线程进行统一分配、调优和监控,更利于线程管理
线程池线程复用原理:在线程池中,同一个线程可以从阻塞队列中不断获取新任务来执行。
- 核心原理:线程池对Thread 进行了封装,并不是每次执行任务都会调用 Thread.start() 来创建新线程,而是让每个线程去执行一个“循环任务”,在这个“循环任务”中不停检查是否有任务需要被执行:如果有则直接执行,也就是调用任务中的 run 方法,将 run 方法当成一个普通的方法执行,通过这种方式只使用固定的线程就将所有任务的 run 方法串联起来。
创建线程池的方法有哪些?有哪些问题?自定义线程池注意点?
先回忆一下线程创建的方法:
- 继承Thread类:Thread是类,有单继承的局限性。
- 实现Runnable接口:任务和线程分开,不能返回执行结果。
- 实现Callable接口:利用FutureTask执行任务,能通过futureTask.get()取到执行结果。
但是我们工作中一般不这样来创建线程。原因:虽然在 Java 语言中创建线程看上去就像创建一个对象一样简单,只需要 new Thread() 就可以了,但实际上创建线程远不是创建一个对象那么简单。创建对象,仅仅是在 JVM 的堆里分配一块内存而已;而创建一个线程,却需要调用操作系统内核的 API,然后操作系统要为线程分配一系列的资源,这个成本就很高了,所以线程是一个重量级的对象,应该避免频繁创建和销毁。线程池线程复用刚好可以解决这一个问题。
ps:阿里开发手册:【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
从jdk1.5版本开始,在java.uitl.concurrent包下面定义定义了一些与并发相关的类,其中线程池最核心的一个类是ThreadPoolExecutor。
通过 Executors 的静态工厂方法创建线程池的四种方法:
- newSingleThreadExecutor:单个线程的线程池,即线程池中每次只有一个线程工作,单线程串行执行任务
- newFixedThreadExecutor(n):固定数量的线程池,每提交一个任务就是一个线程,直到达到线程池的最大数量,然后后面进入等待队列直到前面的任务完成才继续执行
- newCacheThreadExecutor:可缓存线程池,当线程池大小超过了处理任务所需的线程,那么就会回收部分空闲(一般是60秒无执行)的线程,当有任务来时,又智能的添加新线程来执行。
- newScheduleThreadExecutor:大小无限制的线程池,支持定时和周期性的执行线程
阿里开发手册:【强制】线程池不允许使用Executors去创建,而是通过ThreadPooLExecutor
的方式,这样的处理方式让写的同学更加明确线程池的运行规则(避免上述四种方法的默认实现),规避资源耗尽的风险。说明:Executors返回的线程池对象的弊端如下:
-
FixedThreadPool
和SingleThreadPool
:允许的请求队列长度为Integer.MAX_VALUE
,可能会堆积大量的请求,导致OOM
(out of memory)。 -
CachedThreadPool
和ScheduledThreadPool
:允许的创建线程数量为Integer.MAX_VALUE
,可能会创建大量的线程,导致OOM
。
最终都有可能导致OOM,而 OOM 会导致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列,并设置最大线程数!
如何合理自定义线程池ThreadPoolExecutor:上述四种线程池的最终方式也是调用的ThreadPoolExecutor的构造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
-
Runtime.getRuntime().availableProcessors()
查看核心数 -
corePoolSize
:1或者0 -
maximumPoolSize
,分情况考虑,要看业务是CPU密集型还是IO密集型- CPU密集型:该任务需要大量的运算,而没有阻塞,CPU一直全速运行。需要尽可能少的线程,设置:cpu核数+1个线程
- IO密集型:该任务需要大量的IO,即大量的阻塞,使用多线程可以加速程序运行,如数据库读写。大部分线程都阻塞,故需要多配制线程数。参考设置:CPU核数/(1-阻塞系数阻塞系数在0.8~0.9之间)
创建线程池有哪些参数(构造函数中参数)?
- corePoolSize :核心线程数,也就是正常情况下创建工作的线程数,这些线程创建后并不会消除,是一种常驻线程
- maxinumPoolSize :最大线程数,它与核心线程数相对应,表示最大允许被创建的线程数,比如当前任务较多,将核心线程数都用完了,还无法满足需求时,此时就会创建新的线程,但是线程池内线程总数不会超过最大线程数
- keepAliveTime 、 unit :超出核心线程数之外的线程的存活时间,线程池其实想要的只是核心线程数个线程,但是又预留了一些数量来预防突发状况,当突发状况过去之后,线程池希望只维持核心线程数的线程,所以就弄了个 KeepAliveTime,当线程数大于核心数之后,如果线程空闲了一段时间(KeepAliveTime),就回收线程,直到数量与核心数持平。
- workQueue :任务队列(阻塞队列),用来存放待执行的任务,起到一个缓冲作用,假设我们现在核心线程都已被使用,还有任务进来则全部放入队列,直到整个队列被放满但任务还再持续进入则会开始创建新的线程;具体队列长度需要结合线程数,任务的执行时长,能承受的等待时间等。
- ThreadFactory :线程工厂,用来生产线程执行任务。我们可以选择使用默认的创建工厂,产生的线程都在同一个组内,拥有相同的优先级,且都不是守护线程。当然我们也可以选择自定义线程工厂,一般我们会根据业务来制定不同的线程工厂
-
Handler 任务拒绝策略:JDK中已经预设了4种线程池拒绝策略:
-
CallerRunsPolicy(调用者运行策略):当触发拒绝策略时,只要线程池没有关闭,就由提交任务的当前线程处理。
使用场景:一般在不允许失败的、对性能要求不高、并发量较小的场景下使用,因为线程池一般情况下不会关闭,也就是提交的任务一定会被运行,但是由于是调用者线程自己执行的,当多次提交任务时,就会阻塞后续任务执行,性能和效率自然就慢了。
-
CallerRunsPolicy(调用者运行策略):当触发拒绝策略时,只要线程池没有关闭,就由提交任务的当前线程处理。
public satatic class CallerRunsPolicy implements RejectExecutionHandler{
public CallerRunsPolicy(){}
public void rejectedExecution(Runnable r,ThreadPoolExecutor e){
if(!e.isShutdown()){
r.run();
}
}
}
-
AbortPolicy(中止策略):当触发拒绝策略时,直接抛出拒绝执行的异常,中止策略的意思也就是打断当前执行流程
使用场景:这个就没有特殊的场景了,但是有一点要正确处理抛出的异常。ThreadPoolExecutor中默认的策略就是AbortPolicy,ExecutorService接口的系列ThreadPoolExecutor因为都没有显示的设置拒绝策略,所以默认的都是这个。但是请注意,ExecutorService中的线程池实例队列都是无界的,也就是说把内存撑爆了都不会触发拒绝策略。当自己自定义线程池实例时,使用这个策略一定要处理好触发策略时抛的异常,因为他会打断当前的执行流程。
public static class AbortPolicy implements RejectedExecutionHandler{
public AbortPolicy(){}
public void rejectedExecution(Runnable r,ThreadPoolExecutor e){
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
-
DiscardPolicy(丢弃策略):直接静悄悄的丢弃这个任务,不触发任何动作。
使用场景:如果你提交的任务无关紧要,你就可以使用它 。因为它就是个空实现,会悄无声息的吞噬你的的任务。所以这个策略基本上不用了
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
-
DiscardOldestPolicy(弃老策略):如果线程池未关闭,就弹出队列头部的元素(先进先出),即丢弃这个任务。
使用场景:这个策略还是会丢弃任务,丢弃时也是毫无声息,但是特点是丢弃的是老的未执行的任务,而且是待执行优先级较高的任务。基于这个特性,想到的场景就是,发布消息和修改消息,当消息发布出去后,还未执行,此时更新的消息又来了,这个时候未执行的消息的版本比现在提交的消息版本要低就可以被丢弃了。因为队列中还有可能存在消息版本更低的消息会排队执行,所以在真正处理消息的时候一定要做好消息的版本比较。
public static class DiscardOldestPolicy implements RejectExecutionHandler{
public DiscardOldestPolicy(){}
public void rejectedExecution(Runnable r,ThreadPoolExecutor e){
if(!e.isShutdown()){
e.getQueue().poll();
e.execute(r);
}
}
}
ps:还可以自定义实现(实现RejectExecutionHandler接口即可)和第三方实现的拒绝策略,比如Netty。
Netty中的实现很像JDK中的CallerRunsPolicy(调用者运行策略),舍不得丢弃任务。不同的是,CallerRunsPolicy是直接在调用者线程执行的任务。而Netty是新建了一个线程来处理的。所以,Netty的实现相较于调用者执行策略的使用面就可以扩展到支持高效率高性能的场景了。但是也要注意一点,Netty的实现里,在创建线程时未做任何的判断约束,也就是说只要系统还有自由就会创建新的线程来处理,直到new不出新的线程了,才会抛创建线程失败的异常
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
NewThreadRunsPolicy(){
super();
}
public void rejectedExecution(Runnable r,ThreadPoolThread e){
try{
final Thread t = new Thread(r,"Temporary task executor");
t.start();
}catch(Throwable e){
throw new RejectedExecutionException("Failed to start a new thread ",e);
}
}
}
线程池都有哪几种工作队列
- ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
- LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列
- SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
- PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
线程池处理任务的流程?(或execute() 方法的执行机制?)
当提交一个新的任务到线程池时,线程池的处理流程如下:
- 检查核心线程数:如果线程池中正在执行任务的线程数是否达到corePoolSize,如果没有,即使有空闲线程,也会创建一个新的线程执行任务;
- 检查任务队列:如果核心线程池已满,工作队列未满,将新提交的任务加入工作队列;
- 检查最大线程数:如果工作队列已满,线程池正在执行的线程数小于最大线程数,就创建一个线程来执行新的任务。否则,执行任务拒绝策略;
ps:线程池创建线程时,会将线程封装成工作线程 Worker,Worker 在执行完任务后还会循环获取工作队列中的任务来执行。
特别注意:
- 当线程数达到核心数的时候,任务是先入队,而不是先创建最大线程数。没达到核心线程数,即使存在空闲线程,也会创建一个新线程执行这个新任务。
- 线程池虽说默认是懒创建线程,但是它实际是想要快速拥有核心线程数的线程(处理日常任务的中坚力量),而最大线程数其实是为了应付突发状况,回归正常后,淘汰线程数量至核心线程数。
- 线程池本意只是让核心数量的线程工作着,而任务队列起到一个缓冲的作用。最大线程数这个参数更像是无奈之举,在最坏的情况下做最后的努力,去新建线程去帮助消化任务。
- 有个 allowCoreThreadTimeOut 方法,把它设置为 true ,则所有线程都会超时,不会有核心数那条线的存在。具体是会调用 interruptIdleWorkers 这个方法。
向线程池提交任务的execute()和submit()方法的区别?
- 接收参数:execute()只能执行 Runnable 类型的任务。submit()可以执行 Runnable 和 Callable 类型的任务。
- 返回值(核心):execute() ⽅法⽤于提交不需要返回值的任务,所以⽆法判断任务是否被线程池执行成功与否;submit() ⽅法⽤于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future.get()方法来获取返回值。get()会阻塞当前线程(即调用future.get()的线程)直到任务执行完成。而get(long timeout,TimeUnit unit)方法会阻塞一点时间后返回,此时任务可能还没有执行完。
- 异常处理:submit()方便Exception处理
线程池中阻塞队列的作用?为什么是先添加队列而不是先创建最大线程?
阻塞队列作用:
- 一般的队列只能保证作为一个有限长度的缓冲区,如果超出了缓冲长度,就无法保留当前的任务了,阻塞队列通过阻塞可以保留住当前想要继续入队的任务。
- 保证任务队列中没有任务时阻塞获取任务的线程,使得线程进入wait状态,释放cpu资源。
- 阻塞队列自带阻塞和唤醒的功能,不需要额外处理,无任务执行时,线程池利用阻塞队列的take方法挂起,从而维持核心线程的存活、不至于一直占用cpu资源。
添加队列而不是先创建最大线程原因:
-
在创建新线程的时候,是要获取全局锁的,这个时候其它的就得阻塞,先添加阻塞队列主要是起到一个缓冲作用,保证了执行效率。
-
举例:就好比一个企业里面有10个(core)正式工的名额,最多招10个正式工,要是任务超过正式工人数(task > core)的情况下,工厂领导(线程池)不是首先扩招工人,还是这10人,但是任务可以稍微积压一下,即先放到队列去(代价低)。10个正式工慢慢干,迟早会干完的,要是任务还在继续增加,超过正式工的加班忍耐极限了(队列满了),就的招外包帮忙了(注意是临时工)要是正式工加上外包还是不能完成任务,那新来的任务就会被领导拒绝了(线程池的拒绝策略)。
线程池有几种状态吗?
- RUNNING:能接受新任务,并处理阻塞队列中的任务
- SHUTDOWN:不接受新任务,但是可以处理阻塞队列中的任务
- STOP:不接受新任务,并且不处理阻塞队列中的任务,并且还打断正在运行任务的线程,就是直接撂担子不干了!
- TIDYING:所有任务都终止,并且工作线程也为0,处于关闭之前的状态
- TERMINATED:已关闭。
如何关闭线程池?
可以调用 shutdown 或 shutdownNow 方法关闭线程池,原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法中断线程,无法响应中断的任务可能永远无法终止。
- shutdownNow 首先将线程池的状态设为 STOP,然后尝试停止正在执行或暂停任务的线程,并返回等待执行任务的列表。
- shutdown 只是将线程池的状态设为 SHUTDOWN,然后中断没有正在执行任务的线程。
通常调用 shutdown 来关闭线程池,如果任务不一定要执行完可调用 shutdownNow。
手写一个简易的线程池
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class TreadPoolDemo {
/** 利用阻塞队列实现生产者-消费者模式 */
BlockingQueue<Runnable> workQueue;
/** 保存内部工作线程 */
List<WorkThread> workThreadList = new ArrayList<>();
/** 构造函数传入核心线程数和阻塞队列 */
TreadPoolDemo(int poolSize, BlockingQueue<Runnable> workQueue) {
this.workQueue = workQueue;
for (int i = 0; i < poolSize; i++) {
WorkThread workThread = new WorkThread();
workThread.start();
workThreadList.add(workThread);
}
}
/** 放入任务,如果没有空间,则阻塞 */
void execute(Runnable command) {
try {
workQueue.put(command);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/** 自定义工作线程 */
class WorkThread extends Thread {
@Override
public void run() {
// 循环取任务并执行
while (true) {
Runnable task = null;
try {
// 获取阻塞队列的第一个任务,并删除
// 如果没有元素,则会阻塞等待
task = workQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
task.run();
}
}
}
public static void main(String[] args) {
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(5);
TreadPoolDemo pool = new TreadPoolDemo(2, workQueue);
for (int i = 0; i < 10; i++) {
int num = i;
pool.execute(()->{
System.out.println("线程 " + num + " 执行");
});
}
}
}
执行结果:
线程 0 执行
线程 1 执行
线程 2 执行
线程 4 执行
线程 3 执行
线程 5 执行
线程 6 执行
线程 7 执行
线程 8 执行
线程 9 执行
巨人的肩膀:
https://blog.csdn.net/wolf909867753/article/details/77500625
https://blog.csdn.net/Kurry4ever_/article/details/109294661
https://blog.csdn.net/zj57356498318/article/details/102579980
https://mp.weixin.qq.com/s/mpKipT8tEmSCDUWi_lj-Bw
https://blog.csdn.net/qq_29373285/article/details/85238728
https://blog.csdn.net/zzti_erlie/article/details/91999145