Log

【面试必备】我跟面试官聊了一个小时线程池!

2022-09-03  本文已影响0人  java的小粉丝

大家好,这篇文章主要跟大家聊下 Java 线程池面试中可能会问到的一些问题。

全程干货,耐心看完,你能轻松应对各种线程池面试。

相信各位 Javaer 在面试中或多或少肯定被问到过线程池相关问题吧,线程池是一个相对比较复杂的体系,基于此可以问出各种各样、五花八门的问题。

若你很熟悉线程池,如果可以,完全可以滔滔不绝跟面试官扯一个小时线程池,一般面试也就一个小时左右,那么这样留给面试官问其他问题的时间就很少了,或者其他问题可能问的也就不深入了,那你通过面试的几率是不就更大点了呢。

image.png

下面我们开始列下线程池面试可能会被问到的问题以及该怎么回答,以下只是参考答案,你也可以加入自己的理解。

1. 面试官:日常工作中有用到线程池吗?什么是线程池?为什么要使用线程池?

一般面试官考察你线程池相关知识前,大概率会先问这个问题,如果你说没用过,不了解,ok,那就没以下问题啥事了,估计你的面试结果肯定也凶多吉少了。

作为 JUC 包下的门面担当,线程池是名副其实的 JUC 一哥,不了解线程池,那说明你对 JUC 包其他工具也了解的不咋样吧,对 JUC 没深入研究过,那就是没掌握到 Java 的精髓,给面试官这样一个印象,那结果可想而知了。

所以说,这一分一定要吃下,那我们应该怎么回答好这问题呢?

可以这样说:

计算机发展到现在,摩尔定律在现有工艺水平下已经遇到难易突破的物理瓶颈,通过多核 CPU 并行计算来提升服务器的性能已经成为主流,随之出现了多线程技术。

线程作为操作系统宝贵的资源,对它的使用需要进行控制管理,线程池就是采用池化思想(类似连接池、常量池、对象池等)管理线程的工具。

JUC 给我们提供了 ThreadPoolExecutor 体系类来帮助我们更方便的管理线程、并行执行任务。

下图是 Java 线程池继承体系:

image.png

使用线程池可以带来以下好处:

降低资源消耗。降低频繁创建、销毁线程带来的额外开销,复用已创建线程

降低使用复杂度。将任务的提交和执行进行解耦,我们只需要创建一个线程池,然后往里面提交任务就行,具体执行流程由线程池自己管理,降低使用复杂度

提高线程可管理性。能安全有效的管理线程资源,避免不加限制无限申请造成资源耗尽风险

提高响应速度。任务到达后,直接复用已创建好的线程执行

线程池的使用场景简单来说可以有:

快速响应用户请求,响应速度优先。比如一个用户请求,需要通过 RPC 调用好几个服务去获取数据然后聚合返回,此场景就可以用线程池并行调用,响应时间取决于响应最慢的那个 RPC 接口的耗时;又或者一个注册请求,注册完之后要发送短信、邮件通知,为了快速返回给用户,可以将该通知操作丢到线程池里异步去执行,然后直接返回客户端成功,提高用户体验。

单位时间处理更多请求,吞吐量优先。比如接受 MQ 消息,然后去调用第三方接口查询数据,此场景并不追求快速响应,主要利用有限的资源在单位时间内尽可能多的处理任务,可以利用队列进行任务的缓冲

image.png

2. 面试官:ThreadPoolExecutor 都有哪些核心参数?

其实一般面试官问你这个问题并不是简单听你说那几个参数,而是想要你描述下线程池执行流程。

青铜回答:

包含核心线程数(corePoolSize)、最大线程数(maximumPoolSize),空闲线程超时时间(keepAliveTime)、时间单位(unit)、阻塞队列(workQueue)、拒绝策略(handler)、线程工厂(ThreadFactory)这7个参数。

钻石回答:

回答完包含这几个参数之后,会再主动描述下线程池的执行流程,也就是 execute() 方法执行流程。

execute()方法执行逻辑如下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

可以总结出如下主要执行流程,当然看上述代码会有一些异常分支判断,可以自己顺理加到下述执行主流程里

