Spring线程池ThreadPoolTaskExecutor的
1 线程池简介
1.1 为什么使用线程池
- 降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗;
- 提高系统响应速度,当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行;
- 方便线程并发数的管控,因为线程若是无限制的创建,可能会导致内存占用过多而产生
OOM
,并且会造成cpu
过度切换(cpu
切换线程是有时间成本的(需要保持当前执行线程的现场,并恢复要执行线程的现场) - 提供更强大的功能,延时定时线程池
1.2 线程池为什么需要使用队列
因为线程若是无限制的创建,可能会导致内存
占用过多而产生OOM
,并且会造成cpu
过度切换。
创建线程池的消耗较高或者线程池创建线程需要获取mainlock
这个全局锁,影响并发效率,阻塞队列可以很好的缓冲
1.3 线程池为什么要使用阻塞队列而不使用非阻塞队列
阻塞队列
可以保证任务队列中没有任务时阻塞获取任务的线程,使得线程进入wait
状态,释放cpu
资源,当队列中有任务时才唤醒对应线程从队列中取出消息进行执行。
使得在线程不至于一直占用cpu
资源。(线程执行完任务后通过循环再次从任务队列中取出任务进行执行,代码片段如:while (task != null || (task = getTask()) != null) {}
)。
不用阻塞队列也是可以的,不过实现起来比较麻烦而已,有好用的为啥不用呢
1.4 如何配置线程池
-
CPU
密集型任务
尽量使用较小的线程池,一般为CPU
核心数+1
。 因为CPU
密集型任务使得CPU
使用率很高,若开过多的线程数,会造成CPU
过度切换 -
IO
密集型任务
可以使用稍大的线程池,一般为2*CPU
核心数。IO
密集型任务CPU
使用率并不高,因此可以让CPU
在等待IO
的时候有其他线程去处理别的任务,充分利用CPU
时间 -
混合型任务
可以将任务分成IO
密集型和CPU
密集型任务,然后分别用不同的线程池去处理。 只要分完之后两个任务的执行时间相差不大,那么就会比串行执行来的高效
因为如果划分之后两个任务执行时间有数据级的差距,那么拆分没有意义。
因为先执行完的任务就要等后执行完的任务,最终的时间仍然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失
1.5 execute()和submit()方法
-
execute()
,执行一个任务,没有返回值 -
submit()
,提交一个线程任务,有返回值
submit(Callable<T> task)
能获取到它的返回值,通过future.get()
获取(阻塞
直到任务执行完)。一般使用FutureTask+Callable
配合使用
submit(Runnable task, T result)
能通过传入的载体result
间接获得线程的返回值。
submit(Runnable task)
则是没有返回值的,就算获取它的返回值也是null
Future.get()
方法会使取结果的线程进入阻塞状态
,直到线程执行完成之后,唤醒取结果的线程,然后返回结果
1.6 Spring线程池
Spring
通过任务执行器(TaskExecutor
)来实现多线程和并发编程,使用ThreadPoolTaskExecutor
实现一个基于线程池的TaskExecutor
,
还得需要使用@EnableAsync
开启异步,并通过在需要的异步方法那里使用注解@Async
声明是一个异步任务
Spring
已经实现的异常线程池:
-
SimpleAsyncTaskExecutor
:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。 -
SyncTaskExecutor
:这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地方 -
ConcurrentTaskExecutor
:Executor
的适配类,不推荐使用。如果ThreadPoolTaskExecutor
不满足要求时,才用考虑使用这个类 -
SimpleThreadPoolTaskExecutor
:是Quartz的SimpleThreadPool
的类。线程池同时被quartz和非quartz使用,才需要使用此类 -
ThreadPoolTaskExecutor
:最常使用,推荐。 其实质是对java.util.concurrent.ThreadPoolExecutor
的包装
1.7 @Async调用中的事务处理机制
在@Async
标注的方法,同时也使用@Transactional
进行标注;在其调用数据库操作之时,将无法产生事务管理的控制,原因就在于其是基于异步处理
的操作。
那该如何给这些操作添加事务管理呢?
可以将需要事务管理操作的方法放置到异步方法内部,在内部被调用的方法上添加@Transactional
示例:
-
方法A
, 使用了@Async/@Transactional
来标注,但是无法产生事务控制的目的。 -
方法B
, 使用了@Async
来标注,B
中调用了C、D
,C/D
分别使用@Transactional
做了标注,则可实现事务控制的目的
2 示例
2.1 线程池配置类
package cn.jzh.thread;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
@ComponentScan("cn.jzh.thread")
@EnableAsync //开启异步操作
public class TaskExecutorConfig implements AsyncConfigurer {
/**
* 通过getAsyncExecutor方法配置ThreadPoolTaskExecutor,获得一个基于线程池TaskExecutor
*
* @return
*/
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(5);//核心线程数
pool.setMaxPoolSize(10);//最大线程数
pool.setQueueCapacity(25);//线程队列
pool.initialize();//线程初始化
return pool;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}
配置类中方法说明:
Spring
中的ThreadPoolExecutor
是借助JDK
并发包中的java.util.concurrent.ThreadPoolExecutor
来实现的。其中一些值的含义如下:
-
int corePoolSize:
线程池维护线程的最小数量 -
int maximumPoolSize:
线程池维护线程的最大数量,线程池中允许的最大线程数,线程池中的当前线程数目不会超过该值。如果队列中任务已满,并且当前线程个数小于maximumPoolSize
,那么会创建新的线程来执行任务。 -
long keepAliveTime:
空闲线程的存活时间TimeUnit
-
unit:
时间单位,现由纳秒,微秒,毫秒,秒 -
BlockingQueue workQueue:
持有等待执行的任务队列 -
RejectedExecutionHandler
handler 线程池的拒绝策略,是指当任务添加到线程池中被拒绝,而采取的处理措施。
当任务添加到线程池中之所以被拒绝,可能是由于:第一,线程池异常关闭。第二,任务数量超过线程池的最大限制。
Reject
策略预定义有四种:
ThreadPoolExecutor.AbortPolicy
策略,是默认的策略,处理程序遭到拒绝将抛出运行时RejectedExecutionException
ThreadPoolExecutor.CallerRunsPolicy
策略 ,调用者的线程会执行该任务,如果执行器已关闭,则丢弃.ThreadPoolExecutor.DiscardPolicy
策略,不能执行的任务将被丢弃.ThreadPoolExecutor.DiscardOldestPolicy
策略,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如
果再次失败,则重复此过程)
2.2 异步方法
@Async
注解可以用在方法上,表示该方法是个异步方法,也可以用在类上,那么表示此类的所有方法都是异步方法
异步方法会自动注入使用ThreadPoolTaskExecutor
作为TaskExecutor
package cn.jzh.thread;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import java.util.concurrent.Future;
@Service
public class AsyncTaskService {
/**
*
* @param i
*/
@Async
public void executeAsync(Integer i) throws Exception{
System.out.println("线程ID:" + Thread.currentThread().getId() + "线程名字:" +Thread.currentThread().getName()+"执行异步任务:" + i);
}
@Async
public Future<String> executeAsyncPlus(Integer i) throws Exception {
System.out.println("线程ID:" + Thread.currentThread().getId() +"线程名字:" +Thread.currentThread().getName()+ "执行异步有返回的任务:" + i);
return new AsyncResult<>("success:"+i);
}
}
2.3 启动测试
package cn.jzh.thread;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import java.util.concurrent.Future;
public class MainApp {
public static void main(String[] args) throws Exception{
System.out.println("主线程id:" + Thread.currentThread().getId() + "开始执行调用任务...");
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(TaskExecutorConfig.class);
AsyncTaskService service = context.getBean(AsyncTaskService.class);
for (int i = 0;i<10;i++){
service.executeAsync(i);
Future<String> result = service.executeAsyncPlus(i);
System.out.println("异步程序执行结束,获取子线程返回内容(会阻塞当前main线程)" + result.get());
}
context.close();
System.out.println("主线程id:" + Thread.currentThread().getId() + "程序结束!!");
}
}
注意:
- 是否影响主线程
如果main
主线程不去获取子线程的结果(Future.get()
),那么主线程完全可以不阻塞。那么,此时,主线程和子线程完全异步。此功能,可以做成类似MQ
消息中间件之类的,消息异步进行发送 - 判断是否执行完毕
当返回的数据类型为Future
类型,其为一个接口。具体的结果类型为AsyncResult
,这个是需要注意的地方。
调用返回结果的异步方法,判断是否执行完毕时需要使用future.isDone()
来判断是否执行完毕
public void testAsyncAnnotationForMethodsWithReturnType()
throws InterruptedException, ExecutionException {
System.out.println("Invoking an asynchronous method. " + Thread.currentThread().getName());
Future<String> future = asyncAnnotationExample.asyncMethodWithReturnType();
while (true) { ///这里使用了循环判断,等待获取结果信息
if (future.isDone()) { //判断是否执行完毕
System.out.println("Result from asynchronous process - " + future.get());
break;
}
System.out.println("Continue doing something else. ");
Thread.sleep(1000);
}
}
这些获取异步方法的结果信息,是通过不停的检查Future
的状态来获取当前的异步方法是否执行完毕来实现的