[并发] 6 线程池之ThreadPoolExecutor

2019-11-06  本文已影响0人  LZhan
1.Thread使用

new Thread的弊端:

所以,在实践中使用多线程时,并不会使用Thread,而是使用线程池。

2.线程池使用

线程池的好处:

3.ThreadPoolExecutor类的注释说明
关系图.png

ExecutorService(ThreadPoolExecutor的顶层接口)使用线程池中的线程执行每个提交的任务,通常我们使用Executors的工厂方法来创建ExecutorService。
线程池解决了两个不同的问题:
<1> 提升性能:它们通常在执行大量异步任务时,由于减少了每个任务的调用开销,并且它们提供了一种限制和管理资源(包括线程)的方法,使得性能提升明显;
<2> 统计信息:每个ThreadPoolExecutor保持一些基本的统计信息,例如完成的任务数量。

为了在广泛的上下文中有用,此类提供了许多可调参数和可扩展性钩子。 但是,在常见场景中,我们预配置了几种线程池,我们敦促程序员使用更方便的Executors的工厂方法直接使用。

4.ThreadPoolExecutor类参数

重要初始化参数:
<1> corePoolSize:核心线程数量
<2> maximumPoolSize:线程池最大线程数
<3> workQueue:阻塞队列,存储等待执行的任务,很重要,会对线程池运行过程产生重大影响
<4> keepAliveTime:线程没有任务执行时最多保持多久时间终止
<5> unit:keepAliveTime的时间单位
<6> threadFactory:线程工厂,用来创建线程
<7> rejectHandler:当拒绝处理任务时的策略

4.1 workQueue
workQueue是一个BlockingQueue,用于存放提交的任务,队列的实际容量与线程池大小相关联。
主要有三种队列策略:
<1> Direct handoffs 直接握手队列
Direct handoffs 的一个很好的默认选择是 SynchronousQueue,它将任务交给线程而不需要保留。这里,如果没有线程立即可用来运行它,那么排队任务的尝试将失败,因此将构建新的线程。
此策略在处理可能具有内部依赖关系的请求集时避免锁定。Direct handoffs 通常需要无限制的maximumPoolSizes来避免拒绝新提交的任务。 但得注意,当任务持续以平均提交速度大于平均处理速度时,会导致线程数量会无限增长问题。

<2> Unbounded queues 无界队列
当所有corePoolSize线程繁忙时,使用无界队列(例如,没有预定义容量的LinkedBlockingQueue)将导致新任务在队列中等待,从而导致maximumPoolSize的值没有任何作用。当每个任务互不影响,完全独立于其他任务时,这可能是合适的; 例如,在网页服务器中, 这种队列方式可以用于平滑瞬时大量请求。但得注意,当任务持续以平均提交速度大余平均处理速度时,会导致队列无限增长问题。

<3> Bounded queues 有界队列
一个有界的队列(例如,一个ArrayBlockingQueue)和有限的maximumPoolSizes配置有助于防止资源耗尽,但是难以控制。队列大小和maximumPoolSizes需要 相互权衡:

4.2 拒绝策略(RejectedExecutionHandler的实现类)
拒绝任务有两种情况:1.线程池被关闭 2.任务队列已满且maximumPoolSize已满;
无论哪种情况,都会调用RejectedExecutionHandler的rejectedExecution方法。预定义了四种处理策略:
<1> AbortPolicy
默认的策略,直接抛出RejectedExecutionException运行时异常
<2> CallerRunsPolicy
提供一个简单的反馈控制机制,可以减慢提交新任务的速度
<3> DiscardPolicy
直接丢弃新提交的任务
<4> DiscardOldestPolicy
如果执行器没有关闭,队列头的任务将会被丢弃,然后执行器重新尝试执行任务(如果失败,则重复这一过程)

4.ThreadPoolExecutor的几种状态
状态.png

RUNNING:运行态,可以处理新任务并执行队列中的任务
SHUTDOWN:关闭态,不接受新任务,但是处理队列中已存在的任务
STOP:停止态,不接受新任务,不处理队列中任务,并且打断运行中任务
TIDYING:整理态,所有任务已经结束,workerCount=0,将执行terminated()方法
TERMINATED:结束态,terminated()方法已完成

5.ThreadPoolExecutor的几种方法

<1> execute():提交任务,交给线程池执行
<2> submit():提交任务,能够返回执行结果,相当于execute+Future
<3> shutdown():关闭线程池,等待任务都执行完(进入shutdown状态)
<4> shutdownNow():关闭线程池,不等待任务执行完(进入stop状态)
适用于监控
<5> getTaskCount():线程池已执行和未执行的任务总数
<6> getCompletedTaskCount():已完成的任务数量
<7> getPoolSize():线程池当前的线程数量
<8> getActiveCount():当前线程池中正在执行任务的线程数量

线程池类图.png
6.Executor框架接口
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }
    
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
上一篇下一篇

猜你喜欢

热点阅读