判断线程池的状态,如果不是RUNNING状态,直接执行拒绝策略

如果当前线程数 < 核心线程池,则新建一个线程来处理提交的任务

如果当前线程数 > 核心线程数且任务队列没满,则将任务放入阻塞队列等待执行

如果 核心线程池 < 当前线程池数 < 最大线程数,且任务队列已满,则创建新的线程执行提交的任务

如果当前线程数 > 最大线程数,且队列已满,则执行拒绝策略拒绝该任务

王者回答:

在回答完包含哪些参数及 execute 方法的执行流程后。然后可以说下这个执行流程是 JUC 标准线程池提供的执行流程,主要用在 CPU 密集型场景下。

像 Tomcat、Dubbo 这类框架,他们内部的线程池主要用来处理网络 IO 任务的,所以他们都对 JUC 线程池的执行流程进行了调整来支持 IO 密集型场景使用。

他们提供了阻塞队列 TaskQueue,该队列继承 LinkedBlockingQueue,重写了 offer() 方法来实现执行流程的调整。

 @Override
    public boolean offer(Runnable o) {
        //we can't do any checks
        if (parent==null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
        //if we have less threads than maximum force creation of a new thread
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }

可以看到他在入队之前做了几个判断,这里的 parent 就是所属的线程池对象

1.如果 parent 为 null,直接调用父类 offer 方法入队

2.如果当前线程数等于最大线程数,则直接调用父类 offer()方法入队

3.如果当前未执行的任务数量小于等于当前线程数,仔细思考下,是不是说明有空闲的线程呢,那么直接调用父类 offer() 入队后就马上有线程去执行它

4.如果当前线程数小于最大线程数量,则直接返回 false,然后回到 JUC 线程池的执行流程回想下,是不是就去添加新线程去执行任务了呢

5.其他情况都直接入队

具体可以看之前写过的这篇文章

动态线程池(DynamicTp),动态调整Tomcat、Jetty、Undertow线程池参数篇

可以看出当当前线程数大于核心线程数时,JUC 原生线程池首先是把任务放到队列里等待执行,而不是先创建线程执行。

如果 Tomcat 接收的请求数量大于核心线程数,请求就会被放到队列中,等待核心线程处理,这样会降低请求的总体响应速度。

所以 Tomcat并没有使用 JUC 原生线程池,利用 TaskQueue 的 offer() 方法巧妙的修改了 JUC 线程池的执行流程,改写后 Tomcat 线程池执行流程如下:

  1. 判断如果当前线程数小于核心线程池,则新建一个线程来处理提交的任务
  1. 如果当前当前线程池数大于核心线程池,小于最大线程数,则创建新的线程执行提交的任务
  1. 如果当前线程数等于最大线程数,则将任务放入任务队列等待执行
  1. 如果队列已满,则执行拒绝策略

然后还可以再说下线程池的 Worker 线程模型,继承 AQS 实现了锁机制。线程启动后执行 runWorker() 方法,runWorker() 方法中调用 getTask() 方法从阻塞队列中获取任务,获取到任务后先执行 beforeExecute() 钩子函数,再执行任务,然后再执行 afterExecute() 钩子函数。若超时获取不到任务会调用 processWorkerExit() 方法执行 Worker 线程的清理工作。

3. 面试官:什么是阻塞队列?说说常用的阻塞队列有哪些?

阻塞队列 BlockingQueue 继承 Queue,是我们熟悉的基本数据结构队列的一种特殊类型。

当从阻塞队列中获取数据时,如果队列为空,则等待直到队列有元素存入。当向阻塞队列中存入元素时,如果队列已满,则等待直到队列中有元素被移除。提供 offer()、put()、take()、poll() 等常用方法。

JDK 提供的阻塞队列的实现有以下几种:

1)ArrayBlockingQueue:由数组实现的有界阻塞队列,该队列按照 FIFO 对元素进行排序。维护两个整形变量,标识队列头尾在数组中的位置,在生产者放入和消费者获取数据共用一个锁对象,意味着两者无法真正的并行运行,性能较低。

