Java Consurrency 《Thread Pool》
线程池优点
Java是支持多线程的,而线程在应用当中是稀缺资源,所以在编写程序的时候需要特别注意合理的利用线程。
- 降低资源的消耗
- 提高响应速度
- 线程可管理 可复用
Java的线程管理是在java.util.concurrent下,
接口Executor提供了执行已提交的 Runnable 任务的对象的方法。此接口解耦了任务提交和每个任务将如何运行的机制(包括线程使用的细节、调度等)。通常使用 Executor 而不是显式通过 new Thread(new(RunnableTask())).start()创建线程。
Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
不过, Executor 接口并没有严格地要求执行是异步的。在最简单的情况下,执行程序可以在调用者的线程中立即运行已提交的任务。如:
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}
更常见的是,任务是在某个不是调用者线程的线程中执行的。以下执行程序将为每个任务生成一个新线程
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}
Executor已知的子接口以及实现类架构图:
[图片上传失败...(image-7ee135-1529156779808)]
下面这件介绍一下子接口、子类的用途:
ExecutorService
ExecutorService 提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。
可以关闭 ExecutorService,这将导致其拒绝新任务。提供两个方法来关闭 ExecutorService。shutdown() 方法在终止前允许执行以前提交的任务,而 shutdownNow() 方法阻止等待任务启动并试图停止当前正在执行的任务。在终止时,执行程序没有任务在执行,也没有任务在等待执行,并且无法提交新任务。应该关闭未使用的 ExecutorService 以允许回收其资源。
通过创建并返回一个可用于取消执行和/或等待完成的 Future,方法 submit 扩展了基本方法 Executor.execute(Runnable)。方法 invokeAny 和 invokeAll 是批量执行的最常用形式,它们执行任务 collection,然后等待至少一个,或全部任务完成(可使用 ExecutorCompletionService 类来编写这些方法的自定义变体)
- 方法列表
ExecutorService#shutdown
ExecutorService#shutdownNow
ExecutorService#isShutdown
ExecutorService#isTerminated
ExecutorService#awaitTermination
ExecutorService#submit(Callable<T>)
ExecutorService#submit(Runnable, T)
ExecutorService#submit(Runnable)
ExecutorService#invokeAll(java.util.Collection<? extends Callable<T>>)
ExecutorService#invokeAll(java.util.Collection<? extends Callable<T>>, long, TimeUnit)
ExecutorService#invokeAny(java.util.Collection<? extends Callable<T>>)
ExecutorService#invokeAny(java.util.Collection<? extends Callable<T>>, long, TimeUnit)
*** submit方法返回一个Future对象,对这对象可以设置超时时间 ***
- 实现一个线程支持执行的DEMO
public class Main {
static ExecutorService executorService = Executors.newFixedThreadPool(5);
public static void main(String[] args) {
executorService.execute(new OpenTV());
}
static class OpenTV implements Runnable{
@Override
public void run() {
System.err.println("i have open the tv");
}
}
}
Doc上列举的实例:
class NetworkService implements Runnable {
private final ServerSocket serverSocket;
private final ExecutorService pool;
public NetworkService(int port, int poolSize)
throws IOException {
serverSocket = new ServerSocket(port);
pool = Executors.newFixedThreadPool(poolSize);
}
public void run() { // run the service
try {
for (;;) {
pool.execute(new Handler(serverSocket.accept()));
}
} catch (IOException ex) {
pool.shutdown();
}
}
}
class Handler implements Runnable {
private final Socket socket;
Handler(Socket socket) { this.socket = socket; }
public void run() {
// read and service request on socket
}
}
下列方法分两个阶段关闭 ExecutorService。第一阶段调用 shutdown 拒绝传入任务,然后调用 shutdownNow(如有必要)取消所有遗留的任务:
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
整体的测试类
public class ExecutorServiceMain {
//创建一个线程池
static ExecutorService pool = Executors.newFixedThreadPool(5);
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
// pool.execute(new OpenTV());
Future<String> openTvFuture = pool.submit(new OpenDoor());
System.err.println(openTvFuture.get(6L,TimeUnit.SECONDS));
shutDown(pool);
}
static class OpenTV implements Runnable{
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.err.println("I have open the tv");
}
}
static class OpenDoor implements Callable<String>{
@Override
public String call() throws Exception {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "I love java";
}
}
private static void shutDown(ExecutorService pool){
pool.shutdown();
try {
if(!pool.awaitTermination(6,TimeUnit.SECONDS)){
pool.shutdownNow();
}
if(!pool.awaitTermination(6,TimeUnit.SECONDS)){
System.err.println("something is wrong");
}
} catch (InterruptedException e) {
e.printStackTrace();
pool.shutdownNow();
Thread.interrupted();
}
}
}
ScheduledExecutorService
一个 ExecutorService,可安排在给定的延迟后运行或定期执行的命令
schedule 方法使用各种延迟创建任务,并返回一个可用于取消或检查执行的任务对象。scheduleAtFixedRate 和 scheduleWithFixedDelay 方法创建并执行某些在取消前一直定期运行的任务。
用 Executor.execute(Runnable) 和 ExecutorService 的 submit 方法所提交的命令,通过所请求的 0 延迟进行安排。schedule 方法中允许出现 0 和负数延迟(但不是周期),并将这些视为一种立即执行的请求。
所有的 schedule 方法都接受相对 延迟和周期作为参数,而不是绝对的时间或日期。将以 Date 所表示的绝对时间转换成要求的形式很容易。例如,要安排在某个以后的 Date 运行,可以使用:schedule(task, date.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)。但是要注意,由于网络时间同步协议、时钟漂移或其他因素的存在,因此相对延迟的期满日期不必与启用任务的当前 Date 相符。 Executors 类为此包中所提供的 ScheduledExecutorService 实现提供了便捷的工厂方法。
- 方法列表
ScheduledExecutorService#schedule(Runnable, long, TimeUnit)
ScheduledExecutorService#schedule(Callable<V>, long, TimeUnit)
ScheduledExecutorService#scheduleAtFixedRate
ScheduledExecutorService#scheduleWithFixedDelay
ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。如果任务的任何一个执行遇到异常,则后续执行都会被取消。否则,只能通过执行程序的取消或终止方法来终止该任务。如果此任务的任何一个执行要花费比其周期更长的时间,则将推迟后续执行,但不会同时执行。
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)
创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。如果任务的任一执行遇到异常,就会取消后续执行。否则,只能通过执行程序的取消或终止方法来终止该任务。
- Demo
public class SchduledExecutorMain {
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
public static void main(String[] args) throws ExecutionException, InterruptedException {
ScheduledFuture<String> scheduled = scheduler.schedule(new RollTheBell(), 5, TimeUnit.SECONDS);
System.err.println(scheduled.get());
Job job = new Job();
ScheduledFuture<?> jobScheduled = scheduler.scheduleAtFixedRate(job, 10, 10, TimeUnit.SECONDS);
System.err.println(jobScheduled.get());
// ExecutorServiceMain.shutDown(scheduler);
}
static class RollTheBell implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("我是 5秒之后执行的");
return "I love java";
}
}
static class Job implements Runnable {
public void run() {
System.out.println("十秒之后每十秒执行一次");
}
}
}
AbstractExecutorService
提供 ExecutorService 执行方法的默认实现。此类使用 newTaskFor 返回的 RunnableFuture 实现 submit、invokeAny 和 invokeAll 方法,默认情况下,RunnableFuture 是此包中提供的 FutureTask 类。例如,submit(Runnable) 的实现创建了一个关联 RunnableFuture 类,该类将被执行并返回。子类可以重写 newTaskFor 方法,以返回 FutureTask 之外的 RunnableFuture 实现。
- 提供的方法列表
AbstractExecutorService#newTaskFor(Runnable, T)
AbstractExecutorService#newTaskFor(Callable<T>)
AbstractExecutorService#submit(Runnable)
AbstractExecutorService#submit(Runnable, T)
AbstractExecutorService#submit(Callable<T>)
AbstractExecutorService#doInvokeAny
AbstractExecutorService#invokeAny(java.util.Collection<? extends Callable<T>>)
AbstractExecutorService#invokeAny(java.util.Collection<? extends Callable<T>>, long, TimeUnit)
AbstractExecutorService#invokeAll(java.util.Collection<? extends Callable<T>>)
AbstractExecutorService#invokeAll(java.util.Collection<? extends Callable<T>>, long, TimeUnit)
ThreadPoolExecutor
ThreadPoolExecutor继承了AbstractExecutorService,总共提供了四个构造函数
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>, ThreadFactory)
ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>, RejectedExecutionHandler)
ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>, ThreadFactory, RejectedExecutionHandler)
构造函数主要提供以下几个参数:
-
corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。
-
BlockingQueue
- ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
- LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
- SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
- PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
- maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
- ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
- RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。
- AbortPolicy:直接抛出异常。
- CallerRunsPolicy:只用调用者所在线程来运行任务。
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
- DiscardPolicy:不处理,丢弃掉。
当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。
- keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
- TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
线程池提交任务
- 一种是无返回值
pool.execute(new Runnable() {
@Override
public void run() {
// do something
}
});
- 一种是有返回值
我们也可以使用submit 方法来提交任务,它会返回一个future,那么我们可以通过这个future来判断任务是否执行成功,通过future的get方法来获取返回值,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。
Future<Object> future = executor.submit(harReturnValuetask);
try {
Object s = future.get();
} catch (InterruptedException e) {
// 处理中断异常
} catch (ExecutionException e) {
// 处理无法执行任务异常
} finally {
// 关闭线程池
executor.shutdown();
}
线程池的关闭
public static void shutDown(ExecutorService pool){
pool.shutdown();
try {
if(!pool.awaitTermination(6,TimeUnit.SECONDS)){
pool.shutdownNow();
}
if(!pool.awaitTermination(6,TimeUnit.SECONDS)){
System.err.println("something is wrong");
}
} catch (InterruptedException e) {
e.printStackTrace();
pool.shutdownNow();
Thread.interrupted();
}
}
线程池的工作原理分析
[图片上传失败...(image-4dd8a2-1529156779808)]
从上图我们可以看出,当提交一个新任务到线程池时,线程池的处理流程如下:
- 首先线程池判断基本线程池是否已满?没满,创建一个工作线程来执行任务。满了,则进入下个流程。
- 其次线程池判断工作队列是否已满?没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。
- 最后线程池判断整个线程池是否已满?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略来处理这个任务。
合理的配置线程池
要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:
- 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
- 任务的优先级:高,中和低。
- 任务的执行时间:长,中和短。
- 任务的依赖性:是否依赖其他系统资源,如数据库连接。
任务性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务配置尽可能小的线程,如配置Ncpu+1个线程的线程池。IO密集型任务则由于线程并不是一直在执行任务,则配置尽可能多的线程,如2Ncpu。混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。我们可以通过Runtime.getRuntime().availableProcessors()* 方法获得当前设备的CPU个数。
优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。
执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。
依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,如果等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。
建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。
线程池的监控
通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用
- taskCount:线程池需要执行的任务数量。
- completedTaskCount:线程池在运行过程中已完成的任务数量。小于或等于taskCount。
- largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。
- getPoolSize:线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不加
- getActiveCount:获取活动的线程数。
通过扩展线程池进行监控。通过继承线程池并重写线程池的beforeExecute,afterExecute和terminated方法,我们可以在任务执行前,执行后和线程池关闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间等。这几个方法在线程池里是空方法。如:
protected void beforeExecute(Thread t, Runnable r) { }
参考:
- Java并发编程实战
- JDK1.6源码
- infoq