线程池

2020-02-17  本文已影响0人  9283856ddec1

当我们需要频繁的创建多个线程进行耗时操作时,每次通过new Thread实现并不是一种好的方式,每次新建和销毁对象性能较差,线程缺乏统一管理,可能无限制新建线程,相互之间竞争,可能占用过多系统资源导致死锁,并且缺乏定时执行、定期执行、线程中断等功能。在开发过程中,合理地使用线程池能够带来如下好处。

  1. 降低资源消耗;
    通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

  2. 提高响应速度;
    当任务到达时,任务可以不需要等到线程创建就能立即执行。

  3. 提高线程的可管理性;
    线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源, 还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。提供定时执行,定期执行,单线程,并发数控制等功能。

线程池原理

当向线程池提交一个任务之后,线程池 的主要处理流程如图所示。


线程池的主要处理流程.png

执行流程说明:
1)线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作 线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。

2)线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这 个工作队列里。如果工作队列满了,则进入下个流程。

3)线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程 来执行任务。如果已经满了,则交给饱和策略来处理这个任务。

线程池使用

Executor框架的使用示意图如图所示:


Executor框架的使用示意图.png

线程池的创建

通常我们都不会直接通过new的形式来创建线程池,由于创建参数过程相对复杂一些,JDK给我们提供了Executors工厂类来简化这个过程。
详细创建见下文:ThreadPoolExecutor

向线程池提交任务

可以使用两个方法向线程池提交任务,分别为execute()和submit()方法。
execute()
execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。如下所示:

threadsPool.execute(new Runnable() { 
    @Override 
    public void run() { 
        // TODO Auto-generated method stub 
    } 
});

submit()
submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个 future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方 法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线 程一段时间后立即返回,这时候有可能任务没有执行完。

Future<Object> future = executor.submit(harReturnValuetask);
try { 
    Object s = future.get(); 
} catch (InterruptedException e) { 
    // 处理中断异常 
} catch (ExecutionException e) { 
    // 处理无法执行任务异常 
} finally { 
    // 关闭线程池 
    executor.shutdown(); 
}

关闭线程池

可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线 程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务 可能永远无法终止。
shutdownNow
首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。

shutdown
只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线 程。

只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务 都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于应该调用哪 一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭 线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。

合理配置线程池

要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。

线程池的监控

如果在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,可以根 据线程池的使用状况快速定位问题。可以通过线程池提供的参数进行监控,在监控线程池的 时候可以使用以下属性。

属性 说明
taskCount 线程池需要执行的任务数量
completedTaskCount 线程池在运行过程中已完成的任务数量,小于或等于taskCount
largestPoolSize 线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是 否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。
getPoolSize 线程池的线程数量
getActiveCount 获取活动的线程数

通过扩展线程池进行监控。可以通过继承线程池来自定义线程池,重写线程池的 beforeExecute、afterExecute和terminated方法,也可以在任务执行前、执行后和线程池关闭前执 行一些代码来进行监控。

Executor框架

Executor框架简介

Executor框架的两级调度模型

在HotSpot VM的线程模型中,Java线程(java.lang.Thread)被一对一映射为本地操作系统线 程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程 也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。两级调度模型如下所示:

任务的两级调度模型.png
上层,把应用分解为若干个任务,然后使用用户级的调度器(Executor)将这些任务映射到固定数量的线程;
下层,操作系统内核将这些线程映射到硬件处理器上。
Executor框架的结构

Executor框架包含的主要的类与接口如图所示:


Executor框架的类与接口.png

Executor框架主要由3大部分组成如下:
(1)任务
被执行任务需要实现的接口:Runnable接口或Callable接口。

(2)任务的执行
任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。

(3)异步计算的结果
接口Future和实现Future接口的FutureTask类。

Runnable接口和Callable接口

Runnable接口和Callable接口的实现类,都可以被Executor执行。它们之间的区别是Runnable不会返回结果,而Callable可以返回结果。可以使用工厂类Executors来把Runnable包装成一个Callable。

