JavaAPIAndroid知识Android开发经验谈

Java多线程:Executor,Executors,Futur

2017-06-27  本文已影响384人  一只好奇的茂

平时工作中经常碰到个各种多线程,有时候搞不清它们之间到底有什么区别,这次来个总体的总结,主要是以下这些:
Executor,Executors,ExecutorService, CompletionServie,Future,Callable,Runnable,FutureTask

一、Runnable(interface)

public interface Runnable {
    public void run();
}

run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。

二、Callable (interface)

public interface Callable<V> {
    V call() throws Exception;
}

与 Runnable 不同的是call()函数返回的类型就是传递进来的V类型,而且能够抛出异常。一般情况下是配合ExecutorService来使用的

三、Future( interface)

Future是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果、设置结果操作。

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

也就是说Future提供了三种功能:

四、FutureTask(Runnable, Future<V>的具体实现)

public class FutureTask<V> implements RunnableFuture<V> 。。。

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

可以看出RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值,管理任务。
其中有两个构造方法

   //接受一个 Callable 参数
   public FutureTask(Callable<V> callable) {
       if (callable == null)
           throw new NullPointerException();
       this.callable = callable;
       this.state = NEW;       // ensure visibility of callable
   }

   //接受一个 Runnable ,利用 Executors.callable 将Runnable 转换为Callable
   public FutureTask(Runnable runnable, V result) {
       this.callable = Executors.callable(runnable, result);
       this.state = NEW;       // ensure visibility of callable
   }

具体使用可以参考 AsyncTask 中的使用

五、Executor(interface)

在Executor框架中,使用执行器(Exectuor)来管理Thread对象,从而简化了并发编程。并发编程的一种编程方式把任务拆分为一系列的小任务,即Runnable,然后将这些任务提交给一个Executor执行,Executor.execute(Runnalbe) 。Executor在执行时使用其内部的线程池来完成操作。

Executor 接口中之定义了一个方法 execute(Runnable command),该方法接收一个 Runable 实例,它用来执行一个任务,任务即一个实现了 Runnable 接口的类。

public interface Executor {
    void execute(Runnable command);
}

为了避免调用 new Thread(new RunnableTask()).start()这样的代码我们可以

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...

Executor 并不是严格的要求一步执行,我们可以简单的直接在调用者线程执行运行提交的任务

class DirectExecutor implements Executor {
  public void execute(Runnable r) {
    r.run();    // 在调用者线程执行
}}

一般来说任务在非调用者的线程中执行,比如产生一个新的线程

class ThreadPerTaskExecutor implements Executor {
  public void execute(Runnable r) {
    new Thread(r).start();   //新启一个线程,在非调用者线程中执行
}}

有很多Executor 的实现是为了实现任务的某种调度,比如 AsyncTask 中的串行任务队列

class SerialExecutor implements Executor {
   final Queue tasks = new ArrayDeque<>();
   final Executor executor;
   Runnable active;

   SerialExecutor(Executor executor) {
     this.executor = executor;
   

   public synchronized void execute(final Runnable r) {
     tasks.add(new Runnable() {
       public void run() {
         try {
           r.run();
         } finally {
           scheduleNext();
         }
       }
     });
     if (active == null) {
       scheduleNext();
     }
   }

   protected synchronized void scheduleNext() {
     if ((active = tasks.poll()) != null) {
       executor.execute(active);
     }
   }
 }}

六、ExecutorService(interface,继承自Executor)

ExecutorService 接口继承自 Executor 接口,它提供了更丰富的实现多线程的方法,比如,ExecutorService 提供了关闭自己的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。 可以调用 ExecutorService 的 shutdown()方法来平滑地关闭 ExecutorService,调用该方法后,将导致 ExecutorService 停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭 ExecutorService。因此我们一般用该接口来实现和管理多线程。

execute(Runnable)  
submit(Runnable)  
submit(Callable)  
invokeAny()  
invokeAll() 

