一些收藏Java

Java并发编程——ExecutorCompletionServ

2022-12-05  本文已影响0人  小波同学

一、简介

在JDK并发包中有这么一个类ExecutorCompletionService,提交任务后,可以按任务返回结果的先后顺序来获取各任务执行后的结果。

该类实现了接口CompletionService:

public interface CompletionService<V> {
    
    Future<V> submit(Callable<V> task);
 
    Future<V> submit(Runnable task, V result);
 
    Future<V> take() throws InterruptedException;
 
    Future<V> poll();
 
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
 
}

该接口定义了一系列方法:提交实现了Callable或Runnable接口的任务,并获取这些任务的结果。

CompletionService接口定义了一组任务管理接口:

关于CompletionService和ExecutorCompletionService的类图如下:


ExecutorCompletionService实现了CompletionService,内部通过Executor以及BlockingQueue来实现接口提出的规范。其中,Executor由调用者传递进来,而Blocking可以使用默认的LinkedBlockingQueue,也可以由调用者传递。另外,该类还会将提交的任务封装成QueueingFuture,这样就可以实现FutureTask.done()方法,以便于在任务执行完毕后,将结果放入阻塞队列中。

QueueingFuture为内部类:

private static class QueueingFuture<V> extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task,
                   BlockingQueue<Future<V>> completionQueue) {
        super(task, null);
        this.task = task;
        this.completionQueue = completionQueue;
    }
    private final Future<V> task;
    private final BlockingQueue<Future<V>> completionQueue;
    protected void done() { completionQueue.add(task); }
}

其中,done()方法就是在任务执行完毕后,将任务放入队列中。

在提交任务时,将任务封装成QueueingFuture:

public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    executor.execute(new QueueingFuture<V>(f, completionQueue));
    return f;
}

public Future<V> submit(Runnable task, V result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task, result);
    executor.execute(new QueueingFuture<V>(f, completionQueue));
    return f;
}

在调用take()、poll()方法时,会从阻塞队列中获取Future对象,以取得任务执行的结果。

二、原理

当我们向Executor提交一组任务,并且希望任务在完成后获得结果,此时可以考虑使用ExecutorCompletionService。

ExecutorCompletionService实现了CompletionService接口。ExecutorCompletionService将Executor和BlockingQueue功能融合在一起,使用它可以提交我们的Callable任务。这个任务委托给Executor执行,可以使用ExecutorCompletionService对象的take和poll方法获取结果。

ExecutorCompletionService的设计目的在于提供一个可获取线程池执行结果的功能,这个类采用了装饰器模式,需要用户提供一个自定义的线程池,在ExecutorCompletionService内部持有该线程池进行线程执行,在原有的线程池功能基础上装饰额外的功能。

下面是ExecutorCompletionService的原理图:


下面是使用队列和不使用队列的流程对比,从图中我们可以看出,在使用队列的场景下,我们可以优先获取到完成的线程,当我们要汇总所有的执行结果时,这无疑会缩减我们的汇总时间。

而不使用队列时,我们需要对FutureTask进行遍历,因为我们不知道哪个线程先执行完了,只能挨个去获取结果,这样已经完成的线程会因为前面未完成的线程的耗时而无法提前进行汇总。

如果算上汇总结果的耗时时间:


在使用队列的场景下,我们可以在其他任务线程执行的过程中汇总已完成的结果,节省汇总时间。不使用队列的场景下,只用等到当前的线程执行完成才能汇总。

代码演示

public class ExecutorCompletionServiceTest {
    public static void main(String[] args) throws Exception {
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
        ExecutorCompletionService ecs = new ExecutorCompletionService(fixedThreadPool);
        Future future = ecs.submit(new Callable() {
            @Override
            public Integer call() throws Exception {
                return ThreadLocalRandom.current().nextInt(100);
            }
        });
        System.out.println("future:" + future.get());

        //用于取出最新的线程执行结果,注意这里是阻塞的
        //Future future1 = ecs.take();
        //System.out.println("future1:" + future1.get());

        //用于取出最新的线程执行结果,是非阻塞的,如果没有结果就返回null
        Future future2 = ecs.poll();
        if (future2.isDone()) {
            System.out.println(future2.get());
        }
    }
}


