并发处理利器-CompletionService
版权声明:本文为博主原创文章,未经博主允许不得转载。
摘要
考虑这样一个需求,并发处理一批任务,每个任务都完成之后,对结果做一些后续处理,最后汇总结果。第一个方案:启动多个线程并发处理任务,并循环监控每一个线程的处理结果Futrue,直到所有Future返回为止。这个方案可行,但还需要自己监控所有的结果完成情况,是不是很乏味。来试试CompletionService吧。
CompletionService
先看看这个接口定义了哪些方法:
- Future<V> submit(Callable<V> task); 提交Callable任务,并返回Future结果。
- Future<V> submit(Runnable task, V result); 与上一个方法类似,当任务完成时返回指定的result对象。
- Future<V> take() throws InterruptedException; 获取并移除最新完成的任务结果,该过程是阻塞的。
- Future<V> poll(); 获取并移除最新完成的任务结果,如果没有结果则返回null。
- Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException; 指定超时时间等待获取并移除最新的任务结果。
从获取结果的几个接口可以看到,返回的都是最新完成的结果。这就是重点所在,我们可以不需要去监控等待每一个结果(如果等待的第一个Future是最慢的,岂不是会妨碍其他先完成的任务吗),而是按结果完成顺序得到了每一个返回结果,先完成的结果可以先继续执行后续处理,这不是挺好嘛。
ExecutorCompletionService是该接口的实现类,内部有一个线程池和BlockingQueue队列。它的实现原理其实挺简单:每个提交给ExecutorCompletionService的任务,都会被封装成一个QueueingFuture(FutureTask的子类),它重写了done()方法(该方法会在任务执行完成之后回调),将执行完成的FutureTask加入到内部队列,take()等方法其实是到内部队列中获取得到最新完成的结果FutrueTask。
对比
从代码层面来看看两种方案的差异:
public void test1() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
List<Future<String>> results = new ArrayList<Future<String>>(10);
for(int i=0; i<10; i++) {
Future<String> result = executorService.submit(new MyRunnable());
results.add(result);
}
for(Future result : results) {
String str = result.get();//遍历等待每一个Future, 如果第一个任务是最慢的,那么整个进度就会被拖慢
//do something
}
//汇总操作
executorService.shutdown();
}
public void test2() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
CompletionService<String> completionService = new executorCompletionService<String>(executorService);
for(int i=0; i<10; i++) {
completionService.submit(new MyRunnable());
}
for(int i=0; i<10; i++) {
String str = completionService.take().get();//先完成的结果先执行后续处理
//do something
}
//汇总操作
executorService.shutdown();
}
从代码量上看差异比较少,但是方案2不用单独维护一个List来保存所有的处理结果Future。重要的是,completionService因为任务结果按完成顺序陆续到来,每个任务的进度不会相互干扰,那么后续操作也不会相互影响,而第一种方案中如果第一个任务很慢,那么其他任务都要空闲等待第一个任务完成,才能继续后面的操作,这一点就明显影响到了性能。
小心踩坑
- 关闭线程池
在测试方法中,为了演示而创建了线程池,方法结束时也关闭了线程池。如果你的代码与示例代码类似,那么请记住关闭线程池,否则即使方法退出之后,创建的线程也得不到回收和关闭,迟早将耗尽资源或撑爆内存。如果你的线程池是全局共享的,那么不存在这个问题,JVM关闭时会关闭线程池。
- 错误的使用方式
public void test3() {
Future<String> future = completionService.submit(new MyRunnable());//这里的线程池是共享的
String str = future.get();
//do something
}
该场景也许不太合适使用completeService,但是这里要说明的是另一个问题,直接使用completionService.submit的返回结果Future会造成内存泄漏,因为该方式只关心获取当前返回的结果,而忽略了BlockingQueue中保存的Future对象,BlockingQueue队列会不断变大(默认实现是LinkedBlockingQueue,无界队列),迟早将内存撑爆。正确的使用方式还是通过completionService.take()来获取Future对象。
如有什么地方描述不对,欢迎指出。