多线程核心源码--从自己实现到解读源代码

线程池(一)

2020-01-25  本文已影响0人  小蜗牛Aaron

Executor

边界

Executor ExecutorService AbstractExecutorService ThreadPoolExecutor ForkJoinPool ScheduledExecutorService ScheduledThreadPoolExecutor Executors FurtureTask

衍生点

ExecutorCompletionService
ThreadFactory
SynchronousQueue

接口简介

一个执行提交的Runnable任务的对象的实现接口。这个接口提供了一种将任务提交与每个任务如何运行的机制分离的方法。包含了线程使用调度的细节。通常用于代替显式创建线程。例如相比于对于每一个task执行

new Thread(new RunnableTask()).start()

你可能会使用

Executor executor = anExecutor();
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

的方式。然而Executor接口并不严格要求是异步的。简单来说就是一个任务可以被调用的线程立刻执行,通过

class DirectExecutor implements Executor {
   public void execute(Runnable r) {
      r.run();
   }
 }}

另外 任务可以被另一个线程执行而不是调用者的线程

class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
      new Thread(r).start();
    }
 }}

许多这个接口的实现对任务的安排方式和时间施加某种限制。下面的执行程序将任务的提交序列化到第二个执行程序,演示了复合执行程序。

class SerialExecutor implements Executor {
   final Queue<Runnable> tasks = new ArrayDeque<>();
   final Executor executor;
   Runnable active;

   SerialExecutor(Executor executor) {
      this.executor = executor;
   }
 
   public synchronized void execute(Runnable r) {
      tasks.add(() -> {
        try {
          r.run();
        } finally {
          scheduleNext();
        }
      });
      if (active == null) {
        scheduleNext();
      }
  }
 
   protected synchronized void scheduleNext() {
      if ((active = tasks.poll()) != null) {
        executor.execute(active);
      }
    }

ExecutorService

提供用于管理终止的方法的{@link Executor},以及可以生成用于跟踪一个或多个异步任务的进度的{@link Future}的方法。

一个ExecutorService可以被终止,也就是让他拒绝新的任务。有两种方法可以终止线程池,一种是shutdown,另外一种是shutdownNOw。
shutdown:提供用于管理终止的方法的{@link Executor},以及允许在终止之前执行先前提交的任务的方法

shutdownNow:防止启动正在等待的任务,并尝试停止当前正在执行的任务。
在终止时,执行程序没有正在执行的任务,没有等待执行的任务,也不能提交新的任务。应该关闭未使用的{@code ExecutorService}以允许回收其资源。

方法{@code submit}通过创建和返回一个{@link Future}来扩展基本方法{@link Executor#execute(Runnable)},该{@link Future}可用于取消执行和/或等待完成。方法{@code invokeAny}和{@code invokeAll}执行最常用的批量执行形式,执行一组任务,然后等待至少一个或全部任务完成。(Class {@link ExecutorCompletionService}可用于编写这些方法的定制变体。)

Executors类提供了这个包下面的工厂方法。

下面是一个网络服务的示意图,其中线程池服务中的线程传入请求。它使用预先配置的{@link execu# newFixedThreadPool}工厂方法:

class NetworkService implements Runnable {
   private final ServerSocket serverSocket;
   private final ExecutorService pool;

  public NetworkService(int port, int poolSize)
       throws IOException {
     serverSocket = new ServerSocket(port);
     pool = Executors.newFixedThreadPool(poolSize);
   }
 
  public void run() { // run the service
     try {
       for (;;) {
         pool.execute(new Handler(serverSocket.accept()));
       }
     } catch (IOException ex) {
       pool.shutdown();
     }
   }
 }

 class Handler implements Runnable {
   private final Socket socket;
   Handler(Socket socket) { this.socket = socket; }
   public void run() {
     // read and service request on socket
   }
 }}

下面的方法分两个阶段关闭{@code ExecutorService},首先调用{@code shutdown}来拒绝传入的任务,然后调用{@code shutdownNow},如果需要的话,取消任何延迟任务:

void shutdownAndAwaitTermination(ExecutorService pool) {
  pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
      }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }
 }}

内存一致性效应:在向{@code ExecutorService}提交{@code Runnable}或{@code Callable}任务之前的线程操作发生—在该任务采取任何操作之前,而该操作又发生—在通过{@code Future.get()}检索结果之前。

