安卓

ThreadPoolExecutor原理解析

2021-07-27  本文已影响0人  luncene_e110

ThreadPoolExecutor的运行状态有5种,分别为:


image.png

其生命周期转换如下入所示:

图片
状态值 32位二进制值
RUNNING 1110 0000 0000 0000 0000 0000 0000 0000
SHUTDOWN 0000 0000 0000 0000 0000 0000 0000 0000
STOP 0010 0000 0000 0000 0000 0000 0000 0000
TIDYING 0100 0000 0000 0000 0000 0000 0000 0000
TERMINATED 0110 0000 0000 0000 0000 0000 0000 0000

从数值来看running<shutdown<stop<tidying<terminated.

变量名 变量值
CAPACITY 0001 1111 1111 1111 1111 1111 1111 1111
~CAPACITY 1110 0000 0000 0000 0000 0000 0000 0000

获取线程池状态方法:

    private static int runStateOf(int c)     { return c & ~CAPACITY; }

从代码和数值上看就是取高三位的值,也就是用高三位来确定线程池状态

这个理解清楚以后我们再来看任务如何提交:


image.png

看看源码:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
       //默认情况ctl的初始值是RUNNING,然后获取当前的工作线程,如果工作线程<核心线程,则添加work
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
     //判断线程池当前处于running状态,且能往队列里插入任务
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
           //再次检查,如果线程池不是running状态,需要任务从队列中移除(非running状态不再接收task)
            if (! isRunning(recheck) && remove(command))
               //然后直接拒绝
                reject(command);
            else if (workerCountOf(recheck) == 0)
               //如果工作线程为0,需要新启线程执行队列里的任务
                addWorker(null, false);
        }
     //队列满了,需要启动非核心线程来执行任务
        else if (!addWorker(command, false))
            reject(command);
    }

这个是任务提交的整体逻辑。
我们再来关注下addworker的逻辑


image.png

看下源码:

/core 为true 核心线程
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

           //线程池状态>=shutdown不再接收新的任务
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

从addWorker逻辑里可以看到会启动线程执行任务,会调用Worker里的runWork方法,看下流程:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

上面有一个getTask逻辑是一个doWhile循环,线程池中线程的销毁依赖JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。Worker被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当Worker无法获取到任务,也就是获取的任务为空时,循环会结束,Worker会主动消除自身在线程池内的引用。
再来看一卡getTask的逻辑:


image.png
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 线程池已停止
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
上一篇 下一篇

猜你喜欢

热点阅读