Executor框架与线程池
[转载请注明出处] (http://www.jianshu.com/p/590b185e5910)
Executor框架简介
在Java 5之后,并发编程引入了一堆新的启动、调度和管理线程的API。Executor框架便是Java 5中引入的,其内部使用了线程池机制,它在java.util.cocurrent 包下,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。
Executor框架主要相关的接口和类有:Executor,ExecutorService,ThreadPoolExecutor ,ScheduledThreadPoolExecutor,Executors,CompletionService等。
Executor框架结构
Executor主要由三部分组成:任务产生部分,任务处理部分,结果获取部分。
任务的产生
产生一个新的任务通常有以下三种方法:
-
继承Thread类,重写run()方法
-
实现Runnerable接口,重写run()方法
-
实现Callable接口,重写call()方法
任务的处理部分
任务的处理主要是将任务丢到线程池中,由线程池提供线程将任务“消费掉”。在Java5之前,即Executor框架出现之前,还没有出现线程池,我们通常调用start()方法来开启线程。在Java5之后,可以通过创建线程池,然后将任务丢到线程池中,通过线程池开启关闭线程。
线程池有2类:ThreadPoolExecutor和ScheduledThreadPoolExecutor。2种线程池类均可以通过工厂类Executors来创建。
ThreadPoolExecutor类
Executor工厂类可以创建3种类型的ThreadPoolExecutor类:
-
CachedThreadPool
缓存型线程池,当任务丢到线程池中的时候,先查看池中有没有以前建立的线程。如果有,就重新使用。如果没有,就建一个新的线程加入池中。缓存型线程池通常用于执行一些生存期很短的异步型任务。 需要注意的是:能重复使用的线程,必须是timeout IDLE内的池中线程,缺省timeout是60s,超过这个IDLE时长,线程实例将被终止及移出池。同样,放入CachedThreadPool的线程也不必担心其结束。
-
FixedThreadPool:
FixedThreadPool与CacheThreadPool差不多,也是能重新使用,但不能随时建新的线程。和CacheThreadPool不同的时,任意时间点,FixedThreadPool最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子。此外,FixedThreadPool没有IDLE机制所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器。从方法的源代码看,cache池和fixed 池调用的是同一个底层池,只不过参数不同(fixed池线程数固定,并且是0秒IDLE。cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE )。
-
SingleThreadExecutor:
单例线程,任意时间池中只能有一个线程。从方法的源代码看,single线程池和cache池以及fixed 池调用的是同一个底层池,只是但线程数目是1-1,无IDLE机制。
ScheduleThreadPoolExecutor类
Executors工厂类可以创建2种类型的SchedulePoolExecutor类
-
ScheduleThreadPoolExecutor:
调度型线程池,它可另行安排在给定的延迟后运行命令,或者定期执行命令。包含若干线程。
-
SingleThreadScheduleExecutor:
调度型线程池,它可另行安排在给定的延迟后运行命令,或者定期执行命令。包含单干线程。
自定义线程池:
可以通过ThreadPoolExecutor的构造方法自定义线程池,它有多个构造方法来创建线程池,简要说明各个构造方法都含有的参数。
-
corePoolSize:线程池中所保存的核心线程数,包括空闲线程。
-
maximumPoolSize:池中允许的最大线程数。
-
keepAliveTime:线程池中的空闲线程所能持续的最长时间。
-
unit:持续时间的单位。
-
workQueue:任务执行前保存任务的队列,仅保存由execute方法提交的Runnable任务。
当试图通过excute方法讲一个Runnable任务添加到线程池中时,按照如下顺序来处理:
-
如果线程池中的线程数量少于corePoolSize,即使线程池中有空闲线程,也会创建一个新的线程来执行新添加的任务;
-
如果线程池中的线程数量大于等于corePoolSize,但缓冲队列workQueue未满,则将新添加的任务放到workQueue中,按照FIFO的原则依次等待执行(线程池中有线程空闲出来后依次将缓冲队列中的任务交付给空闲的线程执行);
-
如果线程池中的线程数量大于等于corePoolSize,且缓冲队列workQueue已满,但线程池中的线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务;
-
如果线程池中的线程数量等于了maximumPoolSize,有4种才处理方式(该构造方法调用了含有5个参数的构造方法,并将最后一个构造方法为RejectedExecutionHandler类型,它在处理线程溢出时有4种方式,这里不再细说,要了解的,自己可以阅读下源码)。
总结起来,也即是说,当有新的任务要处理时,先看线程池中的线程数量是否大于corePoolSize,再看缓冲队列workQueue是否满,最后看线程池中的线程数量是否大于maximumPoolSize。另外,当线程池中的线程数量大于corePoolSize时,如果里面有线程的空闲时间超过了keepAliveTime,就将其移除线程池,这样,可以动态地调整线程池中线程的数量。
结果的获取
首先,结果的获取涉及到一种设计模式,Future模式。Future模式的核心在于:去除了主函数的等待时间,并使得原本需要等待的时间段可以用于处理其他业务逻辑。举个例子,当网上购物提交订单后收到商品前的等待时间内可以做一些其他的事。
再谈结果的获取,结果的获取是通过Future接口的实现类FutureTask来实现的。FutureTask实现了两个接口,Runnable和Future,所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值可以使用这个组合,用另一个线程去计算返回值,而当前线程在使用这个返回值之前可以做其它的操作,等到需要这个返回值时,再通过Future得到。futureTask对象的get()方法可以获得异步执行任务的返回值。
框架部分概念的区别比较
Executor和Executors
-
Executor和Executors都属于线程框架部分的内容。
-
Executor是线程池的顶级接口
-
Executors类提供了若干个静态方法,用于生成不同类型的线程池
Executor和ExecutorService
-
Executor和ExecutorService都属于线程框架部分的内容。
-
ExecutorService 接口继承了 Executor 接口,是 Executor 的子接口
-
Executor 接口定义了 execute()方法用来接收一个Runnable接口的对象,而 ExecutorService 接口中的 submit()方法可以接受Runnable和Callable接口的对象。
-
Executor 中的 execute() 方法不返回任何结果,而 ExecutorService 中的 submit()方法可以通过一个 Future 对象返回运算结果。可以通过Future.cancel()取消pending中的任务。
-
除了允许客户端提交一个任务,ExecutorService 还提供用来控制线程池的方法。比如:调用 shutDown() 方法终止线程池。
Runnerable和Callable
-
两者都是接口,都可以编写多线程程序,都需要调用Thread的start()方法来开启线程。
-
实现Callable接口的任务线程能返回执行结果;而实现Runnable接口的任务线程不能返回结果;
-
Callable接口的call()方法允许抛出异常;而Runnable接口的run()方法的异常只能在内部消化,不能继续上抛;
ExecutorService和CompletionService
-
两者都是接口,都提供了submit()方法,用于提交要执行的值返回任务,并返回表示任务结果的 Future。
-
在通过ExecutorService的submit()方法返回的Future对象调用get()方法获取返回值的时候,Future对象并不一定处于完成状态,这时调用get()方法就会被阻塞住。当前这个任务处于阻塞状态的时候,其他任务可能已将完成,显然这种获取返回值得方式不合适。
-
在通过CompletionService的submit()方法返回的Future对象调用get()方法获取返回值的时候,由于本身维护了一个保存Future对象的BlockingQueue,只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。取值方法是completionServcie.take().get()。
【参考文章】
[兰亭风雨] http://blog.csdn.net/ns_code/article/details/17465497
[你是我世界的光]
http://blog.csdn.net/qq_16811963/article/details/52161713