execute(Runnable)
方法 execute(Runnable) 接收一个java.lang.Runnable 对象作为参数,并且以异步的方式执行它。

ExecutorService executorService = Executors.newSingleThreadExecutor();  
executorService.execute(new Runnable() {  
    public void run() {  
        System.out.println("Asynchronous task");  
    }  
});        
executorService.shutdown();  

使用这种方式没有办法获取执行 Runnable 之后的结果,如果你希望获取运行之后的返回值,就必须使用接收 Callable 参数的 execute() 方法。
submit(Runnable)
方法 submit(Runnable) 同样接收一个Runnable 的实现作为参数,但是会返回一个Future 对象。这个Future 对象可以用于判断 Runnable 是否结束执行。如下是一个ExecutorService 的 submit() 方法的例子:

Future future = executorService.submit(new Runnable() {  
    public void run() {  
        System.out.println("Asynchronous task");  
    }  
});  
//如果任务结束执行则返回 null  
System.out.println("future.get()=" + future.get());  

submit(Callable)
方法 submit(Callable) 和方法 submit(Runnable) 比较类似,但是区别则在于它们接收不同的参数类型。Callable 的实例与 Runnable 的实例很类似,但是 Callable 的 call() 方法可以返回一个结果。方法 Runnable.run() 则不能返回结果。
Callable 的返回值可以从方法 submit(Callable) 返回的 Future 对象中获取。如下是一个 ExecutorService Callable 的样例:

Future future = executorService.submit(new Callable(){  
    public Object call() throws Exception {  
        System.out.println("Asynchronous Callable");  
        return "Callable Result";  
    }  
});  
System.out.println("future.get() = " + future.get());  

输出结果

Asynchronous Callable  
future.get() = Callable Result  

inVokeAny()
方法 invokeAny() 接收一个包含 Callable 对象的集合作为参数。调用该方法不会返回 Future 对象,而是返回集合中某一个Callable 对象的结果,而且无法保证调用之后返回的结果是哪一个 Callable,只知道它是这些 Callable 中一个执行结束的 Callable 对象。如果一个任务运行完毕或者抛出异常,方法会取消其它的 Callable 的执行。
以下是一个样例:

ExecutorService executorService = Executors.newSingleThreadExecutor();  
Set<Callable<String>> callables = new HashSet<Callable<String>>();  

callables.add(new Callable<String>() {  
    public String call() throws Exception {  
        return "Task 1";  
    }  
});  
callables.add(new Callable<String>() {  
    public String call() throws Exception {  
        return "Task 2";  
    }  
});  
callables.add(new Callable<String>() {  
    public String call() throws Exception {  
        return "Task 3";  
    }  
});  
String result = executorService.invokeAny(callables);  
System.out.println("result = " + result);  
executorService.shutdown();  

输出结果:
以上样例代码会打印出在给定的集合中的某一个Callable 的返回结果。尝试运行后发现每次结果都在改变。有时候返回结果是"Task 1",有时候是"Task 2",等等。
invokeAll()
方法 invokeAll() 会调用存在于参数集合中的所有 Callable 对象,并且返回一个包含 Future 对象的集合,你可以通过这个返回的集合来管理每个 Callable 的执行结果。需要注意的是,任务有可能因为异常而导致运行结束,所以它可能并不是真的成功运行了。但是我们没有办法通过 Future 对象来了解到这个差异。
ExecutorService服务的关闭(shutdown() 或 shutdownNow())
当使用 ExecutorService 完毕之后,我们应该关闭它,这样才能保证线程不会继续保持运行状态。

举例来说,如果你的程序通过 main() 方法启动,并且主线程退出了你的程序,
如果还有一个活动的 ExecutorService 存在于程序中,那么程序将会继续保持运行状态。存在于 ExecutorService 中的活动线程会阻止Java虚拟机关闭。

为了关闭在 ExecutorService 中的线程,需要调用** shutdown() **方法。但ExecutorService 并不会马上关闭,而是不再接收新的任务,一旦所有的线程结束执行当前任务,ExecutorServie 才会真的关闭。所有在调用 shutdown() 方法之前提交到 ExecutorService 的任务都会执行。

