自定义线程池+多线程处理+CountDownLatch

2020-09-30  本文已影响0人  Rain_z

前几天在写同步接口,因为数据量比较大,所以使用多线程,这里写了Demo记录下。

自定义线程池

  1. Java中Executors已经提供了创建线程池的方式,但在阿里巴巴开发手册上是严禁使用的,建议使用自定义线程池,究其原因,是可能会产生一些问题。
public static ExecutorService newFixedThreadPool(int nThreads) {
       return new ThreadPoolExecutor(nThreads, nThreads,
                                     0L, TimeUnit.MILLISECONDS,
                                     new LinkedBlockingQueue<Runnable>());
   }


public static ExecutorService newSingleThreadExecutor() {
       return new FinalizableDelegatedExecutorService
           (new ThreadPoolExecutor(1, 1,
                                   0L, TimeUnit.MILLISECONDS,
                                   new LinkedBlockingQueue<Runnable>()));
   }


public static ExecutorService newCachedThreadPool() {
       return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                     60L, TimeUnit.SECONDS,
                                     new SynchronousQueue<Runnable>());
   }
  1. 查看源码发现newFixedThreadPool和newSingleThreadExecutor方法他们都使用了LinkedBlockingQueue的任务队列,LikedBlockingQueue的默认大小为Integer.MAX_VALUE。newCachedThreadPool中定义的线程池大小为Integer.MAX_VALUE。

  2. 通过源码发现禁止使用Executors创建线程池的原因就是newFixedThreadPool和newSingleThreadExecutor的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。

  3. newCachedThreadPool允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

下面是写了一个Demo演示

public class ThreadPool {


    /**
     * 自定义线程名称,方便的出错的时候溯源
     */
    private static final ThreadFactory NAME_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("order-pool-%d").build();
    //private static final ThreadFactory NAMED_THREAD_FACTORY = new ThreadFactoryBuilder().build();

    /**
     * corePoolSize    线程池核心池的大小
     * maximumPoolSize 线程池中允许的最大线程数量
     * keepAliveTime   当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间
     * unit            keepAliveTime 的时间单位
     * workQueue       用来储存等待执行任务的队列
     * threadFactory   创建线程的工厂类
     * handler         拒绝策略类,当线程池数量达到上线并且workQueue队列长度达到上限时就需要对到来的任务做拒绝处理
     */
    private static final ExecutorService service = new ThreadPoolExecutor(
            4,
            6,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(1024),
            NAME_THREAD_FACTORY,
            new ThreadPoolExecutor.AbortPolicy()
    );

    /**
     * 获取线程池
     * @return 线程池
     */
    public static ExecutorService getEs() {
        return service;
    }

    /**
     * 使用线程池创建线程并异步执行任务
     * @param r 任务
     */
    public static void newTask(Runnable r) {
        service.execute(r);
    }

    public static void main(String[] args) throws InterruptedException {

        String[] arr = {"a","b","c","d","e","f","g","h","1","4","n"};
        CountDownLatch countDownLatch = new CountDownLatch(arr.length);
        List<String> list = Arrays.asList(arr);
        ExecutorService es = getEs();

        System.out.println("开始处理...");
        int size = 0;
        List<Future<Integer>> resultList = new ArrayList<Future<Integer>>();
        try {

            for (String m : list) {
                SendEmail sendEmail = new SendEmail(m, countDownLatch);
                SendEmailCallBack sendEmailCallBack = new SendEmailCallBack(m, countDownLatch);
                Future<Integer> future = es.submit(sendEmailCallBack);
                resultList.add(future);
            }
            System.out.println("主线程等待...");
            countDownLatch.await();
            for (Future<Integer> future : resultList) {
                size += future.get();
            }
            System.out.println("处理数量: "+size);
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            es.shutdown();
        }

        System.out.println("处理完成...");
    }

}

业务类

主要实现自己的业务逻辑

public class SendEmailCallBack implements Callable<Integer> {
    private final String mobile;
    private final CountDownLatch countDownLatch;

    public SendEmailCallBack(String mobile, CountDownLatch countDownLatch) {
        this.mobile = mobile;
        this.countDownLatch = countDownLatch;
    }

    private int sendEmail() {
        try {
            Thread.sleep(1000);
            System.out.println("线程:"+Thread.currentThread().getName()+ ", 邮件" + mobile +"发送成功");
        }catch (Exception e){
            e.printStackTrace();
        }
        return 1;
    }

    @Override
    public Integer call() throws Exception {
        int num = 0;
        try {
            synchronized (this){
                 num = this.sendEmail();
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            countDownLatch.countDown();
        }
        return num;
    }
}

CountDownLatch

使用CountDownLatch是为了同步时,主线程等待所有线程执行执行完毕后,获取返回数据,汇总同步总数。

控制台打印结果

开始处理...
主线程等待...
线程:order-pool-0, 邮件a发送成功
线程:order-pool-2, 邮件c发送成功
线程:order-pool-3, 邮件d发送成功
线程:order-pool-1, 邮件b发送成功
线程:order-pool-3, 邮件g发送成功
线程:order-pool-2, 邮件f发送成功
线程:order-pool-0, 邮件e发送成功
线程:order-pool-1, 邮件h发送成功
线程:order-pool-3, 邮件1发送成功
线程:order-pool-2, 邮件4发送成功
线程:order-pool-0, 邮件n发送成功
处理数量: 11
处理完成...
下面单独写了一个线程池创建类,修改下就可以使用,和上面Demo实现无关。
public class ThreadPoolFactory {
    /**
    * 下面的属性大小可以改成获取服务器配置来动态调整
    */
    /**
     *核心线程数
     */
    private static final int corePoolSize = 4;
    /**
     * 最大核心线程数
     */
    private static final int maximumPoolSize = 6;
    /**
     * 工作队列
     */
    private static final int workQueue = 1024;
    /**
     *线程空闲时间
     */
    private static final int keepAliveTime = 30;

    /**
     * 自定义线程名称
     */
    private static final ThreadFactory NAMED_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("order-pool-%d").build();

    /**
     * corePoolSize    线程池核心池的大小
     * maximumPoolSize 线程池中允许的最大线程数量
     * keepAliveTime   当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间
     * unit            keepAliveTime 的时间单位
     * workQueue       用来储存等待执行任务的队列
     * threadFactory   创建线程的工厂类
     * handler         拒绝策略类,当线程池数量达到上线并且workQueue队列长度达到上限时就需要对到来的任务做拒绝处理
     */
    private static final ExecutorService service = new ThreadPoolExecutor(
            corePoolSize,
            maximumPoolSize,
            keepAliveTime,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(workQueue),
            NAMED_THREAD_FACTORY,
            new ThreadPoolExecutor.AbortPolicy()
    );

    /**
     * 获取线程池
     * @return 线程池
     */
    public static ExecutorService getEs() {
        return service;
    }

    /**
     * 使用线程池创建线程并异步执行任务
     * @param runnable 任务
     */
    public static void newTask(Runnable runnable) {
        service.execute(runnable);
    }

}
上一篇下一篇

猜你喜欢

热点阅读