并发处理利器-CompletionService

2017-07-22  本文已影响0人  小吴酱呵呵
版权声明:本文为博主原创文章,未经博主允许不得转载。

摘要

考虑这样一个需求,并发处理一批任务,每个任务都完成之后,对结果做一些后续处理,最后汇总结果。第一个方案:启动多个线程并发处理任务,并循环监控每一个线程的处理结果Futrue,直到所有Future返回为止。这个方案可行,但还需要自己监控所有的结果完成情况,是不是很乏味。来试试CompletionService吧。

CompletionService

先看看这个接口定义了哪些方法:

从获取结果的几个接口可以看到,返回的都是最新完成的结果。这就是重点所在,我们可以不需要去监控等待每一个结果(如果等待的第一个Future是最慢的,岂不是会妨碍其他先完成的任务吗),而是按结果完成顺序得到了每一个返回结果,先完成的结果可以先继续执行后续处理,这不是挺好嘛。

ExecutorCompletionService是该接口的实现类,内部有一个线程池和BlockingQueue队列。它的实现原理其实挺简单:每个提交给ExecutorCompletionService的任务,都会被封装成一个QueueingFuture(FutureTask的子类),它重写了done()方法(该方法会在任务执行完成之后回调),将执行完成的FutureTask加入到内部队列,take()等方法其实是到内部队列中获取得到最新完成的结果FutrueTask。

对比

从代码层面来看看两种方案的差异:

public void test1() {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    List<Future<String>> results = new ArrayList<Future<String>>(10);
    for(int i=0; i<10; i++) {
        Future<String> result = executorService.submit(new MyRunnable());
        results.add(result);
    }
    for(Future result : results) {
        String str = result.get();//遍历等待每一个Future, 如果第一个任务是最慢的,那么整个进度就会被拖慢
        //do something
    }
    //汇总操作
    executorService.shutdown();
}
public void test2() {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    CompletionService<String> completionService = new executorCompletionService<String>(executorService);
    for(int i=0; i<10; i++) {
        completionService.submit(new MyRunnable());
    }
    for(int i=0; i<10; i++) {
        String str = completionService.take().get();//先完成的结果先执行后续处理
        //do something
    }
    //汇总操作
    executorService.shutdown();
}

从代码量上看差异比较少,但是方案2不用单独维护一个List来保存所有的处理结果Future。重要的是,completionService因为任务结果按完成顺序陆续到来,每个任务的进度不会相互干扰,那么后续操作也不会相互影响,而第一种方案中如果第一个任务很慢,那么其他任务都要空闲等待第一个任务完成,才能继续后面的操作,这一点就明显影响到了性能。

小心踩坑

在测试方法中,为了演示而创建了线程池,方法结束时也关闭了线程池。如果你的代码与示例代码类似,那么请记住关闭线程池,否则即使方法退出之后,创建的线程也得不到回收和关闭,迟早将耗尽资源或撑爆内存。如果你的线程池是全局共享的,那么不存在这个问题,JVM关闭时会关闭线程池。

public void test3() {
    Future<String> future = completionService.submit(new MyRunnable());//这里的线程池是共享的
    String str = future.get();
    //do something
}

该场景也许不太合适使用completeService,但是这里要说明的是另一个问题,直接使用completionService.submit的返回结果Future会造成内存泄漏,因为该方式只关心获取当前返回的结果,而忽略了BlockingQueue中保存的Future对象,BlockingQueue队列会不断变大(默认实现是LinkedBlockingQueue,无界队列),迟早将内存撑爆。正确的使用方式还是通过completionService.take()来获取Future对象。

如有什么地方描述不对,欢迎指出。

上一篇下一篇

猜你喜欢

热点阅读