Java线程池深度研究

2020-01-14  本文已影响0人  善思者_tin

一、概述

池化技术相比大家已经屡见不鲜了,线程池、数据库连接池、Http 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。

在web开发中,服务器需要接受并处理请求,所以会为一个请求来分配一个线程来进行处理。如果每次请求都新创建一个线程的话实现起来非常简便,但是存在一个问题:

如果并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁线程,如此一来会大大降低系统的效率。可能出现服务器在为每个请求创建新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多。

那么有没有一种办法使执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢?

这就是线程池的目的了。线程池为线程生命周期的开销和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。

二、线程池的应用场景

异步通知方式

2.1、消息分发

目前持久化数据库保存很多消息,我们需要将消息发送给对应的用户,那么当向很多用户发消息的时候,为了提高发送效率,我们可以使用线程池进行发送。比如用户认证成功发送消息等

如何进行发送?

result =sendService.sendMsg(dto);//service是一个接口,我们可以通过传递对应的实现,并且这个实现实现了runnable.

问题:如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了时,该如何进行处理?请看如下分析

三、(重要)ThreadPoolExecutor 类

线程池实现类 ThreadPoolExecutor 是 Executor 框架最核心的类。

ThreadPoolExecutor 3 个最重要的参数:

corePoolSize : 核心线程数线程数定义了最小可以同时运行的线程数量。

maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。

workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,信任就会被存放在队列中。

ThreadPoolExecutor其他常见参数:

keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;

unit : keepAliveTime 参数的时间单位。

threadFactory :executor 创建新线程的时候会用到。

handler :饱和策略。关于饱和策略下面单独介绍一下。

配置包含两种方式:xml和代码

xml配置:

<bean id="taskSendNtfExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">

    <property name="corePoolSize" value="8"/>

    <property name="maxPoolSize" value="64"/>

    <property name="queueCapacity" value="320"/>

    <property name="keepAliveSeconds" value="60"/>

</bean>

代码配置:

ThreadPoolTaskExecutor executor =new ThreadPoolTaskExecutor();

//初始线程大小

executor.setCorePoolSize(APPExecuteEnum.CORE_POOL_SIZE.getValue());

//最大线程大小

executor.setMaxPoolSize(APPExecuteEnum.MAX_POOL_SIZE.getValue());

//设置队列长度

executor.setQueueCapacity(APPExecuteEnum.QUEUE_CAPACITY.getValue());

//设置多长时间,线程回收

executor.setKeepAliveSeconds(APPExecuteEnum.KEEP_ALIVE_SECONDS.getValue());;

executor.initialize();

return executor;

ThreadPoolExecutor 饱和策略定义:

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了时,ThreadPoolTaskExecutor 定义一些策略:

ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理。

ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。另外,这个策略喜欢增加队列容量。如果您的应用程序可以承受此延迟并且你不能任务丢弃任何一个任务请求的话,你可以选择这个策略。

ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃掉。

ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求。

例如:

Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 饱和策略的话来配置线程池的时候默认使用的是 ThreadPoolExecutor.AbortPolicy。在默认情况下,ThreadPoolExecutor 将抛出 RejectedExecutionException 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy。当最大池被填满时,此策略为我们提供可伸缩队列。(这个直接查看 ThreadPoolExecutor 的构造函数源码就可以看出,比较简单的原因,这里就不贴代码了。)

核心代码(XML和代码配置):

<property name="rejectedExecutionHandler">

<bean class="com.bee.task.core.ThreadPoolException"/>

</property>

ThreadPoolTaskExecutor executor =new ThreadPoolTaskExecutor();

// rejection-policy:当pool已经达到max size的时候,如何处理新任务

// CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

四、拒绝策略场景分析

4.1、AbortPolicy

AbortPolicy

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。

这是线程池默认的拒绝策略,在任务不能再提交的时候,抛出异常,及时反馈程序运行状态。如果是比较关键的业务,推荐使用此拒绝策略,这样子在系统不能承载更大的并发量的时候,能够及时的通过异常发现。

4.2、DiscardPolicy

ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。如果线程队列已满,则后续提交的任务都会被丢弃,且是静默丢弃。

使用此策略,可能会使我们无法发现系统的异常状态。建议是一些无关紧要的业务采用此策略。例如,本人的博客网站统计阅读量就是采用的这种拒绝策略。

4.3、DiscardOldestPolicy

ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务。

此拒绝策略,是一种喜新厌旧的拒绝策略。是否要采用此种拒绝策略,还得根据实际业务是否允许丢弃老任务来认真衡量。

4.4、CallerRunsPolicy

ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

如果任务被拒绝了,则由调用线程(提交任务的线程)直接执行此任务,我们可以通过代码来验证这一点:

五、线程池参数设置

参数设置方法:

   需要根据如下几个值来决定:

          tasks :每秒的任务数,假设为500~1000

          taskcost:每个任务花费时间,假设为0.1s

          responsetime:系统允许容忍的最大响应时间,假设为1s

    计算

          corePoolSize = 每秒需要多少个线程处理? 

          threadcount = tasks/(1/taskcost) =tasks*taskcout =  (500~1000)*0.1 = 50~100 个线程。corePoolSize设置应该大于50

           根据8020原则,如果80%的每秒任务数小于800,那么corePoolSize设置为80即可

          queueCapacity = (coreSizePool/taskcost)*responsetime

           计算可得 queueCapacity = 80/0.1*1 = 80。意思是队列里的线程可以等待1s,超过了的需要新开线程来执行

            切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。

          maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)(最大任务数-队列容量)/每个线程每秒处理能力 = 最大线程数

            计算可得 maxPoolSize = (1000-80)/10 = 92

          rejectedExecutionHandler:根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理

          keepAliveTime和allowCoreThreadTimeout:采用默认通常能满足

上一篇 下一篇

猜你喜欢

热点阅读