Java 杂谈

线程池的使用

2018-03-06  本文已影响0人  天未的博客

本文对 Java 中的线程池的使用进行学习。是对以下文章的摘录:

欢迎关注个人博客Github

Executor

Executor 接口

Java 类库中任务执行的抽象接口是 Executor。这个接口使得任务提交和任务如何被执行(包括线程使用细节、调度等)得到了解耦。该接口如下所示:

/*
 * @since 1.5
 * @author Doug Lea
 */
public interface Executor {
    /**
     * 在将来某个时刻执行任务( command )。这个任务可以通过这些方式执行:在一个新的线程中执行、
     * 在一个缓存的线程中执行、在调用者线程中执行。具体取决于 Executor 的实现方式。
     *
     * @param command 需要运行的任务
     * @throws RejectedExecutionException 如果执行器不接受任务,则抛出改异常。
     * @throws NullPointerException 如果任务为 null
     */
    void execute(Runnable command);
}

使用 Executor 执行任务

Executor 经常被用来代替显示地创建线程。
自己手工创建线程执行(不建议):

new Thread(new RunnableTask()).start()

使用 Executor 执行:

 Executor executor =  anExecutor;
 executor.execute(new RunnableTask1());
 executor.execute(new RunnableTask2());
 ...

Executor 的执行方案

Executor 接口并不强制异步执行任务。一个简单的例子是,执行器可以在调用线程中立刻运行提交任务。

class DirectExecutor implements Executor {
   public void execute(Runnable r) {
     r.run();
   }
}}

更加典型的方案是,在其它线程而非调用者线程中执行任务。下面的 executor 为每个任务创建了一个执行线程。

class ThreadPerTaskExecutor implements Executor {
   public void execute(Runnable r) {
     new Thread(r).start();
   }
}}

很多 Executor 的实现对任务何时以何种方式执行都会添加一些限制。下面的执行器将提交的任务按顺序在另一个执行器中执行,成为一个组合执行器。

class SerialExecutor implements Executor {
  final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
  final Executor executor;
  Runnable active;

  SerialExecutor(Executor executor) {
    this.executor = executor;
  }

  public synchronized void execute(final Runnable r) {
    tasks.offer(new Runnable() {
      public void run() {
        try {
          r.run();
        } finally {
          // 每个任务执行完后,执行下一个任务
          scheduleNext();
        }
      }
    });
    // 没有任务的时候,启动此任务
    if (active == null) {
      scheduleNext();
    }
  }

  protected synchronized void scheduleNext() {
    // 当下一个任务存在时,执行任务
    if ((active = tasks.poll()) != null) {
      executor.execute(active);
    }
  }
}}

ExecutorService

ExecutorService 接口

Executor 有一个扩展接口 ExecutorServiceExecutorService 能够提供方法来管理终止,也能够创建 Future 来跟踪一个或多个任务的异步执行过程。该接口如下图所示:

ExecutorService 接口

ExecutorService 关闭

ExecutorService 可以被关闭,关闭后它会拒绝新的任务。有两种关闭 ExecutorService 的方法:

一旦 ExecutorService 被关闭(termination),执行器将会没有正在运行的任务、没有正在等待执行的任务、没有新的任务被提交。一个没有使用的 ExecutorService 应该被关闭,从而使得资源可以被回收,

任务执行

submit 方法扩展于 Executor 中的基本方法 execute(Runnable),该方法创建和返回了一个 Future 对象,通过这个对象可以取消任务的执行或者等待任务的结束。

invokeAnyinvokeAll 方法可以用来执行非常通用有效的批量执行。执行一批任务,等待它们中的一个或者全部执行完成。ExecutorCompletionService 可以用来实现这些方法的变体。

使用例子

下面是一个网络服务的例子。该例子通过线程池中的线程来服务到达的请求,使用到了Executors中的 newFixedThreadPool 方法。

class NetworkService implements Runnable {
   private final ServerSocket serverSocket;
   private final ExecutorService pool;

   public NetworkService(int port, int poolSize)
       throws IOException {
     serverSocket = new ServerSocket(port);
     pool = Executors.newFixedThreadPool(poolSize);
   }
 
   public void run() { // run the service
     try {
       for (;;) {
         pool.execute(new Handler(serverSocket.accept()));
       }
     } catch (IOException ex) {
       pool.shutdown();
     }
   }
 }

 class Handler implements Runnable {
   private final Socket socket;
   Handler(Socket socket) { this.socket = socket; }
   public void run() {
     // read and service request on socket
   }
 }}

下面的代码展示了通过两个阶段关闭 ExecutorService:

 void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }
 }}

ThreadPoolExecutor

ThreadPoolExecutor 类提供了一个可扩展的线程池实现。

基本介绍

线程池用来解决两个不同方面的问题:

  1. 由于减少了每个任务的调用开销,通常可以在执行大量异步任务的时候提高性能。
  2. 提供了资源控制和管理的。

每个 ThreadPoolExecutor 具有一些基本的统计,例如:任务执行完成的数量。

为了适应广泛的应用场景,该类提高了很多可以调整的参数以及可以扩展的钩子。此外,更简单的一种方式是使用 Executors 中的一些工厂方法,包括:

这些覆盖了大部分通用的场景。

构造函数

