线程那些事

线程池那些事之ThreadPoolExecutor

2018-10-23  本文已影响33人  土豆肉丝盖浇饭

前言

我们都知道,线程是应用中宝贵的资源,创建一个线程需要很大的损耗,所以最好能够复用线程。因此我们一般都通过来线程池来维护管理线程。

ThreadPoolExecutor是JDK提供的线程池实现,我们在开发过程中最常使用到的就是这个。

使用

创建

首先我们要创建线程池,可以通过ThreadPoolExecutor的构造函数或者java提供的工具类Executors来创建

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0,100,20, TimeUnit.MICROSECONDS,new LinkedBlockingQueue<>(100));
        
ExecutorService executorService= Executors.newCachedThreadPool();

在Spring中也可以使用ThreadPoolTaskExecutor这个类来配置线程池,底层还是用的是java的ThreadPoolExecutor,但是能够作为bean放到ioc容器中去,也能spring其他组件适配。

使用

1.提交异步任务

        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("test");
            }
        });

2.提交异步任务并且进行跟踪执行状态

我们可以使用submit方法提交任务,这个方法返回的Future对象可以用于跟踪任务执行状态

        Future future = executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("hello");
            }
        });
        while (future.isDone()){
            System.out.println("world");
        }

解析

分析ThreadPoolExecutor的源码主要是想知道线程池底层是如何处理我们提交的的Runnable,以及它的生命周期和其他细节。

线程池初始化参数

在创建线程池的时候,我们会通过构造函数配置线程池的配置参数,了解这些参数的实际含义是十分重要的

参数 含义
corePoolSize 线程池核心工作线程的数量
maximumPoolSize 线程池工作线程最大值
keepAliveTime 工作线程的存活时间
unit keepAliveTime的单位
workQueue 放置任务的队列
threadFactory 线程工厂
handler 处理任务溢出的异常处理器

上面的介绍相对简单,实际上的作用不是这么几个字能描述的,想了解它们的作用往下看吧。

线程池生命周期

线程池运行状态 描述
RUNNING 线程池初始状态,这个状态下可以接受任务并且调度任务执行
SHUTDOWN 这个状态会拒绝接受新任务,但是会执行完已经加入的任务
STOP 不接受新任务,并且也不执行已经加入的任务,会中断执行中的任务
TIDYING 当所有任务执行结束,并且工作线程为0的时候会进入这个状态,会运行terminated方法
TERMINATED terminated方法运行结束的时候进入这个状态

一般的话,如果不执行shutdown或者shutdownNow方法是不会进入TIDYING状态的,因为核心工作线程都还存活(cachedThreadPool除外,因为coreSize=0)。

TIDYING和TERMINATED状态切换是连续的,在tryTerminate方法内执行

所以一般的状态转换为
RUNNING->SHUTDOWN/STOP->TIDYING->TERMINATED
RUNNING->SHUTDOWN->STOP->TIDYING->TERMINATED

配置了allowCoreThreadTimeOut=true后,核心线程也能被回收。但是不是说核心线程数为0了,就会进入TIDYING状态。必须由SHUTDOWN或STOP触发。

工作线程组机制

在线程池中会维护一组工作线程来处理我们的任务。

提交任务

我们先从提交任务的execute方法入手。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        //1.如果当前工作线程数小于配置的核心线程池大小
        //直接创建一个工作线程来执行对应command任务
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //2.如果当前工作线程数大于等于配置的核心线程池大小  
        //把任务放到workQueue中去
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //再次检查线程池状态,如果不是RUNNING,
            //那么从workQueue移除这个任务,并且reject 
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //3.如果上一步的workQueue加入不了,那么增加工作线程
        else if (!addWorker(command, false))
            //如果工作线程增加失败,那么reject
            reject(command);
    }

代码中的注释讲的很明白了,reject的话会通过我们配置的handler做对应处理。

工作线程运行机制

我们先看下工作线程实现类Worker



Worker继承了AbstractQueuedSynchronizer,在执行任务的时候会将自己锁住,因此就可以调用work的tryLock方法来判断当前worker是否正在执行任务。
同时实现了Runnable接口,因为工作线程也是线程。

直接看Worker的Run方法

public void run() {
            runWorker(this);
        }

委托给了外部类的runWorker方法,为什么放到外部类,因为有很多配置都在外部类,比如keepAliveTime

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //getTask用于从workQueue获取任务
            //这边是一个循环,会不断的拿任务,如果每次都拿得到,那么不断循环,一旦拿不到,那么这个线程就会死掉。
            //拿不拿得到取决于keepAliveTime的coreSize的配置
            while (task != null || (task = getTask()) != null) {
                //拿到任务后,执行前,会先把自己锁住
                w.lock();
                //判断线程池是不是STOP状态,如果是STOP状态的话,会对当前线程进行中断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //任务执行前的hook
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //实际执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //任务执行后的hook
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    //解锁
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //线程结束时做的一些工作
            processWorkerExit(w, completedAbruptly);
        }
    }

