JVM · Java虚拟机原理 · JVM上语言·框架· 生态系统编程语言爱好者Java 核心技术

ThreadPoolExecutor源码分析

2021-06-15  本文已影响0人  蓝梅

记得最开始接触并发编程是,看的第一块的源码就是ThreadPoolExecutor,但是之前没有做任何的笔记,今天再来复习一下

一、线程池主要属性

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

线程池使用ctl代表线程状态和真正运行的线程数量,包括两个部分;第一个部分,就是高3位,代表线程池的状态;然后是低29位代表正在执行有效的线程数量;COUNT_BITS 等于 29;CAPACITY 表示二进制 29 个 1;这两个值是为了方便计算线程状态,以及线程数量的;

线程池状态主要有:
RUNNING:高三位为111,线程池正常运行状态
SHUTDOWN:高三位为000,该状态不接受新任务,处理已接收的任务
STOP:高三位为001,该状态不接受新任务,也不处理已接收的任务,并且会中断正在处理的任务,调用shutdownNow()方法时,可以从RUNNING或者是SHUTDOWN状态变为STOP状态
TIDYING:高三位为010,当所有线程都终止,ctl记录的线程数量变为0,则会变为TIDYING状态;
TERMINATED:高三位为011,当线程池彻底终止,就会变成TERMINATED状态

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                              BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

corePoolSize:核心线程数
maximumPoolSize:最大线程数
keepAliveTime:线程最大空闲时间
unit:空闲时间单位
workQueue:阻塞队列
threadFactory:生成线程的工厂类,默认使用Executors.defaultThreadFactory() 来创建线程
handler:拒绝策略,拒绝策略有一下几种
1、AbortPolicy:直接抛出异常,默认策略;
2、CallerRunsPolicy:用调用者所在的线程来执行任务;
3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4、DiscardPolicy:直接丢弃任务;

二、线程池执行

当线程执行时,先看是否达到核心线程数,如果还没达到核心线程数则,直接起线程;当达到核心线程数,则把线程放入阻塞队列;当阻塞队列放满,则开始起最大线程数,当线程数量起到最大线程后;后续线程进入拒绝策略;当起了超过核心线程数的线程,去阻塞队列获取任务时,超过最大空闲时间则直接返回,然后关掉该线程。

三、源码分析

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    //当线程数小于核心线程数,则直接新起任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //优先判断线程池运行状态,然后再去把线程放入阻塞队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //再次判断运行状态,如果线程池状态关闭后,然后执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //如果当前线程池没有线程在跑,则启动一个非核心的线程,去执行
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //入队失败,则起非核心的线程去执行(最大线程数),如果失败,则执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}

从上述代码就能看出,整个线程池执行的核心流程;我们再来分析一下细节,先看下内部类Worker

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;
        //当前执行的线程
        final Thread thread;
        //创建该woker时,传入的任务
        Runnable firstTask;
        //当前woker执行完成的任务数
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            //设置锁的初始状态
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //创建线程
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            //该线程调用的,是ThreadPoolExecutor runWorker方法
            runWorker(this);
        }

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        //使用CAS去获取锁
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        //释放锁
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        //调用该方法,去中断线程
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

线程池执行时,是先把每个任务封装成Worker,来执行的;我们来看看addWorker的代码分析

//参数为第一个执行的任务(有可能为空),是否创建的是核心线程
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 判断线程池运行状态
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            /*如果当线程达到CAPACITY最大数量
             *或者启用的是核心线程数,超过了核心线程数
             *或者超过了最大线程数,则添加woker失败
             */
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //使用cas对执行的线程数加一,如果成功则跳出循环,去添加woker
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            //如果执行到这里,线程池状态有变化,则重新判断
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //新建一个Worker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            //对当前线程池加锁,为了保证并发时,workers添加的安全性
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                //判断线程池的状态
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //添加到线程池维护的列表中引用
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                //释放锁
                mainLock.unlock();
            }
            //添加成功,则运行当前woker线程
            if (workerAdded) {
                //woker线程执行,调用woker的run方法,又调用ThreadPoolExecutor的runWorker方法
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

接下来,我们再看看runWorker方法

final void runWorker(Worker w) {
    //wt目前只是判断了是否打断,beforeExecute,该方法为空,没有实现,如果我们自己实现线程池,可以做日志等其他的输出
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    //先释放锁,是为了清除interrupt,任务还没开始,不允许打断
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //如果当前任务为空,则去阻塞队列获取任务
        while (task != null || (task = getTask()) != null) {
            //对当前woker加锁,这个加锁时为了防止在执行期间,被其他线程中断,主要是调用interruptIdleWorkers()这个方法
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            //判断线程池运行状态,和中断状态
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //直接执行我们传入的task的run方法,所以,线程池每次执行时,都是跑的自己woker线程,我们传入进去的Runnable对象,并没有调用start方法,只是跑了run方法
                    task.run();
                } catch (RuntimeException x) {
                    //如果有异常,则直接抛出,但是会在processWorkerExit方法中,新起一个woker
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                //执行完成则完成任务数加一,释放锁
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //处理线程跑完后的逻辑,主要是对是否是抛出异常做处理
        processWorkerExit(w, completedAbruptly);
    }
}

从runWorker方法中我们看到了,线程执行时,并不是去start,这样避免了线程上下文的切换,只是在当前线程执行了我们传入task的run方法,提高了系统的整体性能;接下来我们再看看getTask怎么来获取线程的;

private Runnable getTask() {
    boolean timedOut = false; 

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        //如果当前线程池状态已经停止,则去把线程池清空
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        //获取线程数量
        int wc = workerCountOf(c);
        /*
         *判断获取线程是否需要超时
         *jdk1.5之后,allowCoreThreadTimeOut可以设置是否允许核心线程数超时
         *一般是不会设置,则只有当线程数量超过了核心线程数,则从阻塞队列获取线程会超时
         */
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        //如果该线程超时,或者超过了最大线程数,或者队列为空了,则去对当前运行的线程数减一
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            //如果线程为核心线程数,则使用take方法阻塞获取线程,不会超时
            //如果不为核心线程数,则使用poll获取task,如果超时则获取为空,会进入上面一步,进行销毁,线程数量会减一
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            //如果获取到了,则跳出该方法,返回该task
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

从上面可以看出,当去从阻塞队列中获取任务时,是靠阻塞队列的特性去阻塞获取线程,如果获取不到,就直接销毁当前woker了;并且,woker中没有核心线程的标记,是根据数量去判断的,如果小于或者等于了我们设置的核心线程数,则会阻塞去获取task;

四、Executors

Executors工具类提供了几种线程池模型,这里分析一下,但是不建议大家用,最好还是根据自己的业务来新建线程池,因为这几种线程池可能会引起内存溢出

1.newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

FixedThreadPool核心线程数,最大线程数相同,队列是无界的阻塞队列,如果线程消费能力不够时,就有可能出现内存溢出的问题

2.newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

SingleThreadExecutor核心线程数和最大线程数都是1,和newFixedThreadPool一样,如果线程消费能力不够时,就有可能出现内存溢出的问题

3.newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

CachedThreadPool,核心线程数为0,最大线程数为MAX_VALUE,传入的阻塞队列长度只有1。如果线程消费能力不够时,可能会启用很多线程,所以也可能会出现内存溢出的问题;

上一篇下一篇

猜你喜欢

热点阅读