带你进入java中的ExecutorService
1.概览
ExecutorService是一个由JDK提供的框架,它简化了以异步模式运行task的工作。通常来说,ExecutorService会自动提供一个线程池以及常用的API。
2.实例化ExecutorService
2.1 Executors类的工厂方法
创建ExecutorService的最简单的方法是使用Executors类的一个工厂方法。例如,下面的一行代码将会创建一个拥有10个线程的线程池。
ExecutorService executor = Executors.newFixedThreadPool(10);
还有其他几个工厂方法用于创建预定义的ExecutorService,它们会满足特定的使用场景。寻找最合适你的方法,可以咨询Oracle的官方文档。
2.2 直接创建一个ExecutorService
由于ExecutorService是一个接口,所以可以使用它的任一实现来创建一个实例。在java.util.concurrent包中有好几个实现可供选择,或者你可以创建自己的实现。例如,ThreadPoolExecutor类就有好几个构造方法可用于配置一个executor service以及它内部的线程池。
ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
你可能注意到了,上面的代码和工厂方法newSingleThreadExecutor()的源代码很像,在大多数情况下,详细的手工配置都不是必需的。
3.分配task给ExecutorService
ExecutorService可以运行 Runnable 以及Callable任务。为了让事情简单一点,我们将使用俩个最基本的任务。注意:这里我们使用拉姆达表达式来取代匿名内部类。
Runnable runnableTask = () -> {
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Callable callableTask = () -> {
TimeUnit.MILLISECONDS.sleep(300);
return "Task's execution";
};
callableTasks.add(callableTask);
callableTasks.add(callableTask);
callableTasks.add(callableTask);
有好几个方法可以用于把task分配给ExecutorService,包括: execute()、 submit()、 invokeAny()、 invokeAll()。其中,execute()方法是继承自Executor接口。execute()方法的返回值是void,使用它无法获取任务的运行结果或者检查任务的状态(例如,是正在运行还是已经被执行了)。
executorService.execute(runnableTask);
submit()可以提交一个Callable或Runnable任务给ExecutorService,并且返回一个Future类型的结果。
Future future = executorService.submit(callableTask);
invokeAny()可以给ExecutorService分配一个任务集合。导致他们中的每一个都被执行,并且返回一个任务成功运行的结果(如果存在一个成功运行的话)。
String result = executorService.invokeAny(callableTasks);
invokeAll()也是可以给ExecutorService分配一个任务集合。导致他们中的每一个都被执行,并且以Future集合的形式返回所有任务的运行结果。
List> futures = executorService.invokeAll(callableTasks);
现在,在进行下一步之前,我们有两件事必须要讨论一下: 关闭ExecutorService并且处理Future返回类型。
4.关闭一个ExecutorService
在通常情况下,当没有待处理的任务(task)时,ExecutorService将不会自动销毁。它会一直保持存活并且等待执行新任务。
在某些情况下,这或许很有用;例如,有一个应用需要处理一些不规则的任务或大量在编译时不可知的任务。在另一方面,即使一个app在执行到最后,它也不会被停止,因为一个处于等待中的ExecutorService 将导致JVM保持运行。
为了正确地关闭一个ExecutorService,我们有两个API可供使用: shutdown() 以及shutdownNow()。
shutdown()方法不会导致ExecutorService立即销毁。它将让ExecutorService停止接收新任务,并且在所有运行中的线程完成他们当前工作之后,关闭ExecutorService。
executorService.shutdown();
而shutdownNow()方法会试图立即销毁ExecutorService,但是它并不保证在同一时刻下所有运行中的线程都能被终止。该方法会返回一个任务集合,这些任务都是等待被ExecutorService处理的。具体怎么处理这些任务则取决于开发者。
List notExecutedTasks = executorService.shutDownNow();
关闭ExecutorService的最好方式(同时也是Oracle所推荐的)是上面俩种方法与awaitTermination()的组合使用。使用这种方法,ExecutorService将会先停止接收新任务,然后等待一段时间好让所有任务都被完成。如果时间过期了,执行将被理解终止:
executorService.shutdown();
try {
if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
5.Future接口
submit()和invokeAll()方法会返回一个对象或者一个Future类型的集合,这使得我们可以获取任务的执行结果或者检查任务的状态(是正在运行还是已经执行完成)。
Future接口提供了一个特殊的额阻塞方法get(),get()方法会返回一个Callable任务实际的执行结果,或者null(如果是Runnable任务的话)。在任务仍处于运行期间的时候,调用get()方法将会导致阻塞,直到任务被恰当地执行并且结果是可用时。
Future future = executorService.submit(callableTask);
String result = null;
try {
result = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
如果get()方法导致长久的阻塞的话,应用的执行性能就会下降。如果结果数据不重要的话,可以使用超时时间来避免这个问题:
String result = future.get(200, TimeUnit.MILLISECONDS);
如果执行时间比指定时间长的话(此时,即:200毫秒),就会抛出TimeoutException异常。isDone()方法可以用来检查分配的任务是否已经被处理了。Future接口也提供了cancel()方法用于取消任务的执行,并且可以使用isCancelled()方法来检查该任务是否已经被取消。
boolean canceled = future.cancel(true);
boolean isCancelled = future.isCancelled();
6.ScheduledExecutorService接口
ScheduledExecutorService会在预定义的延迟之后,运行任务。再次说明,实例化ScheduledExecutorService的最好方式是使用Executor类的工厂方法。在本章节中,我们将使用单线程的ScheduledExecutorService:
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
在一个固定延迟之后,调度一个单任务执行,可以使用ScheduledExecutorService的scheduled()方法。这里有两个scheduled()方法供你运行一个Runnable或Callable任务:
Future resultFuture = executorService.schedule(callableTask, 1, TimeUnit.SECONDS);
scheduleAtFixedRate()方法可以让我们在固定的延迟时间之后,周期性地执行一个任务。上面的代码会在执行callableTask之前,延迟一秒。
下面的代码块则会在100毫秒的初始延迟之后,执行一个任务,并且在那之后,它会每隔450毫秒执行同一个任务。如果处理器在处理一个任务时需要的时间比scheduledAtFixedRate()方法的period参数更长的话,ScheduledExecutorService将会在开始下一个任务之前一直等待,直到当前任务完成。
Future resultFuture = service .scheduleAtFixedRate(runnableTask, 100, 450, TimeUnit.MILLISECONDS);
如果必需在两个任务迭代之间有一个固定长度的延迟的话,应当使用scheduleWithFixedDelay()。例如,下面的代码将会保证在结束当前运行和开始另一个任务之间有150毫秒的停滞。
service.scheduleWithFixedDelay(task, 100, 150, TimeUnit.MILLISECONDS);
根据scheduledAtFixedRate()以及scheduleWithFixedDelay()方法的约定,如果ExecutorService终止或者在任务执行过程中有异常抛出的时,任务的周期执行将会结束。
7.ExecutorService VS Fork/Join
在java7发布之后,许多开发者任务,应该使用fork/join框架取代ExecutorService框架。但这并不总是合适的选择。虽然fork/join可以带来使用的简洁性以及运行的流畅性,但是它也让降低了我们对并发执行的控制。ExecutorService让开发者可以控制一定数量的线程,以及任务执行的粒度。
ExecutorService最适用于处理那些相互独立的任务。例如,事务 或 符合“单任务单线程”的请求等。
相比之下,根据Oracle的官方文档,fork/join在设计之初的目的就是加速那些可以被分解成小块的工作。
8.总结
尽管ExecutorService比较简单,但还是有很多坑需要我们注意。我们来总结一下:
1.保持一个未使用的ExecutorService存活: 我们在本篇文章的章节4中讲解了如何关闭一个ExecutorService。
2.在使用固定长度的线程池时,错误的线程池容量: 决定一个应用到底需要多少线程才可以有效地执行任务时非常重要的,线程池太大会导致 不 必要的花销,因为大多数线程将处于等待模式waiting mode。线程池太小的话,又会因为队列中任务的长时间地等待而降低应用的响应性。
3.在一个task取消之后,调用Future的get()方法:试图获取一个已经取消的任务的执行结果将会触发一个CancellationException。
4.Future.get()方法的长期阻塞:超时时间应该用于避免无谓的等待。
文章中的代码可以在github上找到,地址: sourceCode