如果你希望立即关闭 ExecutorService,你可以调用** shutdownNow() **方法。这个方法会尝试马上关闭所有正在执行的任务,并且跳过所有已经提交但是还没有运行的任务。但是对于正在执行的任务,是否能够成功关闭它是无法保证的,有可能他们真的被关闭掉了,也有可能它会一直执行到任务结束。这是一个最好的尝试。

ExecutorService 接口在 java.util.concurrent 包中有如下实现类:
ThreadPoolExecutor
ScheduledThreadPoolExecutor

七、Executors(class)

Executors 提供了一系列工厂方法用于创先线程池,返回的线程池都实现了 ExecutorService 接口。

public static ExecutorService newFixedThreadPool(int nThreads)
// 创建固定数目线程的线程池。

public static ExecutorService newCachedThreadPool()
// 创建一个可缓存的线程池,调用execute将重用以前构造的线程(如果线程可用)。
// 如果现有线程没有可用的,则创建一个新线 程并添加到池中。
// 终止并从缓存中移除那些已有 60 秒钟未被使用的线程。

public static ExecutorService newSingleThreadExecutor()
// 创建一个单线程化的Executor。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
// 创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。

Executors的使用:

ExecutorService executorService = Executors.newFixedThreadPool(10);  
executorService.execute(new Runnable() {  
    public void run() {  
        System.out.println("Asynchronous task");  
    }  
});  
executorService.shutdown();  

该示例代码首先使用 newFixedThreadPool() 工厂方法创建一个ExecutorService ,上述代码创建了一个可以容纳10个线程任务的线程池。其次,向 execute() 方法中传递一个异步的 Runnable 接口的实现,这样做会让 ExecutorService 中的某个线程执行这个Runnable 线程。

八、CompletionServie

为什么需要CompletionServie
如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果。为此你可以保存与每个任务相关联的Future,然后不断地调用timeout为零的get,来检验Future是否完成。这样做固然可以,但却相当乏味。幸运的是,还有一个更好的方法:完成服务(Completion service)。

CompletionService整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take和poll方法,在结果完整可用时获得这个结果,像一个打包的Future。ExecutorCompletionService是实现CompletionService接口的一个类,并将计算任务委托给一个Executor。
CompletionService与ExecutorService的对比使用

