并发包之CompletionService

2016-06-16  本文已影响135人  破晓追风

CompletionService简介

将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者 submit 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。例如,CompletionService 可以用来管理异步 IO ,执行读操作的任务作为程序或系统的一部分提交,然后,当完成读操作时,会在程序的不同部分执行其他操作,执行操作的顺序可能与所请求的顺序不同。

通常,CompletionService 依赖于一个单独的 Executor 来实际执行任务,在这种情况下,CompletionService 只管理一个内部完成队列。ExecutorCompletionService 类提供了此方法的一个实现。

内存一致性效果:线程中向 CompletionService 提交任务之前的操作 happen-before 该任务执行的操作,后者依次 happen-before 紧跟在从对应 take() 成功返回的操作。

CompletionService接口

public interface CompletionService<V> {  
  Future<V> submit(Callable<V> task);  
  Future<V> submit(Runnable task, V result); 
  Future<V> take() throws InterruptedException; 
  Future<V> poll(); 
  Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}

ExecutorCompletionService

ExecutorCompletionService 是CompletionService的实现类,使用提供的 Executor 来执行任务的 CompletionService。此类将安排那些完成时提交的任务,把它们放置在可使用 take 访问的队列上。该类非常轻便,适合于在执行几组任务时临时使用。

构造器

public class ExecutorCompletionService<V> implements CompletionService<V> {    
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;
    
    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
          throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }
    public ExecutorCompletionService(Executor executor,                                 BlockingQueue<Future<V>> completionQueue) {  
        if (executor == null || completionQueue == null)  
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }
}

ExecutorCompletionService是Executor和BlockingQueue的结合体,任务的提交和执行都是委托给Executor来完成。

提交任务

public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    executor.execute(new QueueingFuture(f));
    return f;
}
public Future<V> submit(Runnable task, V result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task, result);
    executor.execute(new QueueingFuture(f));
    return f;
}

当提交某个任务时,该任务首先将被包装为一个QueueingFuture,该类是FutureTask的一个子类,通过改写该子类的done方法,可以实现当任务完成时,将结果放入到BlockingQueue中。

private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        this.task = task;
    }
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
}

获取结果

通过使用BlockingQueue的take或poll方法,则可以得到结果。在BlockingQueue不存在元素时,这两个操作会阻塞,一旦有结果加入,则立即返回。

public Future<V> take() throws InterruptedException {
    return completionQueue.take();
}
public Future<V> poll() {
    return completionQueue.poll();
}
public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
    return completionQueue.poll(timeout, unit);
}

参考

http://xw-z1985.iteye.com/blog/1997077
http://www.tuicool.com/articles/umyy6b

上一篇下一篇

猜你喜欢

热点阅读