自定义线程池+多线程处理+CountDownLatch
2020-09-30 本文已影响0人
Rain_z
前几天在写同步接口,因为数据量比较大,所以使用多线程,这里写了Demo记录下。
自定义线程池
- Java中Executors已经提供了创建线程池的方式,但在阿里巴巴开发手册上是严禁使用的,建议使用自定义线程池,究其原因,是可能会产生一些问题。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
-
查看源码发现newFixedThreadPool和newSingleThreadExecutor方法他们都使用了LinkedBlockingQueue的任务队列,LikedBlockingQueue的默认大小为Integer.MAX_VALUE。newCachedThreadPool中定义的线程池大小为Integer.MAX_VALUE。
-
通过源码发现禁止使用Executors创建线程池的原因就是newFixedThreadPool和newSingleThreadExecutor的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
-
newCachedThreadPool允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
下面是写了一个Demo演示
public class ThreadPool {
/**
* 自定义线程名称,方便的出错的时候溯源
*/
private static final ThreadFactory NAME_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("order-pool-%d").build();
//private static final ThreadFactory NAMED_THREAD_FACTORY = new ThreadFactoryBuilder().build();
/**
* corePoolSize 线程池核心池的大小
* maximumPoolSize 线程池中允许的最大线程数量
* keepAliveTime 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间
* unit keepAliveTime 的时间单位
* workQueue 用来储存等待执行任务的队列
* threadFactory 创建线程的工厂类
* handler 拒绝策略类,当线程池数量达到上线并且workQueue队列长度达到上限时就需要对到来的任务做拒绝处理
*/
private static final ExecutorService service = new ThreadPoolExecutor(
4,
6,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
NAME_THREAD_FACTORY,
new ThreadPoolExecutor.AbortPolicy()
);
/**
* 获取线程池
* @return 线程池
*/
public static ExecutorService getEs() {
return service;
}
/**
* 使用线程池创建线程并异步执行任务
* @param r 任务
*/
public static void newTask(Runnable r) {
service.execute(r);
}
public static void main(String[] args) throws InterruptedException {
String[] arr = {"a","b","c","d","e","f","g","h","1","4","n"};
CountDownLatch countDownLatch = new CountDownLatch(arr.length);
List<String> list = Arrays.asList(arr);
ExecutorService es = getEs();
System.out.println("开始处理...");
int size = 0;
List<Future<Integer>> resultList = new ArrayList<Future<Integer>>();
try {
for (String m : list) {
SendEmail sendEmail = new SendEmail(m, countDownLatch);
SendEmailCallBack sendEmailCallBack = new SendEmailCallBack(m, countDownLatch);
Future<Integer> future = es.submit(sendEmailCallBack);
resultList.add(future);
}
System.out.println("主线程等待...");
countDownLatch.await();
for (Future<Integer> future : resultList) {
size += future.get();
}
System.out.println("处理数量: "+size);
}catch (Exception e){
e.printStackTrace();
}finally {
es.shutdown();
}
System.out.println("处理完成...");
}
}
业务类
主要实现自己的业务逻辑
public class SendEmailCallBack implements Callable<Integer> {
private final String mobile;
private final CountDownLatch countDownLatch;
public SendEmailCallBack(String mobile, CountDownLatch countDownLatch) {
this.mobile = mobile;
this.countDownLatch = countDownLatch;
}
private int sendEmail() {
try {
Thread.sleep(1000);
System.out.println("线程:"+Thread.currentThread().getName()+ ", 邮件" + mobile +"发送成功");
}catch (Exception e){
e.printStackTrace();
}
return 1;
}
@Override
public Integer call() throws Exception {
int num = 0;
try {
synchronized (this){
num = this.sendEmail();
}
}catch (Exception e){
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
return num;
}
}
CountDownLatch
使用CountDownLatch是为了同步时,主线程等待所有线程执行执行完毕后,获取返回数据,汇总同步总数。
控制台打印结果
开始处理...
主线程等待...
线程:order-pool-0, 邮件a发送成功
线程:order-pool-2, 邮件c发送成功
线程:order-pool-3, 邮件d发送成功
线程:order-pool-1, 邮件b发送成功
线程:order-pool-3, 邮件g发送成功
线程:order-pool-2, 邮件f发送成功
线程:order-pool-0, 邮件e发送成功
线程:order-pool-1, 邮件h发送成功
线程:order-pool-3, 邮件1发送成功
线程:order-pool-2, 邮件4发送成功
线程:order-pool-0, 邮件n发送成功
处理数量: 11
处理完成...
下面单独写了一个线程池创建类,修改下就可以使用,和上面Demo实现无关。
public class ThreadPoolFactory {
/**
* 下面的属性大小可以改成获取服务器配置来动态调整
*/
/**
*核心线程数
*/
private static final int corePoolSize = 4;
/**
* 最大核心线程数
*/
private static final int maximumPoolSize = 6;
/**
* 工作队列
*/
private static final int workQueue = 1024;
/**
*线程空闲时间
*/
private static final int keepAliveTime = 30;
/**
* 自定义线程名称
*/
private static final ThreadFactory NAMED_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("order-pool-%d").build();
/**
* corePoolSize 线程池核心池的大小
* maximumPoolSize 线程池中允许的最大线程数量
* keepAliveTime 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间
* unit keepAliveTime 的时间单位
* workQueue 用来储存等待执行任务的队列
* threadFactory 创建线程的工厂类
* handler 拒绝策略类,当线程池数量达到上线并且workQueue队列长度达到上限时就需要对到来的任务做拒绝处理
*/
private static final ExecutorService service = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(workQueue),
NAMED_THREAD_FACTORY,
new ThreadPoolExecutor.AbortPolicy()
);
/**
* 获取线程池
* @return 线程池
*/
public static ExecutorService getEs() {
return service;
}
/**
* 使用线程池创建线程并异步执行任务
* @param runnable 任务
*/
public static void newTask(Runnable runnable) {
service.execute(runnable);
}
}