方法

shutdown

启动有序关闭,在此过程中执行以前提交的任务,但不接受任何新任务。如果已经关闭,调用将没有其他效果。
此方法不等待以前提交的任务完成执行。使用{@link #awaitTermination awaitTermination}来完成。

shutdownNow

尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务列表。

此方法不会等待正在执行的任务终止。使用{@link #awaitTermination awaitTermination}来完成。

除了尽最大努力停止处理正在执行的任务外,没有任何保证。例如,典型的实现将通过{@link Thread#interrupt}取消,因此任何未能响应中断的任务都可能永远不会终止。

isshutdown

返回当前线程池是否被终止

isTerminated

如果所有任务都在关闭后完成,则返回{@code true}。注意,{@code isTerminated}从来不是{@code true},除非首先调用{@code shutdown}或{@code shutdownNow}。

awaitTermination

阻塞,直到所有任务在关闭请求后完成执行,或超时发生,或当前线程中断,以先发生的情况为准。

submit(Callable)

提交一个返回值的任务以供执行,并返回一个表示该任务未决结果的Future。未来的{@code get}方法将在成功完成任务后返回任务的结果。

如果希望立即阻塞等待任务,可以使用{@code result = exec.submit(aCallable).get()的方式

注意:{@link executor}类包含一组方法,可以转换其他一些类似关闭的对象,例如{@link java.security。PrivilegedAction} to {@link Callable}表单,以便提交。

submit(Runnable)

提交一个可运行任务以供执行,并返回一个表示该任务的Future。将来的{@code get}方法将在成功完成时返回{@code null}。

invokeAll

执行给定的任务,返回一个期货列表,其中包含所有任务完成时的状态和结果。对于返回列表的每个元素,{@link Future#isDone}是{@code true}。请注意,已完成的任务可以正常终止,也可以通过抛出异常终止。如果在执行此操作时修改了给定的集合,则此方法的结果是未定义的。

invokeAll(限时)

执行给定的任务,当所有任务完成或超时结束时,返回包含状态和结果的期货列表(以先发生的情况为准)。对于返回列表的每个元素,{@link Future#isDone}是{@code true}。返回时,未完成的任务将被取消。请注意,已完成的任务可以正常终止,也可以通过抛出异常终止。如果在执行此操作时修改了给定的集合,则此方法的结果是不可预期的。

invokeAny

执行给定的任务,返回一个已成功完成的任务的结果(即,不抛出异常),如果有的话。在正常或异常返回时,未完成的任务将被取消。如果在执行此操作时修改了给定的集合,则此方法的结果是未定义的。

invokeAny(限时)

执行给定的任务,返回一个已成功完成的任务的结果(即,而不抛出异常),如果在给定超时超时之前有任何操作。在正常或异常返回时,未完成的任务将被取消。如果在执行此操作时修改了给定的集合,则此方法的结果是未定义的。

AbstractExecutorService

提供了ExecutorService接口的方法的默认实现,这个类使用{@code newTaskFor}返回的{@link RunnableFuture}实现{@code submit}、{@code invokeAny}和{@code invokeAll}方法,{@code newTaskFor}默认为这个包中提供的{@link FutureTask}类。例如,{@code submit(Runnable)}的实现创建了一个执行并返回的关联{@code RunnableFuture}。子类可以覆盖{@code newTaskFor}方法来返回{@code RunnableFuture}实现,而不是{@code FutureTask}。

扩展的例子。下面是一个类的示意图,它继承{@link ThreadPoolExecutor}来使用{@code CustomTask}类代替默认的{@code FutureTask}

public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
 
   static class CustomTask<V> implements RunnableFuture<V> {...}

   protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
       return new CustomTask<V>(c);
   }
   protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
       return new CustomTask<V>(r, v);
   }
   // ... add constructors, etc.
 }}

类结构

image.png

newTaskFor(Runnable)

返回一个给定的Runnable和默认值的RunnableFuture

newTaskFor(Callable)

返回一个给定的Callable的RunnableFuture

ThreadPoolExecutor

一个{@link ExecutorService},它使用可能的几个池线程之一执行每个提交的任务,通常使用{@link execuors}工厂方法配置。

线程池解决了两个不同的问题:由于减少了每个任务的调用开销,线程池通常在执行大量异步任务时提供了更好的性能,并且提供了一种绑定和管理资源(包括执行任务集合时消耗的线程)的方法。每个{@code ThreadPoolExecutor}还维护一些基本统计信息,比如完成的任务数量。

为了在广泛的上下文中有用,这个类提供了许多可调参数和可扩展性挂钩。然而,程序员要求使用更方便{@link执行人}工厂方法{@link执行人# newCachedThreadPool}(无界的线程池,线程自动回收),{@link执行人# newFixedThreadPool}(固定大小的线程池)和{@link执行人# newSingleThreadExecutor}(单个后台线程),preconfigure设置为最常见的使用场景。否则,在手动配置和调优该类时,请使用以下指南:

核心和最大池大小

{@code ThreadPoolExecutor}将根据corePoolSize(参见{@link #getCorePoolSize})和maximumPoolSize(参见{@link #getMaximumPoolSize})设置的界限自动调整池的大小(参见{@link #getPoolSize})。

当在方法{@link #execute(Runnable)}中提交新任务时,如果运行的线程小于corePoolSize,则创建一个新线程来处理请求,即使其他工作线程处于空闲状态。否则,如果运行的线程小于maximumPoolSize,则只在队列满时创建一个新线程来处理请求。通过将corePoolSize和maximumPoolSize设置为相同的值,您可以创建一个固定大小的线程池。通过将maximumPoolSize设置为一个基本无界的值,例如{@code Integer。MAX_VALUE},允许池容纳任意数量的并发任务。最典型的情况是,核心和最大池大小仅在构建时设置,但也可以使用{@link #setCorePoolSize}和{@link #setMaximumPoolSize}动态更改它们。

默认情况下,即使是核心线程也只是在新任务到达时才创建和启动,但是可以使用方法{@link #prestartCoreThread}或{@link #prestartAllCoreThreads}动态地覆盖它。如果使用非空队列构造池,则可能需要预启动线程。

使用{@link ThreadFactory}创建新线程。如果没有另外指定,则使用{@link executor #defaultThreadFactory},它创建的线程都在相同的{@link ThreadGroup}中,并且具有相同的{@code NORM_PRIORITY}优先级和非守护状态。通过提供不同的ThreadFactory,您可以更改线程的名称、线程组、优先级、守护进程状态等。如果{@code ThreadFactory}通过从{@code newThread}返回null来创建线程失败,执行程序将继续执行,但可能无法执行任何任务。线程应该拥有“modifyThread”{@code RuntimePermission}。如果工作线程或使用池的其他线程不拥有此权限,服务可能会降级:配置更改可能不会及时生效,关闭池可能仍然处于可能终止但尚未完成的状态。

如果当前池中有超过corePoolSize的线程,那么如果空闲时间超过keepAliveTime(参见{@link #getKeepAliveTime(TimeUnit)}),则多余的线程将被终止。这提供了一种在池没有被积极使用时减少资源消耗的方法。如果以后池变得更活跃,就会构造新的线程。还可以使用方法{@link #setKeepAliveTime(long, TimeUnit)}动态更改此参数。使用一个值{@code Long。MAX_VALUE} {@link TimeUnit#纳秒}有效地禁止空闲线程在关闭之前终止。默认情况下,keep-alive策略只适用于拥有多个corePoolSize线程的情况,但是方法{@link #allowCoreThreadTimeOut(boolean)}也可以用于将这个超时策略应用于核心线程,只要keepAliveTime值不为0。

任何{@link BlockingQueue}都可以用来传输和保存提交的任务。此队列的使用与池大小调整相互作用

如果运行的线程小于corePoolSize,那么执行程序倾向于添加新线程而不是排队。

如果运行的线程大于corePoolSize,执行程序总是希望对请求进行排队,而不是添加新线程。

如果一个请求不能排队,那么将创建一个新线程,除非这个线程超过了maximumPoolSize,在这种情况下,任务将被拒绝。

排队有三种基本策略:
1、直接传递
工作队列的一个很好的默认选择是{@link SynchronousQueue},它将任务传递给线程,而不会占用线程。在这里,如果没有立即可用的线程来运行任务,则对任务进行排队的尝试将失败,因此将构造一个新线程。此策略在处理可能具有内部依赖项的请求集时避免锁定。直接移交通常需要无界的最大池大小,以避免拒绝新提交的任务。反过来,当命令的平均到达速度超过它们的处理速度时,就有可能出现无限的线程增长。

2.无界队列
使用无界队列(例如,没有预定义容量的{@link LinkedBlockingQueue})将导致新任务在所有corePoolSize线程繁忙时在队列中等待。因此,创建的线程不会超过corePoolSize。(因此,maximumPoolSize的值没有任何影响。)当每个任务完全独立于其他任务时,这可能是合适的,因此任务不会影响其他任务的执行;例如,在web页面服务器中。虽然这种类型的队列在平滑短暂的请求突发方面很有用,但它也承认,当命令平均到达速度超过处理速度时,可能会出现无限的工作队列增长。

  1. 有界队列
    有限的队列(例如,{@link ArrayBlockingQueue})在使用有限的maximumpoolsize时有助于防止资源耗尽,但可能更难调优和控制。队列大小和最大池大小可以相互交换:使用大队列和小池可以最小化CPU使用、操作系统资源和上下文切换开销,但是会导致人为的低吞吐量。如果任务经常阻塞(例如,它们受到I/O的限制),系统可能会为更多的线程安排时间。使用小队列通常需要更大的池大小,这会使cpu更忙,但可能会遇到无法接受的调度开销,这也会降低吞吐量。

在方法{@link #execute(Runnable)}中提交的新任务将在执行程序关闭时拒绝,并且在执行程序对最大线程和工作队列容量使用有限的界限并达到饱和时也是如此。无论在哪种情况下,{@code execute}方法都将调用其{@link RejectedExecutionHandler}的{@link RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)}方法。提供了四个预定义的处理程序策略

{@link ThreadPoolExecutor。DiscardPolicy},不能执行的任务可以简单地定义和使用其他类型的{@link RejectedExecutionHandler}类。这样做需要谨慎,特别是当策略设计为仅在特定容量或队列策略下工作时。

钩子方法

这个类提供了{@code protected}可覆盖的{@link #beforeExecute(Thread, Runnable)}和{@link #afterExecute(Runnable, Throwable)}方法,这些方法在执行每个任务之前和之后调用。这些可以用来操作执行环境;例如,重新初始化threadlocal、收集统计信息或添加日志条目。此外,可以覆盖方法{@link # ended}来执行任何需要在执行程序完全终止后执行的特殊处理。

如果hook、回调或BlockingQueue方法抛出异常,则内部工作线程可能会失败、突然终止并可能被替换。

方法{@link #getQueue()}允许访问工作队列,用于监视和调试。强烈反对将此方法用于任何其他目的。提供的两个方法{@link #remove(Runnable)}和{@link #purge}可用于在大量排队任务被取消时帮助回收存储

不再在程序中引用并且没有剩余线程的池可以被回收(垃圾回收),而不需要显式关闭。您可以配置一个池,通过设置适当的keep-alive时间(0个核心线程的下限)和/或设置{@link #allowCoreThreadTimeOut(boolean)}来允许所有未使用的线程最终死亡。

该类的大多数扩展都会覆盖一个或多个受保护的钩子方法。例如,这里有一个子类,添加了一个简单的暂停/恢复功能:

class PausableThreadPoolExecutor extends ThreadPoolExecutor {
    private boolean isPaused;
    private ReentrantLock pauseLock = new ReentrantLock();
    private Condition unpaused = pauseLock.newCondition();
 
    public PausableThreadPoolExecutor(...) { super(...); }
 
    protected void beforeExecute(Thread t, Runnable r) {
     super.beforeExecute(t, r);
     pauseLock.lock();
     try {
       while (isPaused) unpaused.await();
     } catch (InterruptedException ie) {
       t.interrupt();
     } finally {
       pauseLock.unlock();
     }
    }
 
    public void pause() {
      pauseLock.lock();
      try {
        isPaused = true;
      } finally {
        pauseLock.unlock();
      }
    }
 
    public void resume() {
      pauseLock.lock();
      try {
        isPaused = false;
        unpaused.signalAll();
      } finally {
        pauseLock.unlock();
      }
    }
  }}
上一篇下一篇

猜你喜欢

热点阅读