2)LinkedBlockingQueue:由链表组成的有界阻塞队列,如果不指定大小,默认使用 Integer.MAX_VALUE 作为队列大小,该队列按照 FIFO 对元素进行排序,对生产者和消费者分别维护了独立的锁来控制数据同步,意味着该队列有着更高的并发性能。

3)SynchronousQueue:不存储元素的阻塞队列,无容量,可以设置公平或非公平模式,插入操作必须等待获取操作移除元素,反之亦然。

4)PriorityBlockingQueue:支持优先级排序的无界阻塞队列,默认情况下根据自然序排序,也可以指定 Comparator。

5)DelayQueue:支持延时获取元素的无界阻塞队列,创建元素时可以指定多久之后才能从队列中获取元素,常用于缓存系统或定时任务调度系统。

6)LinkedTransferQueue:一个由链表结构组成的无界阻塞队列,与LinkedBlockingQueue相比多了transfer和tryTranfer方法,该方法在有消费者等待接收元素时会立即将元素传递给消费者。

7)LinkedBlockingDeque:一个由链表结构组成的双端阻塞队列,可以从队列的两端插入和删除元素。

4. 面试官:你刚说到了 Worker 继承 AQS 实现了锁机制,那 ThreadPoolExecutor 都用到了哪些锁?为什么要用锁?

1)mainLock 锁

ThreadPoolExecutor 内部维护了 ReentrantLock 类型锁 mainLock,在访问 workers 成员变量以及进行相关数据统计记账(比如访问 largestPoolSize、completedTaskCount)时需要获取该重入锁。

面试官:为什么要有 mainLock?

    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /**
     * Tracks largest attained pool size. Accessed only under
     * mainLock.
     */
    private int largestPoolSize;

    /**
     * Counter for completed tasks. Updated only on termination of
     * worker threads. Accessed only under mainLock.
     */
    private long completedTaskCount;

可以看到 workers 变量用的 HashSet 是线程不安全的,是不能用于多线程环境的。largestPoolSize、completedTaskCount 也是没用 volatile 修饰,所以需要在锁的保护下进行访问。

面试官:为什么不直接用个线程安全容器呢?
其实 Doug 老爷子在 mainLock 变量的注释上解释了,意思就是说事实证明,相比于线程安全容器,此处更适合用 lock,主要原因之一就是串行化 interruptIdleWorkers() 方法,避免了不必要的中断风暴

面试官:怎么理解这个中断风暴呢?
其实简单理解就是如果不加锁,interruptIdleWorkers() 方法在多线程访问下就会发生这种情况。一个线程调用interruptIdleWorkers() 方法对 Worker 进行中断,此时该 Worker 出于中断中状态,此时又来一个线程去中断正在中断中的 Worker 线程,这就是所谓的中断风暴。

面试官:那 largestPoolSize、completedTaskCount 变量加个 volatile 关键字修饰是不是就可以不用 mainLock 了?
这个其实 Doug 老爷子也考虑到了,其他一些内部变量能用 volatile 的都加了 volatile 修饰了,这两个没加主要就是为了保证这两个参数的准确性,在获取这两个值时,能保证获取到的一定是修改方法执行完成后的值。如果不加锁,可能在修改方法还没执行完成时,此时来获取该值,获取到的就是修改前的值。

2)Worker 线程锁
刚也说了 Worker 线程继承 AQS,实现了 Runnable 接口,内部持有一个 Thread 变量,一个 firstTask,及 completedTasks 三个成员变量。
基于 AQS 的 acquire()、tryAcquire() 实现了 lock()、tryLock() 方法,类上也有注释,该锁主要是用来维护运行中线程的中断状态。在 runWorker() 方法中以及刚说的 interruptIdleWorkers() 方法中用到了。
面试官:这个维护运行中线程的中断状态怎么理解呢?

  protected boolean tryAcquire(int unused) {
      if (compareAndSetState(0, 1)) {
          setExclusiveOwnerThread(Thread.currentThread());
          return true;
      }
      return false;
  }
  public void lock()        { acquire(1); }
  public boolean tryLock()  { return tryAcquire(1); }

在runWorker() 方法中获取到任务开始执行前,需要先调用 w.lock() 方法,lock() 方法会调用 tryAcquire() 方法,tryAcquire() 实现了一把非重入锁,通过 CAS 实现加锁。