worker会不断的从workQueue获取任务,一旦获取不到,那么就会退出while循环,调用processWorkerExit做这个工作线程退出的相关工作。

run方法运行到结束,也就代表这个线程工作完成,要被回收了。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //汇总工作线程完成的任务数
            completedTaskCount += w.completedTasks;
            //从workers中移除当前工作线程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
         
        //判断是否能进入terminate状态
        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            //completedAbruptly用来判断是否是异常退出
            //如果是异常退出的话,跳过下面这个逻辑,直接补充增加一个worker
            //如果是正常退出,判断当前工作线程数是否小于corePoolSize,小于则增加worker,注意allowCoreThreadTimeOut=true的情况,会忽略corePoolSize
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                //如果当前工作线程数大于min,那么不增加工作线程
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

processWorkerExit主要对结束的工作线程的数据进行收集,以及判断是否需要再添加一个工作线程。

接下来我们看下getTask的逻辑,看下它在什么情况下会返回null,让这个线程结束

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //如果线程池状态为SHUTDOWN并且workqueue为空
            //或者线程池状态为STOP
            //那么返回null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            //判断当前获取任务时是否需要设置超时,超时获取不到任务会返回null
            //allowCoreThreadTimeOut=true强制每个工作线程都会超时
            //当wc(当前工作线程数) > corePoolSize也会开启超时获取
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            //在工作线程数大于maximumPoolSize或者已经获取超时的情况下,返回null
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                //cas失败进行重试
                continue;
            }
            //下面的poll是代超时的获取任务
            //take不会超时,一直阻塞直到获取任务
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    //如果任务不为空,直接返回
                    return r;
                //如果在规定时间内没获取到任务,那么设置timedOut = true;
                timedOut = true;
            } catch (InterruptedException retry) {
                //线程中断重试??
                timedOut = false;
            }
        }
    }

在getTask方法通过超时获取返回null的方式动态调节了线程池工作线程的数量。

在processWorkerExit中可以注意到有一个tryTerminate方法,用于判断当前线程池是否需要进行terminate相关工作

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            //线程池状态为RUNNING,TIDYING或者TERMINATED时
            //或者状态为SHUTDOWN并且workQueue不为空时
            //return,不执行后续操作
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            //在上次if的过滤后,执行到这边的线程池状态为
            //SHUTDOWN并且workQueue为空
            //STOP状态
            //在上面两个条件下,直接中断这个工作线程
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            
            //执行到这边的话,说明就是工作线程数为0,并且workqueue也为空
           //可以先将状态切换为TIDYING
          //并且执行terminated方法
          //最后将状态设置为TERMINATED,并且通知等待在termination的线程
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

tryTerminate会在线程池为STOP或者SHUTDOWN状态并且workqueue为空的时候才会生效。其他逻辑看源码注释。

增加工作线程

在熟悉了工作线程的运行机制后,我们来看下addWorker的逻辑。

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 只有线程池状态为RUNNING
            //或状态为SHUTDOWN,workQueue不为空,并且不是由于execute调用该方法的时候(注意firstTask == null)
            //才能增加工作线程
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            
            //使用自旋+cas 修改当前线程数
            for (;;) {
                int wc = workerCountOf(c);
                //当前线程数wc超过限制,那么返回false,停止创建工作线程
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        
        //下面是实际创建工作线程的逻辑
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //构造worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    //只有RUNNING状态或者SHUTDOWN并且firstTask为空的情况下才能增加工作线程
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                         //如果Worker内的线程已经启动,那么报错
                        if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                        //将创建的工作线程添加到workers集合
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //启动Worker内的工作线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                //对添加失败进行处理
                addWorkerFailed(w);
        }
        return workerStarted;
    }

有一点很奇妙,线程是在Worker的构造函数就创建了,然后在外部进行启动

 Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

可以看到newThread传入了worker本身。

销毁逻辑

主要是shutdown 和 shutdownNow两个方法。

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            //修改线程池状态为SHUTDOWN
            advanceRunState(SHUTDOWN);
            //中断空闲线程
            interruptIdleWorkers();
            //shutdown的钩子
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        //判断是否可以进行terminate相关工作
        tryTerminate();
    }
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            //线程池状态更新为STOP
            advanceRunState(STOP);
            //中断所有工作线程
            interruptWorkers();
            //获取所有workqueue中的任务
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        //判断是否可以进行terminate相关工作
        tryTerminate();
        //返回这些未执行的任务
        return tasks;
    }

interruptIdleWorkers和interruptWorkers读者自行查看。

从源码中可以看到shutdown和shutdownNow的主要区别是,shutdown会等所有任务执行完毕后才terminate,shutdownNow是直接强制中断所有工作线程,并且返回那些workqueue中没有被处理的任务。

最后

本文只讲解了execute这种提交异步任务的方式,在某些场景下,我们需要关注或者等待异步任务的执行,所以就需要调用submit得到Future这种方式处理,请听下回分解。

上一篇下一篇

猜你喜欢

热点阅读