线程池你真的懂了吗

2021-07-21  本文已影响0人  QI的咖啡

阅读任何源码,我们都应该带着几个问题去阅读,从源码中找出这些问题的答案,这样才能彻底搞明白某个知识点。

下面我们就带着这样几个问题,一起看一下ThreadPoolExecutor的源码

  1. 为什么要用线程池
  2. 为什么不推荐使用juc直接创建的线程池
  3. 线程池的几个核心参数
  4. 线程池是什么时候创建线程的?
  5. 线程池是如何重复利用线程的?
  6. 任务提交的顺序和执行的顺序是一样的吗?

1、为什么要使用线程池

这个其实可以写一个简单的程序去跑一下,比如使用线程池去跑1000个task和开1000个线程去跑这1000个task,线程池的效率会高出很多倍,原因是线程池能够重复利用线程,没有创建和销毁线程的开销。
其实池化的技术在很多地方都会用到比如数据库的连接池,字符串常量池,netty的对象池等等

2、为什么不推荐使用juc直接创建线程池的方式

我们找两个Exectors创建线程池的源码

a、newCachedTreadPool的源码

 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

可以看到这里的最大线程数是0xffff个,这个最大线程数在大并发提交任务的情况下会创建大量线程,会导致CPU100%

b、newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

//看一下LinkedBlockingQueue的实现
 public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

会初始化一个容量为0xffff的队列,由于这个队列太大,如果我们提交的任务数很多并且自定义的线程里的对象又很大的话,就很容易发生oom的问题。

从这个工具类创建线程的参数我们可以看到底层调用的都是ThreadPoolExecutor的构造方法,所以我们建议根据具体业务的规模设置合适的线程池参数。

new ThreadPoolExecutor()

3、线程池的几个核心参数

ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) 

corePoolSize:最大线程数
maximumPoolSize:最大线程数
keepAiveTime:线程存活时间
TimeUnit:存活时间的参数
BlockingQueue:线程池的任务队列

task投递到线程池中的整个过程如下


image.png

看一下具体的代码

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();

        //这里判断线程数量是否小于corePoolSize,如过小于corePoolSize则直接创建worker线程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //到这个if说明worker线程数量大于了corePoolSize了,这里直接添加到任务队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //到这个if说明任务加入队列失败,队列满了,则再创建线程worker线程,如果创建失败则执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

看到这里从宏观上我们就能看到整个task投递到线程池的一个过程,其实这里最主要的方法是addWorker,其实addworker里才是线程池的精华,里面有如何创建线程及start的逻辑,如何回收过期的线程,如何重复利用创建的线程去运行task的逻辑

4、线程池是什么时候创建线程的

线程池的线程不是线程池创建的时候创建的,线程池的线程是在调用addWorker方法,并且addWorker执行成功才会创建
下面我们一起分析一下addworker代码,这个方法很长,其实要看明白这个方法我们要先看一下Worker这个类,可以看到这个类实际上实现了Runnable接口,其实线程池中运行的runnable任务都会被包装成Worker对象

 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
     
        private static final long serialVersionUID = 6138294804551838833L;
        //当前worker会绑定一个线程
        final Thread thread;
       
        //当前worker处理的第一个任务
        Runnable firstTask;
        volatile long completedTasks;
        //创建worker时就会生成 一个线程及赋值firstTask,
        //注意线程池的线程就是在这里被创建的
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //注意看这里,这个线程创建的时候传的是this,也就意味着一会this.thread.start时,
            //执行的是worker对象的run方法
            //这个大家想一下,回味一下
            this.thread = getThreadFactory().newThread(this);
        }
}

下面在具体分析一下addWorker是如何start线程及重复利用线程运行任务的

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            //获取worker数量
            int c = ctl.get();
            //获取线程池状态
            int rs = runStateOf(c);

            // 如果线程池状态为SHUTDOWN就不接收任务了直接returnfalse
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                //这里会判断一下worker数量
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        
        //如果代码运行到了这里,就表示线程池状态正常,任务达到了创建worker线程的条件
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //这里会创建一个worker对象,
            //结合上面的代码,worker对象中会包含一个线程和一个firstTask
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //下面这一部分是用来判断需不需要将worker线程进行缓存,给其他的任务使用
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //注意看这个workers对象,这是一个hashset,用来缓存创建好的worker对象
                        //注意在强调一下这个worker对象包含一个Thread引用及Runnable引用
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }

                //缓存好worker线程后,这里会执行线程的start逻辑
                //注意线程池的任务就是在这里开启start使任务真正进入runnable状态的
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

下面我们看一下start的具体逻辑:
刚刚我们已经看到了,thread是worker对象的一个属性,实际上创建线程时,传的参数就是worker对象本身,所以线程start执行的逻辑就是worker对象的run方法

//worker的run方法很简单,下面我们看一下runWorker方法
public void run() {
            runWorker(this);
        }

看完这个我们就基本上看完了线程池的核心逻辑了,但是还有一些细节没有仔细看,后面我会提到,留给大家思考

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //获取一下fisrtTask
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
           //这个task什么时候不为空?
           //注意这里会有一个隐含的问题,我们提交的任务存放的顺序是 核心worker->队列->最大线程worker
          //这里的getTask是从队列里获取的任务
          //这里task的执行顺序就变成了,核心worker->最大线程worker->队列
          //不知道大家有没有get到我的点,可以做一个实验就是为task编一个号,比如安1-10的顺序提交任务,最终执行的结果却是 1,2,3,4,8,9,10,5,6,7这种顺序,这里就是出现这种提交顺序和执行顺序不一样的原理
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //这里会最终调用task的run方法,到此线程池的创建到运行任务就看结束了
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {

            //这里会销毁线程,在队列为空的时候,会销毁线程,让线程数量停留在corePoolsize范围内,什么时候会执行到这,当task为空的时候,什么时候task为空,看getTask的逻辑,getTask会判断队列是否为空及活跃线程的时间来返回task的值,这里就不细讲了
            processWorkerExit(w, completedAbruptly);
        }
    }

5、线程池是如何重复利用线程的

看完上面的分析,其实就能回答这个问题了,当任务提交时会创建worker对象,这个对象里会有绑定一个线程,同时会将worker对象放入workers Set中,创建成功后,会立马调用worker.thread.start方法启动线程,这个线程的run方法进行了包装,首先判断fisrtTask是否为空如果不为空则直接运行,否则会while循环拿缓存队列中的任务,知道缓存队列为空,或者空闲线程超过了keepalive时间就会销毁线程,以保证线程维持在corePoolSize的大小

6、任务提交的顺序和执行的顺序是一样的吗?

不一样,提交顺序是 核心worker线程->队列->非核心worker线程,
但是执行的顺序却是核心worker任务->非核心worker任务->队列任务
如果能理解这个,线程池就真的理解的差不多了

7、其他

上面几个问题有些是我在看源码时的困惑点,有些是我看完源码之后的一些想法,除了这些问题外,线程池还有很多精妙的地方比如,
a、线程池的状态和核心线程数其实是用一个4个字节int表示的,为什么要这么表示
b、线程池中使用到的设计模式有哪些
c、线程池本身就是并发场景下提交任务的,那它自己的安全性是如何保证的,execute方法是如何保证安全性的

上一篇下一篇

猜你喜欢

热点阅读