JavaJVM · Java虚拟机原理 · JVM上语言·框架· 生态系统

并发专题-一张图理解线程池

2022-06-09  本文已影响0人  pq217

前言

多线程Runnable任务的执行器Executor有很多,今天来看一下最常用的Executor:ThreadPoolExecutor,也就是线程池

ThreadPoolExecutor

ThreadPoolExecutor想必都用过,有的是直接new ThreadPoolExecutor来创建线程池,有的是通过Executors.newFixedThreadPool等工厂方法去创建线程池

阿里BB的规范中不允许使用Executors创建线程池,主要是怕忽略线程池创建的参数而造成资源耗尽的风险,那么我们先看看ThreadPoolExecutor可配参数有哪些

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    ...                       
}

图文理解

线程池

我们把线程池比作公司,线程比作员工,那么corePoolSize 核心线程数相当于一个公司的正式编制员工数,他们接受任务并执行,没任务时候等待新任务(公司聘请正式员工的方式是来一个任务聘用一个新员工)

当公司正式编制员工聘用满后,新来的任务放到workQueue 代办任务队列中,等待被员工处理

当任务变多了,正式员工都在忙,任务队列放不下新任务了,就要考虑聘请临时工帮忙,maximumPoolSize 代表公司可容纳的最多员工数,相当于办公位数,那么最大能聘请的临时工的数量就是maximumPoolSize-corePoolSize ,请多了也没有工位可干活,临时工在公司不忙时候就离开公司(线程销毁),给他一个缓冲时间避免刚走又忙起来,这个保持时间就是keepAliveTime,时间单位是unit

当任务队列都满了,所有员工(包括临时工)都在忙,而员工总数已经到达maximumPoolSize ,再不能新增员工了,这时如果还有任务那只能拒绝了,拒绝工作的处理者就是handler ,可以自行定义

threadFactory相当于员工的来源,比如来自某个大学,那么这个大学对于公司来说就是员工的生产工厂,可以通过配置自定义threadFactory来控制员工的实际生成工作,比如可以统一名称前缀、统一编号规则等

工作机制

回到程序本身,首先线程池初始化是没有任何线程的,执行逻辑如下

step1

当任务到来时,真实线程数少于corePoolSize ,就会创建线程,并执行这个任务,执行结束后线程并不关闭,因为是corePool,继续获取workQueue 里的任务,如果没有就阻塞等待

step2

如果上一步条件不满足(corePool已全部创建完成),会尝试把任务加入队列workQueue 中,那么空闲下来的线程就可以从队列获取并执行任务

step3

如果上一步失败-队列已满,线程数小于maximumPoolSize ,则会尝试新增一个临时线程去执行任务,这些临时线程工作完成后会存活一段时间,直到空闲了keepAliveTime设置的时间后就销毁(实际上可能销毁原线程而保留新增线程,属于一种淘汰机制)

代码

以上的线程创建并不是直接new Thread,而是通过ThreadFactory创建

public interface ThreadFactory {
    // 生成新线程
    Thread newThread(Runnable r);
}

我们可以自定义实现ThreadFactory让我们线程池的拥有一样的名称前缀或编号规则,方便jvm调试时识别,比如

private static ThreadFactory threadFactory = new ThreadFactory() {
    private AtomicInteger no = new AtomicInteger(0);
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "my-thread-"+(no.incrementAndGet()));
    }
};
// 生成的线程名字就是my-thread-1,my-thread-2,my-thread-3...

以上所说三步验证,可以看一下ThreadPoolExecutor.execute源码

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // step1:小于corePoolSize,新增线程:addWorker
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // step2:尝试添加到队列:workQueue.offer
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // step3:尝试新增线程:addWorker
    else if (!addWorker(command, false))
        reject(command);
}

其中addWorker代表是新增线程的方法,其定义如下

private void addWorker(Runnable firstTask, boolean core) 

第二参数boolean型的core,代表是否核心线程,可以看到execute的step1传入的是true,此时添加的是核心线程,step3传的是false,此时添加的是临时线程

总结

ThreadPoolExecutor的核心代码看起来还是挺费劲,主要考虑的线程安全的事太多,后续计划模仿手写一个线程池来更深一层看一下具体实现~

上一篇 下一篇

猜你喜欢

热点阅读