Java基础-线程池
一、线程池
Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。
1)、使用线程池有哪些优点
- 1、 降低资源消耗
通过重复利用已创建的线程降低线程创建和销毁造成的消耗。因为CPU创建和销毁经上下文切换,会消耗资源。- 2、 提高响应速度,减少CPU的调度
当任务到达时,任务可以不需要等到线程创建就能立即执行。
假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。
它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。- 3、提高线程的可管理性,来维护我们的线程
线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
2)、实现线程池
BlockingQueue blockingQueue = new ArrayBlockingQueue(4);
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
//corePoolSize :线程池中核心线程数的最大值, 核心线程处于运行态,避免线程重建提高效率
//maximumPoolSize :线程池中能拥有最多线程数
//workQueue:用于缓存任务的阻塞队列
//keepAliveTime :表示空闲线程的存活时间,超过这个时间就会消毁
//TimeUnit unit :表示keepAliveTime的单位。
//RejectedExecutionHandler:拒绝策略,当超过最大线程数时,用来拒绝新的任务
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
//JDK 提供的获取CPU可用的核心数
Runtime.getRuntime().availableProcessors(),
//因为操作系统引入了虚拟内存技术,可能出现也缺失,再页缺失时为了把cpu填满所以加1
Runtime.getRuntime().availableProcessors() + 1,
60,
TimeUnit.SECONDS,
blockingQueue,
defaultHandler);
poolExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3 * 1000);
System.out.println("-------------helloworld_001---------------" + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
3)、线程池的工作机制
- 1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
- 2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
- 3)如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务。
- 4)如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
4)、线程池执行示意图
5)、线程池中各个参数
corePoolSize
线程池中的核心线程数
,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;
如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;
如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。maximumPoolSize
线程池中允许的最大线程数
。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize。keepAliveTime
线程空闲时的存活时间
,即当线程没有任务执行时,继续存活的时间。默认情况下,该参数只在线程数大于corePoolSize时才有用。TimeUnit
keepAliveTime的时间单位
。workQueue
workQueue必须是BlockingQueue阻塞队列
。当线程池中的线程数超过它的corePoolSize的时候,线程会进入阻塞队列进行阻塞等待。通过workQueue,线程池实现了阻塞功能。
一般来说,我们应该尽量使用有界队列,因为使用无界队列作为工作队列会对线程池带来如下影响。
1)、当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize。
2)、由于1,使用无界队列时maximumPoolSize将是一个无效参数。
3)、由于1和2,使用无界队列时keepAliveTime将是一个无效参数。
4)、更重要的,使用无界queue可能会耗尽系统资源,有界队列则有助于防止资源耗尽,同时即使使用有界队列,也要尽量控制队列的大小在一个合适的范围。threadFactory
创建线程的工厂
,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名,当然还可以更加自由的对线程做更多的设置,比如设置所有的线程为守护线程。
Executors静态工厂里默认的threadFactory,线程的命名规则是“pool-数字-thread-数字”。RejectedExecutionHandler
线程池的饱和策略
,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务。
5)、线程池的饱和策略
- 1)AbortPolicy
直接抛出异常,默认策略; - 2)CallerRunsPolicy
用调用者所在的线程来执行任务 - 3)DiscardOldestPolicy
丢弃阻塞队列中靠最前的任务,并执行当前任务 - 4)DiscardPolicy
直接丢弃加入线程池的任务
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
二、 Java通过Executors提供创建四种线程池的静态方法
newCachedThreadPool 创建一个可缓存线程池
,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
适用:执行很多短期异步的小程序或者负载较轻的服务器
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newFixedThreadPool 创建一个定长线程池
,可控制线程最大并发数,超出的线程会在队列中等待。
适用:执行长期的任务,性能好很多
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newScheduledThreadPool 创建一个定时线程池
,支持定时及周期性任务执行。
适用:一个任务一个任务执行的场景
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
newSingleThreadExecutor 创建一个单利的线程池
,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
适用:周期性执行任务的场景
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
例子
ExecutorService executorService = Executors.newCachedThreadPool();
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
二、 ExecutorService 的submit() 与execute()区别
-
1、接收的参数不一样
submit()
可以接受runnable和callable 有返回值
execute()
接受runnable 无返回值,所以无法判断任务是否被线程池执行成功。 -
2、submit()方法用于提交需要返回值的任务
线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完 -
3、submit方便Exception处理
意思就是如果你在你的task里会抛出checked或者unchecked exception,而你又希望外面的调用者能够感知这些exception并做出及时的处理,那么就需要用到submit,通过捕获Future.get抛出的异常。
三、关闭线程池
可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。
-
1、shutdown() 只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。
-
2、shutdownNow() 首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。
只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。
一般分两个阶段关闭 ExecutorService。第一阶段调用 shutdown 拒绝传入任务,然后调用 shutdownNow(如有必要)取消所有遗留的任务
// 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。
threadPool.shutdown();
四、常用阻塞队列 BlockingQueue
什么是阻塞队列
- 1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
- 2)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序整体处理数据的速度。
入对和出队方法
对应关系表
抛出异常
:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException("Queuefull")异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常。返回特殊值
:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移除方法,则是从队列里取出一个元素,如果没有则返回null。一直阻塞
:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。超时退出
:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就会退出。
有界无界
有限队列
就是长度有限,满了以后生产者会阻塞;无界队列
就是里面能放无数的东西而不会因为队列长度限制被阻塞,当然空间限制来源于系统资源的限制,如果处理不及时,导致队列越来越大越来越大,超出一定的限制致使内存超限,操作系统或者JVM帮你解决烦恼,直接把你 OOM
省事了。
无界也会阻塞,为何?因为阻塞不仅仅体现在生产者放入元素时会阻塞,消费者拿取元素时,如果没有元素,同样也会阻塞。
1、ArrayBlockingQueue
ArrayBlockingQueue 是一个用数组实现的有界阻塞队列
,其内部按先进先出(FIFO)的原则对元素进行排序,默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。初始化时有参数可以设置。
//默认非公平阻塞队列
ArrayBlockingQueue queue = new ArrayBlockingQueue(2);
//公平阻塞队列
ArrayBlockingQueue queue1 = new ArrayBlockingQueue(2,true);
2、LinkedBlockingQueue
LinkedBlockingQueue是一个由链表实现的有界队列阻塞队列
,但大小默认值为Integer.MAX_VALUE,此队列按照先进先出的原则对元素进行排序。所以我们在使用LinkedBlockingQueue时建议手动传值,为其提供我们所需的大小,避免队列过大造成机器负载或者内存爆满等情况
3、SynchronousQueue
是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素
。SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。
4、DelayQueue
是一个支持延时获取元素的无界阻塞队列
。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
- DelayQueue非常有用,可以将DelayQueue运用在以下应用场景。
缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
5、PriorityBlockingQueue
一个支持优先级排序的无界阻塞队列
。默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。
6、LinkedTransferQueue
一个由链表结构组成的无界阻塞队列
。
多了tryTransfer和transfer方法
- (1)transfer方法
如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。- (2)tryTransfer方法
tryTransfer方法是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法是必须等到消费者消费了才返回。
7、LinkedBlockingDeque
LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列
。所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。
多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First单词结尾的方法,表示插入、获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入、获取或移除双端队列的最后一个元素。另外,插入方法add等同于addLast,移除方法remove等效于removeFirst。但是take方法却等同于takeFirst,不知道是不是JDK的bug,使用时还是用带有First和Last后缀的方法更清楚。在初始化LinkedBlockingDeque时可以设置容量防止其过度膨胀。另外,双向阻塞队列可以运用在“工作窃取”模式中。
以上的阻塞队列都实现了BlockingQueue接口,也都是线程安全的。
五、合理地配置线程池
要想合理地配置线程池,就必须首先分析任务特性
要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。
- 任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
- 任务的优先级:高、中和低。
- 任务的执行时间:长、中和短。
- 任务的依赖性:是否依赖其他系统资源,如数据库连接。
性质不同的任务可以用不同规模的线程池分开处理。
CPU密集型任务
应配置尽可能小的线程,如配置Ncpu核心数+1
个线程的线程池。(执行纯计算任务;因为操作系统引入了虚拟内存技术,可能出现页缺失,再页缺失时为了把cpu填满所以加1)
IO密集型任务
线程并不是一直在执行任务,则应配置尽可能多的线程,如2*Ncpu核心数
个线程的线程池。(如从网络、磁盘获取资源;因为其远远慢于比CPU的执行速度,没有获取数据时CPU处于等待状态。所以一般设置为二倍的CPU核心数)
混合型的任务
,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数
。
优先级不同的任务
可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行。
执行时间不同的任务
可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。
建议使用有界队列
。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。
如果当时我们设置成无界队列
,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。
六、线程池的简单实现
class ExecutorText {
private val mWorkThreads: MutableList<WorkThread> = ArrayList()
val mBlockingDeque: BlockingDeque<Runnable>
var isWorking: Boolean = true
constructor(threadCount: Int, taskRunnableCount: Int) {
this.mBlockingDeque = LinkedBlockingDeque(taskRunnableCount)
for (i in 0..threadCount) {
val work = WorkThread()
work.start()
this.mWorkThreads.add(work)
}
Runtime.getRuntime().availableProcessors()
}
fun execute(runnable: Runnable): Boolean {
//把execute任务加入队列中
return this.mBlockingDeque.offer(runnable)
}
fun shutDown() {
isWorking = false
}
inner class WorkThread : Thread() {
override fun run() {
super.run()
while (isWorking || mBlockingDeque.size > 0) {
var task = mBlockingDeque.poll()
task?.run()
task = null
}
}
//中断线超
fun stopWork() {
interrupt()
}
}
fun onDestroy() {
for (i in 0..mWorkThreads.size){
mWorkThreads[i].stopWork()
}
mBlockingDeque.clear()
}
}
调用
object Text {
@JvmStatic
fun main(argc: Array<String>) {
val executorText = ExecutorText(3, 6)
for (i in 0..10) {
executorText.execute(Runnable {
println(Thread.currentThread().name + "任务开始执行" + i)
})
}
executorText.shutDown()
}
}
七、阻塞队列(DelayQueue)
1)、订单的实体类
public class Order {
private final String orderNo;//订单的编号
private final double orderMoney;//订单的金额
public Order(String orderNo, double orderMoney) {
super();
this.orderNo = orderNo;
this.orderMoney = orderMoney;
}
public String getOrderNo() {
return orderNo;
}
public double getOrderMoney() {
return orderMoney;
}
}
2)、存放的队列的元素
public class ItemVo<T> implements Delayed {
//到期时间,但传入的数值代表过期的时长,传入单位毫秒
private long activeTime;
private T data;//业务数据,泛型
//传入过期时长,单位秒,内部转换
public ItemVo(long expirationTime, T data) {
this.activeTime = expirationTime * 1000 + System.currentTimeMillis();
this.data = data;
}
public long getActiveTime() {
return activeTime;
}
public T getData() {
return data;
}
/*
* 这个方法返回到激活日期的剩余时间,时间单位由单位参数指定。
*/
public long getDelay(TimeUnit unit) {
long d = unit.convert(this.activeTime
- System.currentTimeMillis(), unit);
return d;
}
/*
*Delayed接口继承了Comparable接口,按剩余时间排序,实际计算考虑精度为纳秒数
*/
@Override
public int compareTo(Delayed o) {
long d = (getDelay(TimeUnit.MILLISECONDS)
- o.getDelay(TimeUnit.MILLISECONDS));
if (d == 0) {
return 0;
} else {
if (d < 0) {
return -1;
} else {
return 1;
}
}
}
}
3)、将订单推入队列
public class PutOrder implements Runnable {
private DelayQueue<ItemVo<Order>> queue;
public PutOrder(DelayQueue<ItemVo<Order>> queue) {
this.queue = queue;
}
@Override
public void run() {
//5秒后到期
Order orderTb = new Order("Tb12345", 366);
ItemVo<Order> itemTb = new ItemVo<Order>(5, orderTb);
queue.offer(itemTb);
System.out.println("订单5秒后超时:" + orderTb.getOrderNo() + ";"
+ orderTb.getOrderMoney());
//8秒后到期
Order orderJd = new Order("Jd54321", 366);
ItemVo<Order> itemJd = new ItemVo<Order>(8, orderJd);
queue.offer(itemJd);
System.out.println("订单8秒后超时:" + orderJd.getOrderNo() + ";"
+ orderJd.getOrderMoney());
}
}
4)、取出到期的订单的功能
public class FetchOrder implements Runnable {
private DelayQueue<ItemVo<Order>> queue;
public FetchOrder(DelayQueue<ItemVo<Order>> queue){
this.queue = queue;
}
@Override
public void run() {
while(true) {
try {
ItemVo<Order> item = queue.take();
Order order = (Order)item.getData();
System.out.println("Get From Queue:"+"data="
+order.getOrderNo()+";"+order.getOrderMoney());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
5)、延时队列测试程序
public class Test {
public static void main(String[] args) throws InterruptedException {
DelayQueue<ItemVo<Order>> queue = new DelayQueue<ItemVo<Order>>();//延时队列
new Thread(new PutOrder(queue)).start();
new Thread(new FetchOrder(queue)).start();
//每隔500毫秒,打印个数字
for (int i = 1; i < 15; i++) {
Thread.sleep(500);
System.out.println(i * 500);
}
}
}
八、beforeExecute 和 afterExecute
在线程池执行某个任务前会调用beforeExecute()方法,在任务结束后(任务异常退出)会执行afterExecute()方法。
查看ThreadPoolExecutor源码,在该类中定义了一个内部类Worker, ThreadPoolExecutor线程池中的工作线程就是Worker类的实例,Worker实例在执行时会调用beforeExecute()与afterExecute()方法。
public static void main(String[] args) {
Runnable r = new Runnable() {
@Override
public void run() {
System.out.println("任务数量---");
}
};
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5)){
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("renwu 666");
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("jiueshu");
}
};
for (int i = 0; i < 10; i++) {
executor.submit(r);
}
}