当需要手工配置这个类或者对其参数进行调整时,就需要了解其构造函数。ThreadPoolExecutor 有很多构造函数,下面是最常见的形式。

   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) { ... }

corePoolSize and maximumPoolSize

ThreadPoolExecutor 将自动通过基本大小(corePoolSize)、最大大小(maximumPoolSize)来调整线程池的大小。

当有新的任务通过 execute(Runnable) 方法提交时:

设置示例:

通常,这些参数在构造函数中被设置,但是它们也可以通过这些方法动态改变:

keepAliveTime

如果线程池中当前执行的线程数目大于 corePoolSize,那么对于多出的线程,当它们空闲的时间超过 keepAliveTime 时,它们将被终止。

这样的机制,使得当线程池不活跃的时候,可以减少资源的消耗。当线程池变得活跃的时候,新的线程会被创建。这个参数也通过 setKeepAliveTime(long TimeUnit) 方法动态改变。

通过设置这个参数为 Long.MAX_VALUE TimeUnit.NANOSECONDS,可以禁止空闲的线程被终止。

默认情况下,这个 keep-alive 策略只会在当前线程数目超过 corePoolSize 的时候才会起作用,也可以通过 allowCoreThreadTimeOut(boolean) 来控制,此时 keepAliveTime 的值不能为0.

Queuing

BlockingQueue 可以用来转移和持有提交的任务。它的使用时和线程池的大小相关的:

以下是一些通用的队列策略:

ThreadFactory

ThreadFactory 是用来创建新的线程。下面是该接口的声明。

/**
 * @since 1.5
 * @author Doug Lea
 */
public interface ThreadFactory {
    /**
     * 构建线程 Thread。实现中可以设置优先权、名字、守护状态、线程组等。
     *
     * @param r 一个可以被线程实例运行的任务
     * @return 创建的线程或者 null (创建被拒绝)
     */
    Thread newThread(Runnable r);
}

该接口一个简单的实现为:

 class SimpleThreadFactory implements ThreadFactory {
   public Thread newThread(Runnable r) {
     return new Thread(r);
   }
 }}

Executors 中的 defaultThreadFactory 方法提供了一个更加简单实用的实现,可以设置线程环境为已知值。

    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

Rejected tasks

在以下两种情况下,新的任务将会被拒绝:

任何一种情况下,都会调用 RejectedExecutionHandler 中的 rejectedExecution 方法。

预先定义的一些拒绝策略包括:

扩展例子

很多基于该类的扩展都是覆盖一个或多个受保护的函数。下面例子就展示了一个子类,该之类添加了简单的暂停和恢复特性:

class PausableThreadPoolExecutor extends ThreadPoolExecutor {
   private boolean isPaused;
   private ReentrantLock pauseLock = new ReentrantLock();
   private Condition unpaused = pauseLock.newCondition();

   public PausableThreadPoolExecutor(...) { super(...); }

   protected void beforeExecute(Thread t, Runnable r) {
     super.beforeExecute(t, r);
     pauseLock.lock();
     try {
       while (isPaused) unpaused.await();
     } catch (InterruptedException ie) {
       t.interrupt();
     } finally {
       pauseLock.unlock();
     }
   }

   public void pause() {
     pauseLock.lock();
     try {
       isPaused = true;
     } finally {
       pauseLock.unlock();
     }
   }

   public void resume() {
     pauseLock.lock();
     try {
       isPaused = false;
       unpaused.signalAll();
     } finally {
       pauseLock.unlock();
     }
   }
 }}

Executors

Executors 类提供了方便的工厂方法来创建这些执行器。

newFixedThreadPool

该方法创建的线程池可以重复使用固定数目的线程,使用无限制的队列。

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

newSingleThreadExecutor

单线程执行,使用无限制的队列。

    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

newCachedThreadPool

使用无限制的线程池,空线程超过60秒被回收,线程执行采用直接交付策略。

    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

newScheduledThreadPool

采用 ScheduledThreadPoolExecutor

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

newWorkStealingPool

采用 ForkJoinPool , 开始于 JDK 1.8。

    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

阿里编程规范

使用说明

线程池不允许实用 Executor 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式可以更加明确线程次的运行规则,规避资源耗尽的风险。

Executor 各个方法的弊端说明:

使用示例

例子1:

// org.apache.commons.lang3.concurrent.BasicThreadFactory
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
    new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());

例子2:

// com.google.common.util.concurrent.ThreadFactoryBuilder
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
    .setNameFormat("demo-pool-%d").build();
    
// Common Thread Pool
ExecutorService pool = new ThreadPoolExecutor(5,200,
    0L,TimeUnit.MILLISECONDS,
    new LinkedBlockingDeque<Runnable>(1024),namedThreadFactory,
    new ThreadPoolExecutor.AbortPolicy());

pool.execute(()-> System.out.println(Thread.currentThread().getName()));
pool.shutdown(); // gracefully shutdown

例子3:

<bean id="userThreadPool"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="10" />
        <property name="maxPoolSize" value="100" />
        <property name="queueCapacity" value="2000" />

        <property name="thradFactory"  value= thradFactory />
        <property name="rejectedExecutionHandler">
            <ref local="rejectedExecutionHandler">
        </property>
</bean>
上一篇 下一篇

猜你喜欢

热点阅读