Java并发编程

Java并发编程 - Executor

2019-05-16  本文已影响0人  HRocky

一、Executor接口

1. Executor初识

Executor是一个接口,位于java.util.concurrent包下,它只包含一个方法,如下:

void execute(Runnable command);

下面文字为API说明。

执行已提交的可运行任务的对象。该接口提供了将任务提交与如何运行每个任务的机制进行解耦的方式,包括线程使用、调度等的细节。通常使用Executor而不是显式地创建线程。例如,不是为每个任务调用new Thread(new RunnableTask())).start(),而是使用:

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...

然而,Executor接口并没有强制性要求任务的执行是异步的。在比较简单的情况下,executor可以在调用方线程中立即运行提交的任务:

class DirectExecutor implements Executor {
   public void execute(Runnable r) {
     r.run();
   }
}

更典型的情况是,任务在调用方线程以外的某个线程中执行。下面的执行器为每个任务生成一个新线程。

class ThreadPerTaskExecutor implements Executor {
   public void execute(Runnable r) {
     new Thread(r).start();
   }
}

许多执行器实现对任务的调度方式和时间施加了某种限制。下面的执行器顺序化了向第二个执行器提交任务的过程,并举例说明了复合执行器。

class SerialExecutor implements Executor {
   final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
   final Executor executor;
   Runnable active;

   SerialExecutor(Executor executor) {
     this.executor = executor;
   }

   public synchronized void execute(final Runnable r) {
     tasks.offer(new Runnable() {
       public void run() {
         try {
           r.run();
         } finally {
           scheduleNext();
         }
       }
     });
     if (active == null) {
       scheduleNext();
     }
   }

   protected synchronized void scheduleNext() {
     if ((active = tasks.poll()) != null) {
       executor.execute(active);
     }
   }
 }

此包中提供的了ExecutorService,这是一个更广泛的Executor接口。ThreadPoolExecutor类提供了一个可扩展的线程池实现。Executors类为这些执行器提供了方便的工厂方法。

内存一致性效果:线程中将 Runnable 对象提交到 Executor 之前的操作 happen-before其执行开始(可能在另一个线程中)。

2. 为什么要有这个接口

上面的API文档中说明了这个接口的作用,也就是为什么使用这个接口。有以下两点:

二、ExecutorService接口

ExecutorService接口继承自Executor接口,提供了更多的功能,下面是API的说明。

Executor提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future的方法。

可以关闭ExecutorService,这将导致其拒绝新任务。提供两个方法来关闭 ExecutorService。shutdown()方法在终止前允许执行以前提交的任务,而shutdownNow()方法阻止等待任务启动并试图停止当前正在执行的任务。在终止时,执行程序没有任务在执行,也没有任务在等待执行,并且无法提交新任务。应该关闭未使用的ExecutorService以允许回收其资源。

通过创建并返回一个可用于取消执行和/或等待完成的Future,方法submit扩展了基本方法Executor.execute(java.lang.Runnable)。方法invokeAny 和invokeAll 是批量执行的最常用形式,它们执行任务collection,然后等待至少一个,或全部任务完成(可使用ExecutorCompletionService 类来编写这些方法的自定义变体)。

Executors类供了用于此包中所提供的执行程序服务的工厂方法。

用法示例

下面给出了一个网络服务的简单结构,这里线程池中的线程作为传入的请求。它使用了预先配置的Executors.newFixedThreadPool(int)工厂方法:

class NetworkService implements Runnable {
   private final ServerSocket serverSocket;
   private final ExecutorService pool;

   public NetworkService(int port, int poolSize)
       throws IOException {
     serverSocket = new ServerSocket(port);
     pool = Executors.newFixedThreadPool(poolSize);
   }

   public void run() { // run the service
     try {
       for (;;) {
         pool.execute(new Handler(serverSocket.accept()));
       }
     } catch (IOException ex) {
       pool.shutdown();
     }
   }
 }

class Handler implements Runnable {
   private final Socket socket;
   Handler(Socket socket) { this.socket = socket; }
   public void run() {
     // read and service request on socket
   }
}

下列方法分两个阶段关闭ExecutorService。第一阶段调用shutdown拒绝传入任务,然后调用shutdownNow(如有必要)取消所有遗留的任务:

void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }
}

内存一致性效果:线程中向 ExecutorService提交Runnable或Callable任务之前的操作happen-before由该任务所提取的所有操作,后者依次happen-before通过Future.get() 获取的结果。

谈谈ExecutorService

上面在谈Executor的时候说到了对Thread对象的管理,它管理的其实还要加一个对任务的管理,也就是说执行器可以说是任务和Thread对象的容器。类似与Spring的Bean容器,ExecutorService提出了对容器进行关闭的方法。Executor中定义了执行Runnable对象的方法,这个方法是没有返回值的,而在实际的场景中,对任务的执行是希望能够得到返回值,同时支持异步获取,ExecutorService中还定义了一系列返回Future对象的方法。

上一篇 下一篇

猜你喜欢

热点阅读