public static Callable<Object> callable(Runnable task)
public static <T> Callable<T> callable(Runnable task, T result)

ExecutorService

ExecutorService定义了线程池需要实现的接口,其生命生命周期包括3种状态:运行、关闭、终止。创建后便进入运行状态,当调用shutdown()方法时,便进入关闭状态,此时意味着ExecutorService不再接受新的任务,但它还在执行已经提交了的任务。当所有已经提交了的任务执行完后,就变成终止状态。

ThreadPoolExecutor

ThreadPoolExecutor构造函数如下:

public ThreadPoolExecutor(int corePoolSize, int maxinumPoolSize, 
                                             long keepAliveTime, TimeUnit unit, 
                                             BlockingQueue<Runnable> workQueue, 
                                             ThreadFactory threadFactory, 
                                             RejectedExecutionHandler handler)

参数详细说明如下所示:

参数名 作用
corePoolSize 线程池所保存的核心线程数,线程池启动后默认是空的,当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的线程能够执行任务也会创建线程,等到需要执行的任务大于线程核心数后就不再创建。prestartAllCoreThreads方法可以在线程池启动后立即启动所有核心线程以等待任务
maxinumPoolSize 线程池允许创建的最大线程数。如果队列满了,并且已创建的线程小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列这个参数就没有效果。
keepAliveTime 当前线程池线程总数大于核心线程数时,终止多于的空闲线程的时间。如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。
unit keepAliveTime参数的时间单元,可选值有天、小时、分钟、毫秒、微秒和纳秒。
workQueue 任务队列,如果当前线程池达到核心线程数corePoolSize,且当前所有线程都处于活动状态时,则将新加入的任务放到此队列中
threadFactory 线程工厂,可以通过线程工厂给每个创建出来的线程设 置更有意义的名字。用户可以定制线程的创建过程,通常不需要设置。
handler 拒绝策略,当线程池与workQueue队列都满了的情况下,说明线程池处于饱和状 态,那么必须采取一种策略处理提交的新任务。
FixedThreadPool详解

FixedThreadPool被称为可重用固定线程数的线程池。下面是FixedThreadPool的源代码实现:

public static ExecutorService newFixedThreadPool(int nThreads){
        ruturn new ThreadPoolExecutor(nThreads, nThreads, 
                                                            0L, TimeUnit.MILLISECONDS
                                                            new LinkedBlockingQueue<Runnable>());
}

设置它的corePoolSize和maxinumPoolSize值都是nThreads,并且设置keepAliveTime参数为0毫秒,最后设置无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为 Integer.MAX_VALUE)。该线程池中就含有了固定个数的线程,并且能够容纳无限个任务。
FixedThreadPool的execute()的运行示意图如下所示:


FixedThreadPool的execute()的运行示意图.png

使用无界队列LinkedBlockingQueue作为工作队列会对线程池带来如下影响:
1> 当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中 的线程数不会超过corePoolSize。
2> 拒绝策略不会生效。

适用场景
FixedThreadPool适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场 景,它适用于负载比较重的服务器。

SingleThreadExecutor详解

SingleThreadExecutor是使用单个worker线程的Executor。下面是SingleThreadExecutor的源代码实现:

public static ExecutorService newSingleThreadExecutor() { 
    return new FinalizableDelegatedExecutorService (
                     new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, 
                                                      new LinkedBlockingQueue<Runnable>())
); }

SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1。其他参数与 FixedThreadPool相同。运行示意图如下:


SingleThreadExecutor的execute()的运行示意图.png

适用场景
SingleThreadExecutor适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多 个线程是活动的应用场景。

CachedThreadPool详解

CachedThreadPool是一个会根据需要创建新线程的线程池。下面是创建CachedThread- Pool的源代码:

public static ExecutorService newCachedThreadPool() { 
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 
                                                   60L, TimeUnit.SECONDS, new 
                                                   SynchronousQueue<Runnable>()); 
}

CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为 Integer.MAX_VALUE,即maximumPool是无界的。这里把keepAliveTime设置为60L,意味着 CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。

CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但 CachedThreadPool的maximumPool是无界的。这意味着,如果主线程提交任务的速度高于 maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下, CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。

运行示意图如下:


CachedThreadPool的execute()的运行示意图.png

1)首先执行SynchronousQueue.offer(Runnable task)。如果当前maximumPool中有空闲线程 正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute()方 法执行完成;否则执行下面的步骤2。
2)当初始maximumPool为空,或者maximumPool中当前没有空闲线程时,将没有线程执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤1将失 败。此时CachedThreadPool会创建一个新线程执行任务,execute()方法执行完成。
3)在步骤2中新创建的线程将任务执行完后,会执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这个poll操作会让空闲线程最多在SynchronousQueue中等待60秒钟。如果60秒钟内主线程提交了一个新任务(主线程执行步骤1),那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。由于 空闲60秒的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源。

适用场景
SingleThreadExecutor适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多 个线程是活动的应用场景。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运 行任务,或者定期执行任务。ScheduledThreadPoolExecutor的功能与Timer类似,但 ScheduledThreadPoolExecutor功能更强大、更灵活。Timer对应的是单个后台线程,而 ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。


ScheduledThreadPoolExecutor的任务传递示意图.png

ScheduledThreadPoolExecutor的执行主要分为两大部分。
1)当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWith- FixedDelay()方法时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了 RunnableScheduledFutur接口的ScheduledFutureTask。

2)线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行任务。

ScheduledThreadPoolExecutor的任务执行步骤.png

下面是对这4个步骤的说明:
1)线程1从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue.take())。到期任务是指ScheduledFutureTask的time大于等于当前时间。
2)线程1执行这个ScheduledFutureTask。
3)线程1修改ScheduledFutureTask的time变量为下次将要被执行的时间。
4)线程1把这个修改time之后的ScheduledFutureTask放回DelayQueue中(DelayQueue.add())。

ScheduledThreadPoolExecutor

适用场景
ScheduledThreadPoolExecutor适用于需要多个后台线程执行周期任务,同时为了满足资源 管理的需求而需要限制后台线程的数量的应用场景。

SingleThreadScheduledExecutor

适用场景
SingleThreadScheduledExecutor适用于需要单个后台线程执行周期任务,同时需要保证顺 序地执行各个任务的应用场景。

FutureTask

FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给 Executor执行,也可以由调用线程直接执行FutureTask.run()。

根据FutureTask.run()方法被执行 的时机,FutureTask可以处于下面3种状态,如下图所示:


FutureTask的状态迁移示意图.png

1)未启动
当创建一 个FutureTask,且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态。

2)已启动
FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态。

3)已完成
FutureTask.run()方法执行完后正常结束,或被取消,或 执行时抛出异常而异常结束,FutureTask处于已完成状态。

FutureTask调用get方法和cancel方法的执行示意图:


FutureTask的get和cancel的执行示意图.png
FutureTask实现

FutureTask的实现基于AbstractQueuedSynchronizer(以下简称为AQS),AQS是一个同步框架,它提供通 用机制来原子性管理同步状态、阻塞和唤醒线程,以及维护被阻塞线程的队列。

基于“复合优先于继承”的原则,FutureTask声明了一个内部私有的继承于AQS的子类 Sync,对FutureTask所有公有方法的调用都会委托给这个内部子类。AQS被作为“模板方法模式”的基础类提供给FutureTask的内部子类Sync,这个内部子类只 需要实现状态检查和状态更新的方法即可,这些方法将控制FutureTask的获取和释放操作。具 体来说,Sync实现了AQS的tryAcquireShared(int)方法和tryReleaseShared(int)方法,Sync通过这 两个方法来检查和更新同步状态。

FutureTask的设计示意图如下图所示:


FutureTask的设计示意图.png
参考资料:

[1] Java并发编程的艺术,方腾飞,魏鹏,程晓明
[2] Android开发进阶--从小工到专家,何红辉

上一篇 下一篇

猜你喜欢

热点阅读