Java后端必备Java 杂谈

ThreadPoolExecutor看懂、学会

2018-10-02  本文已影响2人  关捷

我们知道,线程是任务执行的机制,为了让更多的任务更快的执行,通常会选择多线程技术,而线程池就是简化多线程开发的最佳方案。

使用线程池的好处,总结下主要有一下几点:

  1. 集中对线程资源进行管理,复用线程资源,使能够合理的规划服务器资源;
  2. 解耦任务提交过程和执行过程,开发人员能更加专注于任务本身,而不必过多的关注线程的执行;
  3. 平衡系统负载、吞吐量和响应时间,保护系统在任务过多时、性能平缓降低。

线程池实现

每个线程都有线程空间也就是线程栈,保存栈帧(包含:操作数栈、局部变量表等),也就是方法调用链。随着方法的调用完成,线程所占用的资源也将被系统回收。

线程要怎样才不被回收?或者说,控制它何时被回收?

思路就是一直循环,只要不跳出循环,线程就不会被回收。可以通过控制何时线程跳出循环,来达到销毁线程的目的。

ThreadPoolExecutor的实现
以下我修剪后的代码,可以看出,用的是阻塞队列的超时和挂起来控制线程的生命周期。

//这个是运行任务的具体方法
final void runWorker(Worker w) {
       Runnable task = w.firstTask;
       while (task != null || (task = getTask()) != null) //当getTask返回null时,循环跳出,线程回收
            task.run();       
 }
 //获取任务
 private Runnable getTask() {
         boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //判断线程是否可以超时回收
         Runnable r = timed ?
              workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //通过keepAliveTime来控制线程空闲死亡时间
              workQueue.take();//如果线程不允许回收,那么就会一直挂起
          if (r != null)
              return r;
}

线程数控制

线程池的目的就是避免线程数的失控以及复用线程。主要有三个参数来控制:corePoolSizemaximumPoolSizeworkQueue

  1. 当前线程总数小于corePoolSize时,不管有没有线程闲置,都会为新的任务创建新的线程,不会进行线程的复用;
  2. 当线程数等于corePoolSize,新到来的任务会进入队列中等待,等待的任务会被将来空闲的线程执行;
  3. 如果线程数等于corePoolSize且队列已经满了,会new新线程,直到线程数量等于maximumPoolSize;
  4. 当上面条件都验证后,那么在到来的任务就会进入拒绝服务,是直接放弃,还是抛出异常等各种策略可以定制。

下面代码我进行了注释:

public void execute(Runnable command) {
        int c = ctl.get(); //ctl是atomicInteger类型,通过不同的计算方式得到线程池状态和线程数量
        if (workerCountOf(c) < corePoolSize) { //线程总数小于corePoolSize
            if (addWorker(command, true)) //为新的任务创建新的线程
                return;
        }
        if (isRunning(c) && workQueue.offer(command)) { //不能新建线程,那么就入队列(offer会立刻返回,失败为false)
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) //创建超过corePoolSize数量的线程
            reject(command);
    }

Executors线程池工厂

通过Executors提供的静态工厂方法,我们可以简单的使用线程池。

方法 介绍
newFixedThreadPool(int nThreads) 固定线程的线程池,任务会堆积在队列等待执行
newSingleThreadExecutor() 一个线程的线程池,任务在队列中等待串行执行
newCachedThreadPool() 最大线程数为Integer.MAX_VALUE的线程池,可以认为没有队列缓冲,没有空闲线程就创建新线程

以上线程池直接使用有没有问题?从上面介绍来看,好像问题很大,不管是任务过多的堆积,或是过多的创建线程。

1. newCachedThreadPool线程池
使用的是SynchronousQueue队列,offer方法直接把任务交个等待的线程,如果没有空闲线程挂起在队列上,那么直接返回false,从这里可以看出,当任务量急剧增大时,这可能导致线程无限增长,而线程创建是需要内存保存线程上下文信息的,可能会直接导致服务器内存不足,造成内存泄漏,同时线程对cup激烈的竞争,会导致服务器性能急剧下降。
2. newFixedThreadPool和newSingleThreadExecutor线程池
线程最大数量固定,通过LinkedBlockingQueue(容量很大的阻塞队列)来缓冲不能处理的任务,能够很好的保证合适数量的线程处于忙碌状态,但是同样会使得任务过多的堆积,导致响应能力下降。

从上分析来看,貌似并不能直接的使用工厂提供的线程池。如果自己去实例化ThreadPoolExecutor会不会更好呢?那么就可以根据使用场景,更精细的控制线程数量和队列长度,从而达到性能更优。

个性化定制

通过有界队列、线程数量和空闲死亡时间来控制线程池,能够构造具有很好伸缩性的线程池,但是在系统资源使用和响应能力之间进行平衡,进行调优,却是件非常困难的事情。

  1. 大容量队列,小数量线程,虽然可以减少cup竞争、上下文切换,但过少的线程,可能会造成吞吐量的下降;
  2. 小容量队列,大数量线程,会使得cup竞争激烈,上线文频繁切换,也会使吞吐量下降。

参数的确定非常的困难,需要根据具体的场景,参考生产速度、任务处理速度、任务能够容忍的延迟执行时间,进行合理的调整。

怎么关闭线程池

JVM只要有非后台线程运行,那么就不会关闭。所以停机时需要关闭每个线程,如何实现呢?。
ThreadPoolExecutor提供了2个方法:

方法 介绍
shutdown() 平缓的关闭线程池
shutdownNow() 暴力关闭线程池

何为平缓,何为暴力?分析前我们列举下线程池状态,以及代表的意思(括号中的数字就可以认为就是状态的常量值):

状态 介绍
RUNNING(-1) 运行状态:接受新的任务或者运行队列中等待的任务
SHUTDOWN(0) 关闭状态:不接受新的任务但运行队列中等待的任务
STOP(1) 暴力关闭状态:不接受新任务且不执行队列中任,设置所有线程中断状态
TIDYING(2) 当所有的任务都完成,所有的线程都消亡状态
TERMINATED(3) 线程池完全关闭状态

先看看shutdown()的源码:

   public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN); //设置线程池状态
            interruptIdleWorkers(); //中断所有线程
            onShutdown();
        } finally {
            mainLock.unlock();
        }
        tryTerminate(); //尝试终止线程池
    }

可以看到,shutdown方法主要是通过中断线程来关闭线程池的。

什么时候线程会响应中断?就是挂起在队列上的take或者poll操作,当队列中有任务时,会继续执行,只有队列中没有任务时,线程挂起响应中断,跳出循环,也就代表线程死亡。

可以总结,shutdown方法关闭的流程是:拒绝新任务加入–>执行完成所有剩余的任务–>关闭线程池

现在来看看 shutdownNow()的源码:

   public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP); //线程池状态为stop
            interruptWorkers();
            tasks = drainQueue(); //排干所有队列中等待的任务
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

对比shutdown,可以看到除了状态变化为stop外,还多一个步骤,排干所有队列中的任务,让所有线程取不到任务,响应中断,从而快速地关闭线程池。

最后,如果需要异步执行任务,仅仅使用线程池是不够保险的,任务的状态,任务的执行情况,无法达到可视化,系统就很容易失控。
我最近在写一个带有可视化页面的分布式的异步任务执行组件,有兴趣的可以留言。]

上一篇下一篇

猜你喜欢

热点阅读