java程序员Java学习笔记

Java 线程池剖析

2017-02-09  本文已影响219人  data4

概念

线程 & 线程池

提到线程,不得不提进程。进程是操作系统的最小资源单位,一个进程能使用多少虚拟内存,能打开多少文件标识符。线程是操作系统的最小调度单元,每个进程默认有一个主线程,它用来执行真正的任务。

多个任务并发执行,提高 CPU 利用率,响应速度,执行效率。例如:主线程负责接收请求,工作线程负责处理请求。

当你需要使用多个线程来执行相同类型任务时,你不希望自己来管理线程的创建和销毁,维护任务队列,分配任务,管理定时任务。ok,交给线程池吧,你只需要向线程池提交任务就好了。

Java 线程池体系

Java 库提供了一套完整的线程池解决方案,支持多种定制化配置,满足你的需求。java.util.concurrent 库与线程池相关的接口,类的继承关系如下所示:


Executors

接口

Executor

支持提交任务

voidexecute(Runnablecommand) #提交 Runnable 任务

ExecutorService

继承 Executor,支持提交异步任务和关闭线程池。

<T>Future<T>submit(Runnabletask, Tresult)
Future<?>submit(Runnabletask)
void shutdown()

ScheduledExecutorService

继承 ExecutorService, 增加支持定时任务和延迟任务

ScheduledFuture<?> schedule(
  Runnable command, 
  long delay, 
  TimeUnit unit)

<V> ScheduledFuture<V> schedule(
  Callable<V> callable, 
  long delay, 
  TimeUnit unit)

ScheduledFuture<?> scheduleAtFixedRate(
  Runnable command, 
  long initialDelay, 
  long period, 
  TimeUnit unit)

ScheduledFuture<?> scheduleWithFixedDelay(
  Runnable command, 
  long initialDelay, 
  long delay, 
  TimeUnit unit)

实现类

ThreadPoolExecutor

ThreadPoolExecutor(int corePoolSize, 
  int maximumPoolSize, 
  long keepAliveTime, 
  TimeUnit unit, 
  BlockingQueue<Runnable> workQueue, 
  ThreadFactory threadFactory, 
  RejectedExecutionHandler handler)

实现 ExecutorService 接口功能外,提供了以下几个配置项:

corePoolSize 线程池保留线程个数,maximumPoolSize 线程池最大线程个数。
默认情况下,当提交任务时,才会创建线程。当提交任务时,如果当前 running 线程数小于 corePoolSize 的话,则创建一个线程;如果当前 running 线程数大于 corePoolSize 小于 maximumPoolSize 的话,当且仅当任务队列满的情况下,才会创建一个线程;

默认情况下,使用 Executors.defaultThreadFactory 来创建线程,它们同属于一个线程组,有相同的优先级,non-daemon 状态。

当线程个数超过 corePoolSize 之后,如果有线程空闲时间超过 keepAliveTime 的话,线程池就会释放该线程。

用来保存未提交和已提交的任务。任务队列的使用情况与线程池内部线程个数有关,如果线程个数小于 corePoolSize 的话,则不会缓存任务,而是创建线程执行任务;如果线程个数大于 corePoolSize 的话,则先缓存任务;如果队列满了,如果线程个数小于 maximumPoolSize 的话,则创建新线程;否则拒绝该任务。
目前提供 3 种队列:(1)Direct handoffs (2)Unbounded queues 默认(3) Bounded queues。详细参考:ThreadPoolExector

ThreadPoolExecutor.AbortPolicy 抛出异常 RejectedExecutionException(默认)
ThreadPoolExecutor.DiscardPolicy 直接丢掉
ThreadPoolExecutor.DiscardOldestPolicy 丢掉队列中最老的任务

ScheduledThreadPoolExecutor

继承 ThreadPoolExecutor,并实现 ScheduledExecutorService 接口行为。

工具接口和类

ThreadFactory

接口,定义创建线程

Thread newThread(Runnable r)

Executors

用来创建 ExecutorService, ScheduledExecutorService, ThreadFactory, Callable。

public static <T> Callable<T> 
  callable(
    Runnable task, 
    T result)

public static ThreadFactory 
  defaultThreadFactory()

public static ExecutorService 
  newCachedThreadPool(
    ThreadFactory threadFactory)

public static ExecutorService 
  newFixedThreadPool(
    int nThreads, 
    ThreadFactory threadFactory)

最佳实践

Executors.newFixedThreadPool(5);
Executors.newScheduledThreadPool(3);
new ThreadFactory() {
    public Thread newThread(Runnable r) {
        Thread t = Executors
          .defaultThreadFactory()
          .newThread(r);
        t.setName("myname");
        t.setDaemon(true);
        return t;
    }
}

遗憾的是,Executors 并没有提供支持定制化队列的接口,所以只能使用 ThreadPoolExecutor。

BlockingQueue<Runnable> queue = 
  new ArrayBlockingQueue<>(100);
executorService = new ThreadPoolExecutor(
  n, 
  n, 
  0L, 
  TimeUnit.MILLISECONDS, 
  queue);

shutdown 不接受新的任务,保证已经接受的任务提交执行,但是并不保证任务执行结束
awaitTermination 保证已经执行的任务完整执行结束。
shutdownNow 关闭正在执行的任务,但不保证正在执行的任务正常关闭。未执行的任务直接返回。

cxnResetExecutor.shutdown();
try {
  if (cxnResetExecutor.awaitTermination(
    5, TimeUnit.SECONDS)) {
      cxnResetExecutor.shutdownNow();
  }
} catch (Exception ex) {
  logger.error("Interrupted while waiting for connection reset executor " +
    "to shut down");
}
public long getCompletedTaskCount() #获取已经完成的 task 总个数
public int getActiveCount() #获取正在活的线程个数
public int getLargestPoolSize() #获取线程池曾经最大的线程数
public BlockingQueue<Runnable> getQueue()   #获取任务队列中任务个数

参考

Executors
Executor
ThreadPoolExecutor
ExecutorService-10个要诀和技巧
flume 源码
Java线程池分析

欢迎大家访问本人网站 程序员工具箱 致力于贡献便捷的工具,帮助程序员写出更优秀的代码,同时也欢迎大家一起来贡献。

上一篇下一篇

猜你喜欢

热点阅读