image.png

interruptIdleWorkers() 方法会中断那些等待获取任务的线程,会调用 w.tryLock() 方法来加锁,如果一个线程已经在执行任务中,那么 tryLock() 就获取锁失败,就保证了不能中断运行中的线程了。


image.png

所以 Worker 继承 AQS 主要就是为了实现了一把非重入锁,维护线程的中断状态,保证不能中断运行中的线程。

image.png

5. 面试官:你在项目中是怎样使用线程池的?Executors 了解吗?

这里面试官主要想知道你日常工作中使用线程池的姿势,现在大多数公司都在遵循阿里巴巴 Java 开发规范,该规范里明确说明不允许使用 Executors 创建线程池,而是通过 ThreadPoolExecutor 显示指定参数去创建

你可以这样说,知道 Executors 工具类,很久之前有用过,也踩过坑,Executors 创建的线程池有发生 OOM 的风险。

Executors.newFixedThreadPool 和 Executors.SingleThreadPool 创建的线程池内部使用的是无界(Integer.MAX_VALUE)的 LinkedBlockingQueue 队列,可能会堆积大量请求,导致 OOM

Executors.newCachedThreadPool 和Executors.scheduledThreadPool 创建的线程池最大线程数是用的Integer.MAX_VALUE,可能会创建大量线程,导致 OOM

自己在日常工作中也有封装类似的工具类,但是都是内存安全的,参数需要自己指定适当的值,也有基于 LinkedBlockingQueue 实现了内存安全阻塞队列 MemorySafeLinkedBlockingQueue,当系统内存达到设置的剩余阈值时,就不在往队列里添加任务了,避免发生 OOM

我们一般都是在 Spring 环境中使用线程池的,直接使用 JUC 原生 ThreadPoolExecutor 有个问题,Spring 容器关闭的时候可能任务队列里的任务还没处理完,有丢失任务的风险。

我们知道 Spring 中的 Bean 是有生命周期的,如果 Bean 实现了 Spring 相应的生命周期接口(InitializingBean、DisposableBean接口),在 Bean 初始化、容器关闭的时候会调用相应的方法来做相应处理。

所以最好不要直接使用 ThreadPoolExecutor 在 Spring 环境中,可以使用 Spring 提供的 ThreadPoolTaskExecutor,或者 DynamicTp 框架提供的 DtpExecutor 线程池实现。

也会按业务类型进行线程池隔离,各任务执行互不影响,避免共享一个线程池,任务执行参差不齐,相互影响,高耗时任务会占满线程池资源,导致低耗时任务没机会执行;同时如果任务之间存在父子关系,可能会导致死锁的发生,进而引发 OOM。

  1. 如果使用的 Future 方式,则可通过 Future 对象的 get 方法接收抛出的异常
  1. 为工作线程设置 setUncaughtExceptionHandler,在 uncaughtException 方法中处理异常
  1. 可以重写 afterExecute(Runnable r, Throwable t) 方法,拿到异常 t

3)共享线程池问题。整个服务共享一个全局线程池,导致任务相互影响,耗时长的任务占满资源,短耗时任务得不到执行。同时父子线程间会导致死锁的发生,今儿导致 OOM

4)跟 ThreadLocal 配合使用,导致脏数据问题。我们知道 Tomcat 利用线程池来处理收到的请求,会复用线程,如果我们代码中用到了 ThreadLocal,在请求处理完后没有去 remove,那每个请求就有可能获取到之前请求遗留的脏值。

5)ThreadLocal 在线程池场景下会失效,可以考虑用阿里开源的 Ttl 来解决

以上提到的线程池动态调参、通知告警在开源动态线程池项目 DynamicTp 中已经实现了,可以直接引入到自己项目中使用。

关于 DynamicTp

DynamicTp 是一个基于配置中心实现的轻量级动态线程池管理工具,主要功能可以总结为动态调参、通知报警、运行监控、三方包线程池管理等几大类。

image.png

经过多个版本迭代,目前最新版本 v1.0.8 具有以下特性

特性

上一篇下一篇

猜你喜欢

热点阅读