java编程积累

FutureTask、CompletionService和Lis

2019-01-12  本文已影响0人  风一样的存在

1.FutureTask

可用于异步获取执行结果或取消执行任务的场景。通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,Executor框架利用FutureTask来完成异步任务,并可以用来进行任何潜在的耗时的计算。一般FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。

/**
 * @Auther: jack
 * @Date: 2018/10/30 22:14
 * @Description:
 */
public class FutureTaskService {
    /**
     * 生成车票
     *
     * @return
     */
    public List<String> createTickets() {
        List<String> list = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            list.add("车票" + i);
        }
        return list;
    }

    public void sellTicket() {
        //获取车票
        List<String> list = createTickets();

        List<FutureTask<String>> taskList = new ArrayList<>();
        // 创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < list.size(); i++) {
            // 传入Callable对象创建FutureTask对象
            FutureTask<String> futureTask = new FutureTask<>(new Task(list.get(i)));
            taskList.add(futureTask);
            // 提交给线程池执行任务,executorService.invokeAll(taskList)一次性提交所有任务;
            executorService.submit(futureTask);
        }
        try {
            for (FutureTask<String> futureTask : taskList) {
                //FutureTask的get方法会自动阻塞,直到获取计算结果为止
                String result = futureTask.get();
                System.out.println("处理的返回值:" + result);
            }
        } catch (InterruptedException e) {
                e.printStackTrace();
        } catch (ExecutionException e) {
                e.printStackTrace();
        }finally {
            System.out.println("操作完毕");
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        new FutureTaskService().sellTicket();
    }

    class Task implements Callable<String> {
        private String ticket;

        /**
         * 构造方法,用于参数传递
         *
         * @param ticket
         */
        public Task(String ticket) {
            this.ticket = ticket;
        }

        @Override
        public String call() throws Exception {
            System.out.println("窗口:" + Thread.currentThread().getName() + ",已卖" + ticket);
            return ticket;
        }
    }
}

运行结果:可以看出是按顺序返回每个任务的处理结果,验证了阻塞效果,第一个任务没有处理完是不会处理第二个任务的。


FutureTask.png

2. CompletionService

当向Executor提交批处理任务时,并且希望在它们完成后获得结果,如果用FutureTask,你可以循环获取task,并用future.get()去获取结果,但是如果这个task没有完成,你就得阻塞在这里,这个实效性不高,其实在很多场合,其实你拿第一个任务结果时,此时结果并没有生成并阻塞,其实在阻塞在第一个任务时,第二个task的任务已经早就完成了,显然这种情况用FutureTask不合适的,效率也不高的。

/**
 * @Auther: jack
 * @Date: 2018/10/31 22:14
 * @Description: jdk实现的CompletionService(先执行完的先输出结果)
 */
public class CompleteService {
    /**
     * 生成车票
     *
     * @return
     */
    public List<String> createTickets() {
        List<String> list = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            list.add("车票" + i);
        }
        return list;
    }

    public void sellTicket() {
        //获取车票
        List<String> list = createTickets();

        //定义线程数
        ExecutorService pool = Executors.newFixedThreadPool(5);
        CompletionService<String> completionService= new ExecutorCompletionService<String>(pool);
        for (int i = 0; i < list.size(); i++) {
            completionService.submit(new CompleteService.Task(list.get(i)));
        }

        try {
            //所有都执行完毕
            for(int i=0;i<list.size();i++){
                String result=completionService.take().get();
                System.out.println("处理的返回值:" + result.toString());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println("操作完毕");
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        new CompleteService().sellTicket();
    }

    class Task implements Callable<String> {
        private String ticket;

        /**
         * 构造方法,用于参数传递
         *
         * @param ticket
         */
        public Task(String ticket) {
            this.ticket = ticket;
        }

        @Override
        public String call() throws Exception {
            System.out.println("窗口:" + Thread.currentThread().getName() + ",已卖" + ticket);
            return ticket;
        }
    }
}

运行结果:可以看出先执行完的任务先返回处理结果。


CompletionService.png

3. ListenableFuture

ListenableFuture是guava提供的一个类,ListenableFuture是对原有Future的增强,可以用于监听Future任务的执行状况,是执行成功还是执行失败,并提供响应的接口用于对不同结果的处理。

/**
 * @Auther: jack
 * @Date: 2018/9/19 17:57
 * @Description:  模拟
 */
@Slf4j
public class GuavaMultiThread {

    /**
     * 生成车票
     *
     * @return
     */
    public List<String> createTickets() {
        List<String> list = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            list.add("车票" + i);
        }
        return list;
    }

    public void sellTicket() {
        //获取车票
        List<String> list = createTickets();

        List<ListenableFuture<String>> futures = Lists.newArrayList();
        //定义线程数
        ExecutorService pool = Executors.newFixedThreadPool(5);
        ListeningExecutorService executorService = MoreExecutors.listeningDecorator(pool);
        for (int i = 0; i < list.size(); i++) {
            futures.add(executorService.submit(new Task(list.get(i))));
            //为每一个执行过程添加回调处理
            /*Futures.addCallback(executorService.submit(new Task(list.get(i))), new FutureCallback<String>() {
                @Override
                public void onSuccess(@Nullable String s) {
                    System.out.println("操作成功:"+s);
                }

                @Override
                public void onFailure(Throwable throwable) {
                    System.out.println("操作失败!");
                }
            },pool);*/
        }

        final ListenableFuture<List<String>> resultsFuture = Futures.successfulAsList(futures);
        try {//所有都执行完毕
            List<String> result=resultsFuture.get();
            System.out.println("处理的返回值:"+result.toString());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println("操作完毕");
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        new GuavaMultiThread().sellTicket();
    }

    class Task implements Callable<String> {
        private String ticket;

        /**
         * 构造方法,用于参数传递
         *
         * @param ticket
         */
        public Task(String ticket) {
            this.ticket = ticket;
        }

        @Override
        public String call() throws Exception {
            System.out.println("窗口:"+Thread.currentThread().getName()+",已卖" + ticket);
            return ticket;
        }
    }
}

运行结果:


ListenableFuture.png
上一篇下一篇

猜你喜欢

热点阅读