//多个线程,先执行完的进阻塞队列,然后可以按执行顺序获取结果
public class ExecutorCompletionServiceDemo {

    public static void main(String[] args) {

        //这里只是为了方便,真正项目中不要这样创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);

        completionService.submit(() -> {
            System.out.println("执行任务1开始");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("执行任务1结束");
            return "任务1执行成功";
        });

        completionService.submit(() -> {
            System.out.println("执行任务2开始");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("执行任务2结束");
            return "任务2执行成功";
        });

        completionService.submit(() -> {
            System.out.println("执行任务3开始");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("执行任务3结束");
            return "任务3执行成功";
        });

        for (int i = 0; i < 3; i++) {
            try {
                String result = completionService.take().get();
                System.out.println(result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }

        }
        executorService.shutdown();
    }
}

源码

public class ExecutorCompletionService<V> implements CompletionService<V> {

    //执行任务的线程池
    private final Executor executor;
    
    //用于调用AbstractExecutorService的newTaskFor方法,来实例化一个实现了RunnableFuture接口的对象
    //如果executor继承了AbstractExecutorService ,则直接调用executor的newTaskFor方法
    //否则直接创建一个FutureTask对象
    private final AbstractExecutorService aes;
    
    //任务完成后放入该阻塞队列中
    private final BlockingQueue<Future<V>> completionQueue;

    /**
     * 用于放入执行完成的任务
     */
    private static class QueueingFuture<V> extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task,
                       BlockingQueue<Future<V>> completionQueue) {
            super(task, null);
            this.task = task;
            this.completionQueue = completionQueue;
        }
        private final Future<V> task;
        private final BlockingQueue<Future<V>> completionQueue;
        //重写了FutureTask的done方法,任务完成后,将任务放入阻塞队列中
        protected void done() { completionQueue.add(task); }
    }

    //将传入的Callable包装为RunnableFuture
    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else
            return aes.newTaskFor(task);
    }
    
    //将传入的Callable包装为RunnableFuture
    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes == null)
            return new FutureTask<V>(task, result);
        else
            return aes.newTaskFor(task, result);
    }

    /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and a
     * {@link LinkedBlockingQueue} as a completion queue.
     *
     * @param executor the executor to use
     * @throws NullPointerException if executor is {@code null}
     */
    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        //completionQueue默认为LinkedBlockingQueue
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

    /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and the supplied queue as its
     * completion queue.
     *
     * @param executor the executor to use
     * @param completionQueue the queue to use as the completion queue
     *        normally one dedicated for use by this service. This
     *        queue is treated as unbounded -- failed attempted
     *        {@code Queue.add} operations for completed tasks cause
     *        them not to be retrievable.
     * @throws NullPointerException if executor or completionQueue are {@code null}
     */
    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

    /**
     * 提交任务,任务被包装为QueueingFuture对象,主要重写FutureTask的done方法,
     * 使得任务执行完毕后被执行任务的线程放入到阻塞队列中
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture<V>(f, completionQueue));
        return f;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture<V>(f, completionQueue));
        return f;
    }
    
    //从阻塞队列中获取任务
    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }
    
    //如果完成队列中有数据就返回, 否则返回null
    public Future<V> poll() {
        return completionQueue.poll();
    }

    //如果完成队列中有数据就直接返回, 否则等待指定的时间, 到时间后如果还是没有数据就返回null
    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

}

参考:
https://blog.csdn.net/ado1986/article/details/37758105

https://www.cnblogs.com/xfeiyun/p/16373387.html

https://blog.csdn.net/weixin_42103620/article/details/119080939

http://t.zoukankan.com/zouhong-p-14089982.html

上一篇下一篇

猜你喜欢

热点阅读