【多线程】线程池

2021-11-12  本文已影响0人  嘻洋洋

1.概念

1.1 创建线程池的方法

创建线程池有两种方法:Executors和ThreadPoolExecutor。线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险

1.2 关系

Executors和ThreadPoolExecutor之间也是有关系的,Executors类的底层实现便是ThreadPoolExecutor

2.ThreadPoolExecutor

创建时需要手动设置线程池参数

2.1 创建时参数说明

2.2 线程池的启动流程

(1)创建线程池,设置各个参数, 此时没有一个线程
(2)execute或submit传入一个任务,同时会唤醒休眠的线程。
(3)先判断线程数达到最大线程数没有, 如果没有达到就新建一个线程来运行任务
(4)如果达到最大线程数,就开始会把任务加入队列中
(5)任务队列也加满了,就会执行拒绝策略,
(6)当某个线程运行完任务后, 会再次从队列中获取新的任务运行。
(7)如果队列中没有任务,线程会休眠,休眠时间是传入的时间
(8)某个线程休眠结束后,会再次从任务队列中获取任务,如果任务队列是空的, 则判断当前存活线程数是否大于核心线程数, 如果大于则这个线程就会死亡。
(9)如果小于或者等于最小核心线程, 就会继续休眠。

例如:refundNotify,invoiceNotify,timingRetryNotify 是线程任务

//获取系统处理器个数,作为线程池数量
int nThreads = Runtime.getRuntime().availableProcessors();
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("member-pool-%d").build();
//创建线程池
ExecutorService pool = new ThreadPoolExecutor(5, 200,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
//传入任务,执行
pool.execute(refundNotify);
pool.execute(invoiceNotify);
pool.execute(timingRetryNotify);

3.Executors

Executors可以创建四种类型线程池(四个静态方法),下面一一介绍并解释不推荐使用Executors来创建线程池的原因

3.1 newCachedThreadPool

它是一个可以无限扩大的线程池,可能会创建大量的线程,从而导致 OOM。它比较适合处理执行时间比较小的任务;

public static ExecutorService newCachedThreadPool(){
    return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.MILLISECONDS,new SynchronousQueue<Runnable>());
}

参数说明:
corePoolSize为0,maximumPoolSize为无限大(Integer.MAX_VALUE),意味着线程数量可以无限大,
keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死;
采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。

3.2 newFixedThreadPool

它是一种固定大小的线程池,允许的请求队列长度为 Integer.MAX_VALUE(无界队列),可能会堆积大量的请求,从而导致 OOM。

public static ExecutorService newFixedThreadPool(int nThreads){
    return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}

参数说明:
corePoolSize和maximunPoolSize都为用户设定的线程数量nThreads;
keepAliveTime为0,意味着一旦有多余的空闲线程,就会被立即停止掉;但这里keepAliveTime无效;
阻塞队列采用了LinkedBlockingQueue,它是一个无界队列;由于阻塞队列是一个无界队列,因此永远不可能拒绝任务;
由于采用了无界队列,实际线程数量将永远维持在nThreads,因此maximumPoolSize和keepAliveTime将无效。

3.3 newSingleThreadExecutor

和newFixedThreadPool基本一样,唯一区别是它只会创建一条工作线程处理任务,所以也会可能导致 OOM。

public static ExecutorService newSingleThreadExecutor(){
    return new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}

3.4 newScheduledThreadPool

它用来处理延时任务或定时任务。一般可使用spring的定时任务模块代替。它接收SchduledFutureTask类型的任务,有两种提交任务的方式:scheduledAtFixedRate和scheduledWithFixedDelay。
(1)scheduleAtFixedRate
scheduleAtFixedRate() 指的是“以固定的频率”执行,period(周期)指的是两次成功执行之间的时间。如果上一个任务执行完毕,则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行。
(2)scheduledWithFixedDelay
scheduleWithFixedDelay() 指的是“以固定的延时”执行,delay(延时)指的是一次执行终止和下一次执行开始之间的延迟。前一个任务没有执完,不会继续下一个任务。以下是监控pos打印失败,失败任务重新打印的例子:

    private static ScheduledExecutorService scheduledThreadPool = Executors
           .newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
     private static boolean ERROR_MONITOR_THREAD_INITED = false;
     static {
       if (!ERROR_MONITOR_THREAD_INITED) {
           scheduledThreadPool.scheduleWithFixedDelay(new Runnable() {
               @Override
               public void run() {
                   PrintTask printTask;
                   LOGGER.debug("=============Start a error print job process thread!~===========");
                   try {
                       while (true) {
                         //如果队列为空,则阻塞,就一直等待有数据进来(挂起)
                           printTask = errorJobQueue.take();
                           PrintTaskManager.doPrint(printTask);
                       }
                   } catch (Exception e) {
                   }
               }
           }
           , 0, 5, TimeUnit.SECONDS);
           ERROR_MONITOR_THREAD_INITED = true;
       }
   }
上一篇 下一篇

猜你喜欢

热点阅读