Java多线程:Executor,Executors,Futur
平时工作中经常碰到个各种多线程,有时候搞不清它们之间到底有什么区别,这次来个总体的总结,主要是以下这些:
Executor,Executors,ExecutorService, CompletionServie,Future,Callable,Runnable,FutureTask
一、Runnable(interface)
public interface Runnable {
public void run();
}
run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。
二、Callable (interface)
public interface Callable<V> {
V call() throws Exception;
}
与 Runnable 不同的是call()函数返回的类型就是传递进来的V类型,而且能够抛出异常。一般情况下是配合ExecutorService来使用的
三、Future( interface)
Future是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果、设置结果操作。
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
- cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
- isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
- isDone方法表示任务是否已经完成,若任务完成,则返回true;
- get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
- get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。
也就是说Future提供了三种功能:
- 判断任务是否完成;
- 能够中断任务;
- 能够获取任务执行结果。
四、FutureTask(Runnable, Future<V>的具体实现)
public class FutureTask<V> implements RunnableFuture<V> 。。。
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
可以看出RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值,管理任务。
其中有两个构造方法
//接受一个 Callable 参数
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
//接受一个 Runnable ,利用 Executors.callable 将Runnable 转换为Callable
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
具体使用可以参考 AsyncTask 中的使用
五、Executor(interface)
在Executor框架中,使用执行器(Exectuor)来管理Thread对象,从而简化了并发编程。并发编程的一种编程方式把任务拆分为一系列的小任务,即Runnable,然后将这些任务提交给一个Executor执行,Executor.execute(Runnalbe) 。Executor在执行时使用其内部的线程池来完成操作。
Executor 接口中之定义了一个方法 execute(Runnable command),该方法接收一个 Runable 实例,它用来执行一个任务,任务即一个实现了 Runnable 接口的类。
public interface Executor {
void execute(Runnable command);
}
为了避免调用 new Thread(new RunnableTask()).start()这样的代码我们可以
Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...
Executor 并不是严格的要求一步执行,我们可以简单的直接在调用者线程执行运行提交的任务
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run(); // 在调用者线程执行
}}
一般来说任务在非调用者的线程中执行,比如产生一个新的线程
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start(); //新启一个线程,在非调用者线程中执行
}}
有很多Executor 的实现是为了实现任务的某种调度,比如 AsyncTask 中的串行任务队列
class SerialExecutor implements Executor {
final Queue tasks = new ArrayDeque<>();
final Executor executor;
Runnable active;
SerialExecutor(Executor executor) {
this.executor = executor;
public synchronized void execute(final Runnable r) {
tasks.add(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (active == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
executor.execute(active);
}
}
}}
六、ExecutorService(interface,继承自Executor)
ExecutorService 接口继承自 Executor 接口,它提供了更丰富的实现多线程的方法,比如,ExecutorService 提供了关闭自己的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。 可以调用 ExecutorService 的 shutdown()方法来平滑地关闭 ExecutorService,调用该方法后,将导致 ExecutorService 停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭 ExecutorService。因此我们一般用该接口来实现和管理多线程。
execute(Runnable)
submit(Runnable)
submit(Callable)
invokeAny()
invokeAll()
execute(Runnable)
方法 execute(Runnable) 接收一个java.lang.Runnable 对象作为参数,并且以异步的方式执行它。
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});
executorService.shutdown();
使用这种方式没有办法获取执行 Runnable 之后的结果,如果你希望获取运行之后的返回值,就必须使用接收 Callable 参数的 execute() 方法。
submit(Runnable)
方法 submit(Runnable) 同样接收一个Runnable 的实现作为参数,但是会返回一个Future 对象。这个Future 对象可以用于判断 Runnable 是否结束执行。如下是一个ExecutorService 的 submit() 方法的例子:
Future future = executorService.submit(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});
//如果任务结束执行则返回 null
System.out.println("future.get()=" + future.get());
submit(Callable)
方法 submit(Callable) 和方法 submit(Runnable) 比较类似,但是区别则在于它们接收不同的参数类型。Callable 的实例与 Runnable 的实例很类似,但是 Callable 的 call() 方法可以返回一个结果。方法 Runnable.run() 则不能返回结果。
Callable 的返回值可以从方法 submit(Callable) 返回的 Future 对象中获取。如下是一个 ExecutorService Callable 的样例:
Future future = executorService.submit(new Callable(){
public Object call() throws Exception {
System.out.println("Asynchronous Callable");
return "Callable Result";
}
});
System.out.println("future.get() = " + future.get());
输出结果
Asynchronous Callable
future.get() = Callable Result
inVokeAny()
方法 invokeAny() 接收一个包含 Callable 对象的集合作为参数。调用该方法不会返回 Future 对象,而是返回集合中某一个Callable 对象的结果,而且无法保证调用之后返回的结果是哪一个 Callable,只知道它是这些 Callable 中一个执行结束的 Callable 对象。如果一个任务运行完毕或者抛出异常,方法会取消其它的 Callable 的执行。
以下是一个样例:
ExecutorService executorService = Executors.newSingleThreadExecutor();
Set<Callable<String>> callables = new HashSet<Callable<String>>();
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 1";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 2";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 3";
}
});
String result = executorService.invokeAny(callables);
System.out.println("result = " + result);
executorService.shutdown();
输出结果:
以上样例代码会打印出在给定的集合中的某一个Callable 的返回结果。尝试运行后发现每次结果都在改变。有时候返回结果是"Task 1",有时候是"Task 2",等等。
invokeAll()
方法 invokeAll() 会调用存在于参数集合中的所有 Callable 对象,并且返回一个包含 Future 对象的集合,你可以通过这个返回的集合来管理每个 Callable 的执行结果。需要注意的是,任务有可能因为异常而导致运行结束,所以它可能并不是真的成功运行了。但是我们没有办法通过 Future 对象来了解到这个差异。
ExecutorService服务的关闭(shutdown() 或 shutdownNow())
当使用 ExecutorService 完毕之后,我们应该关闭它,这样才能保证线程不会继续保持运行状态。
举例来说,如果你的程序通过 main() 方法启动,并且主线程退出了你的程序,
如果还有一个活动的 ExecutorService 存在于程序中,那么程序将会继续保持运行状态。存在于 ExecutorService 中的活动线程会阻止Java虚拟机关闭。
为了关闭在 ExecutorService 中的线程,需要调用** shutdown() **方法。但ExecutorService 并不会马上关闭,而是不再接收新的任务,一旦所有的线程结束执行当前任务,ExecutorServie 才会真的关闭。所有在调用 shutdown() 方法之前提交到 ExecutorService 的任务都会执行。
如果你希望立即关闭 ExecutorService,你可以调用** shutdownNow() **方法。这个方法会尝试马上关闭所有正在执行的任务,并且跳过所有已经提交但是还没有运行的任务。但是对于正在执行的任务,是否能够成功关闭它是无法保证的,有可能他们真的被关闭掉了,也有可能它会一直执行到任务结束。这是一个最好的尝试。
ExecutorService 接口在 java.util.concurrent 包中有如下实现类:
ThreadPoolExecutor
ScheduledThreadPoolExecutor
七、Executors(class)
Executors 提供了一系列工厂方法用于创先线程池,返回的线程池都实现了 ExecutorService 接口。
public static ExecutorService newFixedThreadPool(int nThreads)
// 创建固定数目线程的线程池。
public static ExecutorService newCachedThreadPool()
// 创建一个可缓存的线程池,调用execute将重用以前构造的线程(如果线程可用)。
// 如果现有线程没有可用的,则创建一个新线 程并添加到池中。
// 终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
public static ExecutorService newSingleThreadExecutor()
// 创建一个单线程化的Executor。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
// 创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。
Executors的使用:
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.execute(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});
executorService.shutdown();
该示例代码首先使用 newFixedThreadPool() 工厂方法创建一个ExecutorService ,上述代码创建了一个可以容纳10个线程任务的线程池。其次,向 execute() 方法中传递一个异步的 Runnable 接口的实现,这样做会让 ExecutorService 中的某个线程执行这个Runnable 线程。
八、CompletionServie
为什么需要CompletionServie
如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果。为此你可以保存与每个任务相关联的Future,然后不断地调用timeout为零的get,来检验Future是否完成。这样做固然可以,但却相当乏味。幸运的是,还有一个更好的方法:完成服务(Completion service)。
CompletionService整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take和poll方法,在结果完整可用时获得这个结果,像一个打包的Future。ExecutorCompletionService是实现CompletionService接口的一个类,并将计算任务委托给一个Executor。
CompletionService与ExecutorService的对比使用
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// case1();
// case2();
case3();
}
/**
* <一>
* 1. 用List收集任务结果 (List记录每个submit返回的Future)
* 2. 循环查看结果, Future不一定完成, 如果没有完成, 那么调用get会租塞
* 3. 如果排在前面的任务没有完成, 那么就会阻塞, 这样后面已经完成的任务就没法获得结果了, 导致了不必要的等待时间.
* 更为严重的是: 第一个任务如果几个小时或永远完成不了, 而后面的任务几秒钟就完成了, 那么后面的任务的结果都将得不到处理
*
* 导致: 已完成的任务可能得不到及时处理
*/
private static void case1() throws ExecutionException, InterruptedException {
final Random random = new Random();
ExecutorService service = Executors.newFixedThreadPool(10);
List<Future<String>> taskResultHolder = new ArrayList<>();
for(int i=0; i<50; i++) {
//搜集任务结果
taskResultHolder.add(service.submit(new Callable<String>() {
public String call() throws Exception {
Thread.sleep(random.nextInt(5000));
return Thread.currentThread().getName();
}
}));
}
// 处理任务结果
int count = 0;
System.out.println("handle result begin");
for(Future<String> future : taskResultHolder) {
System.out.println(future.get());
count++;
}
System.out.println("handle result end");
System.out.println(count + " task done !");
//关闭线程池
service.shutdown();
}
/**
* <二> 只对第一种情况进行的改进
* 1. 查看任务是否完成, 如果完成, 就获取任务的结果, 让后重任务列表中删除任务.
* 2. 如果任务未完成, 就跳过此任务, 继续查看下一个任务结果.
* 3. 如果到了任务列表末端, 那么就从新回到任务列表开始, 然后继续从第一步开始执行
*
* 这样就可以及时处理已完成任务的结果了
*/
private static void case2() throws ExecutionException, InterruptedException {
final Random random = new Random();
ExecutorService service = Executors.newFixedThreadPool(10);
List<Future<String>> results = new ArrayList<>();
for(int i=0; i<50; i++) {
Callable<String> task = new Callable<String>() {
public String call() throws Exception {
Thread.sleep(random.nextInt(5000)); //模拟耗时操作
return Thread.currentThread().getName();
}
};
Future<String> future = service.submit(task);
results.add(future); // 搜集任务结果
}
int count = 0;
//自旋, 获取结果
System.out.println("handle result begin");
for(int i=0; i<results.size(); i++) {
Future<String> taskHolder = results.get(i);
if(taskHolder.isDone()) { //任务完成
String result = taskHolder.get(); //获取结果, 进行某些操作
System.out.println("result: " + result);
results.remove(taskHolder);
i--;
count++; //完成的任务的计数器
}
//回到列表开头, 从新获取结果
if(i == results.size() - 1) i = -1;
}
System.out.println("handle result end");
System.out.println(count + " task done !");
//线程池使用完必须关闭
service.shutdown();
}
/**
* <三> 使用ExecutorCompletionService管理异步任务
* 1. Java中的ExecutorCompletionService<V>本身有管理任务队列的功能
* i. ExecutorCompletionService内部维护列一个队列, 用于管理已完成的任务
* ii. 内部还维护列一个Executor, 可以执行任务
*
* 2. ExecutorCompletionService内部维护了一个BlockingQueue, 只有完成的任务才被加入到队列中
*
* 3. 任务一完成就加入到内置管理队列中, 如果队列中的数据为空时, 调用take()就会阻塞 (等待任务完成)
* i. 关于完成任务是如何加入到完成队列中的, 请参考ExecutorCompletionService的内部类QueueingFuture的done()方法
*
* 4. ExecutorCompletionService的take/poll方法是对BlockingQueue对应的方法的封装, 关于BlockingQueue的take/poll方法:
* i. take()方法, 如果队列中有数据, 就返回数据, 否则就一直阻塞;
* ii. poll()方法: 如果有值就返回, 否则返回null
* iii. poll(long timeout, TimeUnit unit)方法: 如果有值就返回, 否则等待指定的时间; 如果时间到了如果有值, 就返回值, 否则返回null
*
* 解决了已完成任务得不到及时处理的问题
*/
static void case3() throws InterruptedException, ExecutionException {
Random random = new Random();
ExecutorService service = Executors.newFixedThreadPool(10);
ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(service);
for(int i=0; i<50; i++) {
completionService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(random.nextInt(5000));
return Thread.currentThread().getName();
}
});
}
int completionTask = 0;
while(completionTask < 50) {
//如果完成队列中没有数据, 则阻塞; 否则返回队列中的数据
Future<String> resultHolder = completionService.take();
System.out.println("result: " + resultHolder.get());
completionTask++;
}
System.out.println(completionTask + " task done !");
//ExecutorService使用完一定要关闭 (回收资源, 否则系统资源耗尽! .... 呵呵...)
service.shutdown();
}
}
九、参考
Java并发编程:Callable、Future和FutureTask
Java中的Runnable、Callable、Future、FutureTask的区别与示例
Java并发编程 - Executor,Executors,ExecutorService, CompletionServie,Future,Callable