Java线程池ThreadPoolExecutor类使用详解

2021-07-14  本文已影响0人  GuangHui

ThreadPoolExecutor提供了四个构造方法:


我们以最后一个构造方法(参数最多的那个),对其参数进行解释:

 public ThreadPoolExecutor(int corePoolSize, // 1
                              int maximumPoolSize,  // 2
                              long keepAliveTime,  // 3
                              TimeUnit unit,  // 4
                              BlockingQueue<Runnable> workQueue, // 5
                              ThreadFactory threadFactory,  // 6
                              RejectedExecutionHandler handler ) { //7
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

(1) corePoolSize(核心线程池大小):保持在线程池中的核心线程数,对于这些线程,即便它们是空闲的,也不会进行回收,除非设置了参数allowCoreThreadTimeOut,即核心线程的超时时间。
(2)maximumPoolSize(最大线程池大小):允许在线程池中保存的最大线程数。
(3)keepAliveTime(线程最大空闲时间):当线程池中的线程数大于corePoolSize(核心线程池大小)时,等待新任务的空闲线程存活的最大空闲时间。
(4)unit(TimeUnit-时间单位):参数keepAliveTime(线程最大空闲时间)的时间单位。
(5) workQueue(BlockingQueue<Runnable> 任务队列):任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列和优先任务队列。
(6) threadFactory(ThreadFactory-线程创建工厂):线程工厂,用于创建线程,一般用默认即可。
(7)handler(RejectedExecutionHandler-拒绝策略):拒绝策略,当任务太多来不及处理时,如何拒绝任务。

接下来,再接着重点解释一下几个重要的参数:

1 workQueue(BlockingQueue<Runnable> 任务队列)

它们已经介绍了任务队列,一般分为直接提交队列、有界任务队列、无界任务队列和优先任务队列。

(1)直接提交队列:设置为SynchronousQueue队列,SynchronousQueue是一个特殊的BlockingQueue,它没有容量,每执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒,反之每一个删除操作也都要等待对应的插入操作。

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略,直接抛出异常
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        for(int i=0;i<3;i++) {
            pool.execute(new ThreadTask());
        }   
    }
}

public class ThreadTask implements Runnable{
    public ThreadTask() {
        
    }
    
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}

输出结果为

pool-1-thread-1
pool-1-thread-2
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.hhxx.test.ThreadTask@55f96302 rejected from java.util.concurrent.ThreadPoolExecutor@3d4eac69[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
    at com.hhxx.test.ThreadPool.main(ThreadPool.java:17)

可以代码运行结果可以看到,当任务队列为SynchronousQueue,创建的线程数大于maximumPoolSize时,直接执行了拒绝策略抛出异常。

使用SynchronousQueue队列,提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数量小于maximumPoolSize,则尝试创建新的进程,如果达到maximumPoolSize设置的最大值,则根据设置的handler执行拒绝策略。

因此这种方式提交的任务不会被缓存起来,而是会被马上执行,在这种情况下,需要对程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易就会执行拒绝策略。

(2)有界任务队列

有界的任务队列可以使用ArrayBlockingQueue实现,如下所示

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

使用ArrayBlockingQueue有界任务队列,若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,则会将新的任务加入到等待队列中。

若等待队列已满,即超过ArrayBlockingQueue初始化的容量,则继续创建线程,直到线程数量达到maximumPoolSize设置的最大线程数量。若大于maximumPoolSize,则执行拒绝策略。

在这种情况下,线程数量的上限与有界任务队列的状态有直接关系,如果有界队列初始容量较大或者没有达到超负荷的状态,线程数将一直维持在corePoolSize以下,反之当任务队列已满时,则会以maximumPoolSize为最大线程数上限。

(3)无界的任务队列

有界任务队列可以使用LinkedBlockingQueue实现,如下所示

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是corePoolSize设置的数量,也就是说在这种情况下maximumPoolSize这个参数是无效的。

哪怕任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。

(4)优先任务队列

优先任务队列通过PriorityBlockingQueue实现,另外PriorityBlockingQueue其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数都不会超过corePoolSize的数量,只不过其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列是可以自定义规则根据任务的优先级顺序先后执行。

2 handler(RejectedExecutionHandler-拒绝策略)

创建线程池时,为防止资源被耗尽,任务队列会选择创建有界任务队列,但这种模式下,如果出现任务队列已满且线程池创建的线程数达到设置的最大线程数时,这时就需要指定ThreadPoolExecutor的RejectedExecutionHandler参数即合理的拒绝策略来处理线程池"超载"的情况。

ThreadPoolExecutor自带的拒绝策略如下:

(1)AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作;
(2)CallerRunsPolicy策略:如果线程池的线程数量达到上限,该策略会把新增的任务放在当前线程池所在的线程中运行。这个策略的缺点就是可能会阻塞线程池所在的主线程。
(3)DiscardOldestPolicy策略:该策略会丢弃任务队列中最老的一个任务,也就是当前任务队列中最先被添加进去的,马上要被执行的那个任务,并尝试再次提交;
(4)DiscardPolicy策略:该策略会默默丢弃无法处理的任务,不予任何处理,也不会抛异常。当然使用此策略,业务场景中需允许任务的丢失;

以上内置的策略均实现了RejectedExecutionHandler接口,当然你也可以自己扩展RejectedExecutionHandler接口,定义自己的拒绝策略。

参考文章:
(1)【 java线程池ThreadPoolExecutor类使用详解】
(2)【线程池之ThreadPoolExecutor使用】
(3)【java多线程-ThreadPoolExecutor的拒绝策略】

上一篇 下一篇

猜你喜欢

热点阅读