public class Main {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
//        case1();
//        case2();
        case3();
    }


    /**
     * <一>
     * 1. 用List收集任务结果 (List记录每个submit返回的Future)
     * 2. 循环查看结果, Future不一定完成, 如果没有完成, 那么调用get会租塞
     * 3. 如果排在前面的任务没有完成, 那么就会阻塞, 这样后面已经完成的任务就没法获得结果了, 导致了不必要的等待时间.
     *    更为严重的是: 第一个任务如果几个小时或永远完成不了, 而后面的任务几秒钟就完成了, 那么后面的任务的结果都将得不到处理
     *
     * 导致: 已完成的任务可能得不到及时处理
     */
    private static void case1() throws ExecutionException, InterruptedException {
        final Random random = new Random();
        ExecutorService service = Executors.newFixedThreadPool(10);
        List<Future<String>> taskResultHolder = new ArrayList<>();
        for(int i=0; i<50; i++) {
            //搜集任务结果
            taskResultHolder.add(service.submit(new Callable<String>() {
                public String call() throws Exception {
                    Thread.sleep(random.nextInt(5000));
                    return Thread.currentThread().getName();
                }
            }));
        }
        // 处理任务结果
        int count = 0;
        System.out.println("handle result begin");
        for(Future<String> future : taskResultHolder) {
            System.out.println(future.get());
            count++;
        }
        System.out.println("handle result end");
        System.out.println(count + " task done !");

        //关闭线程池
        service.shutdown();
    }

    /**
     * <二> 只对第一种情况进行的改进
     *      1. 查看任务是否完成, 如果完成, 就获取任务的结果, 让后重任务列表中删除任务.
     *      2. 如果任务未完成, 就跳过此任务, 继续查看下一个任务结果.
     *      3. 如果到了任务列表末端, 那么就从新回到任务列表开始, 然后继续从第一步开始执行
     *
     *      这样就可以及时处理已完成任务的结果了
     */
    private static void case2() throws ExecutionException, InterruptedException {
        final Random random = new Random();
        ExecutorService service = Executors.newFixedThreadPool(10);
        List<Future<String>> results = new ArrayList<>();

        for(int i=0; i<50; i++) {
            Callable<String> task = new Callable<String>() {
                public String call() throws Exception {
                    Thread.sleep(random.nextInt(5000)); //模拟耗时操作
                    return Thread.currentThread().getName();
                }
            };
            Future<String> future = service.submit(task);
            results.add(future); // 搜集任务结果
        }

        int count = 0;
        //自旋, 获取结果
        System.out.println("handle result begin");
        for(int i=0; i<results.size(); i++) {
            Future<String> taskHolder = results.get(i);

            if(taskHolder.isDone()) { //任务完成
                String result = taskHolder.get(); //获取结果, 进行某些操作
                System.out.println("result: " + result);
                results.remove(taskHolder);
                i--;

                count++; //完成的任务的计数器
            }

            //回到列表开头, 从新获取结果
            if(i == results.size() - 1) i = -1;
        }
        System.out.println("handle result end");
        System.out.println(count + " task done !");

        //线程池使用完必须关闭
        service.shutdown();
    }


    /**
     * <三> 使用ExecutorCompletionService管理异步任务
     * 1. Java中的ExecutorCompletionService<V>本身有管理任务队列的功能
     *    i. ExecutorCompletionService内部维护列一个队列, 用于管理已完成的任务
     *    ii. 内部还维护列一个Executor, 可以执行任务
     *
     * 2. ExecutorCompletionService内部维护了一个BlockingQueue, 只有完成的任务才被加入到队列中
     *
     * 3. 任务一完成就加入到内置管理队列中, 如果队列中的数据为空时, 调用take()就会阻塞 (等待任务完成)
     *    i. 关于完成任务是如何加入到完成队列中的, 请参考ExecutorCompletionService的内部类QueueingFuture的done()方法
     *
     * 4. ExecutorCompletionService的take/poll方法是对BlockingQueue对应的方法的封装, 关于BlockingQueue的take/poll方法:
     *    i. take()方法, 如果队列中有数据, 就返回数据, 否则就一直阻塞;
     *    ii. poll()方法: 如果有值就返回, 否则返回null
     *    iii. poll(long timeout, TimeUnit unit)方法: 如果有值就返回, 否则等待指定的时间; 如果时间到了如果有值, 就返回值, 否则返回null
     *
     * 解决了已完成任务得不到及时处理的问题
     */
    static void case3() throws InterruptedException, ExecutionException {
        Random random = new Random();

        ExecutorService service = Executors.newFixedThreadPool(10);
        ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(service);

        for(int i=0; i<50; i++) {
            completionService.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    Thread.sleep(random.nextInt(5000));
                    return Thread.currentThread().getName();
                }
            });
        }

        int completionTask = 0;
        while(completionTask < 50) {
            //如果完成队列中没有数据, 则阻塞; 否则返回队列中的数据
            Future<String> resultHolder = completionService.take();
            System.out.println("result: " + resultHolder.get());
            completionTask++;
        }

        System.out.println(completionTask + " task done !");

        //ExecutorService使用完一定要关闭 (回收资源, 否则系统资源耗尽! .... 呵呵...)
        service.shutdown();
    }
}

九、参考

Java并发编程:Callable、Future和FutureTask
Java中的Runnable、Callable、Future、FutureTask的区别与示例
Java并发编程 - Executor,Executors,ExecutorService, CompletionServie,Future,Callable

上一篇下一篇

猜你喜欢

热点阅读