java并发编程——线程池
1. 为什么使用线程池
诸如 Web 服务器、数据库服务器、文件服务器或邮件服务器之类的许多服务器应用程序都面向处理来自某些远程来源的大量短小的任务。一般处理方式:每当一个请求到达就创建一个新线程,然后在新线程中为请求服务。然而无限制创建线程存在以下问题:
- 线程创建开销非常高,同时需要jvm和操作系统提供一些辅助操作。
- 资源消耗,活跃的线程会消耗系统资源,尤其是内存。可运行的线程数量大于处理器数量,某些线程就会闲置,同时给垃圾回收带来压力。
- 稳定性,可创建线程的数量上存在一个上限,根据jvm配置和操作系统自身设置有关,超过这个限制有可能引起OutOfMemoryError。
在一定范围内,增加线程可以提高系统的吞吐率,但是如果超过这个范围,在创建的线程只会降低程序的执行速度。为了解决以上问题我们引入了线程池。
2. 线程池的优势
线程池——控制线程创建、释放,并通过某种策略尝试复用线程去执行任务的一个管理框架,从而实现线程资源与任务之间一种平衡。
好的软件设计不建议手动创建和销毁线程。线程池可以根据创建时选择的策略自动处理线程的生命周期。合理利用线程池能够带来三个好处:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
3. java线程池核心接口和类说明
在JDK1.5以后,为简化多线程并发编程,引入全新的并发编程包:java.util.concurrent及其并发编程框架(Executor框架)。
Executor框架主要包含三个部分:
- 任务:包括Runnable和Callable,其中Runnable表示一个可以异步执行的任务,而Callable表示一个会产生结果的任务。
- 任务的执行:包括Executor框架的核心接口Executor以及其子接口ExecutorService。在Executor框架中有两个关键类ThreadPoolExecutor和ScheduledThreadPoolExecutor实现了ExecutorService接口。
- 异步计算的结果:包括接口Future和其实现类FutureTask。
其中Executor接口作为基础与核心,其定义如下:
public interface Executor {
void execute(Runnable command);
}
它包含了一个方法execute,参数为一个Runnable接口引用,该接口主要目的将任务的提交与执行分离开来。
ExcutorService作为子接口定义了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。
下面就java线程池具体实现类进行说明和分析。
4. java线程池ThreadPollExecuator使用及原理
4.1 线程池构建说明
在JDK1.5以后推出了ThreadPollExecuator这个默认线程池,我们可以通过ThreadPoolExecutor来创建一个线程池:
new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
keepAliveTime, milliseconds,runnableTaskQueue, threadFactory,handler);
构建参数说明:
- corePoolSize:线程池主线程数量,有任务提交的时候,如果线程池中线程数量小于corePoolSize,则会直接创建新的线程放入workers中,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到等待任务队列当中。
- maximumPoolSize:线程池最大线程数,表示在线程池中最多能创建多少个线程。
- keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止。其中TimeUnit配合使用表示对应单位。
- runnableTaskQueue(等待任务队列):用于保存等待执行的任务的阻塞队列,根据需求可以在以下队列中进行挑选:
- ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
- LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO 排序元素,吞吐量通常高于ArrayBlockingQueue。
- SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue。
- PriorityBlockingQueue:一个具有优先级得无限阻塞队列。
- ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字,Debug和定位问题时非常又帮助。
- handler:RejectedExecutionHandler(饱和策略接口)当队列和线程池都满了,必须采用一种策略处理新提交的任务,JDK1.5提供的四种策略:
- CallerRunsPolicy:直接让原先的client thread做为worker线程,进行执行。
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
- DiscardPolicy:不处理,丢弃掉。
- AbortPolicy:java默认抛出异常RejectedExecutionException。
最后,你可以据应用场景需要来实现RejectedExecutionHandler接口自定义策略。
4.2 线程池执行任务流程
提交一个新任务到线程池时,线程池的处理流程如下:
- 首先线程池判断基本线程池是否已满?没满,创建一个工作线程来执行任务。满了,则进入下个流程。
- 其次线程池判断工作队列是否已满?没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。
- 最后线程池判断整个线程池是否已满?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略来处理这个任务。
//如果线程数小于基本线程数,则创建线程并执行当前任务
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
//如线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队列中。
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
//如果线程池不处于运行中或任务无法放入队列,
并且当前线程数量小于最大允许的线程数量,则创建一个线程执行任务。
else if (!addIfUnderMaximumPoolSize(command))
//抛出RejectedExecutionException异常
reject(command); // is shutdown or saturated
}
4.3 线程池任务提交
可以使用两个方法向线程池提交任务,分别是execute()和submit()方法。
execute方法用于提交不需要返回值的任务,所以无法判断任务是否执行成功,其提交任务是一个Runnalbe类的实例执行实例如下:
threadpool.execute(new Runnable(){
public void run(){
}
})
submit方法用于提交需要返回值得任务,线程池会返回一个future类型的对象,通过这个future对象可以判断这个任务是否执行成功,并且通过future的get方法可以获取返回值。其提交的任务可以是一个Runnalbe类实例或Callable类实例
Future<Object> future = threadPool.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
return null;
}
});
5. Executors说明
可以看出ThreadPollExecuator可以提供n种不同的实现策略,为方便用户使用jdk同时提供Executors工具类,生成一些常用的线程池。jdk注释说明没有特殊需求,强烈建议程序员使用较为方便的 Executors 工厂方法。下面介绍一下常用线程池:
- newSingleThreadExecutor:创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- newFixedThreadPool:创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
FixedThreadPool满足了资源管理的需求,可以限制当前线程数量。适用于负载较重的服务器环境。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- newCachedThreadPool:创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
**CachedThreadPool适用于执行很多短期异步任务的小程序,适用于负载较轻的服务器。
**
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- newScheduledThreadPool:创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
6. 合理配置线程池及建议
要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:
- 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
- 任务的优先级:高,中和低。
- 任务的执行时间:长,中和短。
- 任务的依赖性:是否依赖其他系统资源,如数据库连接。
如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1;如果是IO密集型任务,参考值可以设置为2*NCPU;优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理;执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。
参考
http://www.cnblogs.com/micrari/p/5634447.html
https://www.oschina.net/question/565065_86540
http://www.cnblogs.com/exe19/p/5359885.html
http://www.cnblogs.com/waytobestcoder/p/5323130.html