工作生活

J.U.C之线程池04:源码解析-任务提交

2019-07-02  本文已影响0人  贪睡的企鹅

任务提交

线程池的创建仅仅只是创建了线程池的对象,要想线程池运行起来需要我们从提交给线程池。本章只分析任务提交给work处理。work内部处理留在下一章。

回顾线程池如何处理任务

image

前面说到一个线程需要运作起来需要任务。当一个订单被指派到工厂时是如何运作的呢?

1 当员工人数 < corePoolSize,每接到一个任务就会去雇佣一个新员工来完成这个任务.对于线程池来说就创建一个work标记为core work线程执行接收的任务

2 如果 (员工人数> corePoolSize) 且(员工人数 < maximumPoolSize),将任务放入流水线,不在雇佣员工。对于线程来说线程会将任务放入工作队列。不在创建新的work.

3 如果情况2中流水线容量满了,说明当前任务已经超负荷。需要雇佣新员工来处理新的任务。对于线程池来说就创建一个work来执行新任务

4 如果雇佣的员工已达到上线maximumPoolSize,且流水线容量也满,则新任务只好让售前拒绝。对于线程池来说就是交给RejectedExecutionHandler处理。

[图片上传失败...(image-533abc-1562078216202)]

execute 本质是提交一个任务给线程池执行。

public interface Executor {

    void execute(Runnable command);
}

ThreadPoolExecutor源码实现:

执行任务 execute

    /**
     * 执行任务
     */
    public void execute(Runnable command) {
        /** 提交任务为null 抛出异常。 **/
        if (command == null)
            throw new NullPointerException();

        /** 获取ctl **/
        int c = ctl.get();

        /** work线程数量少于corePoolSize **/
        if (workerCountOf(c) < corePoolSize) {
            /** 创建新work线程并设置为核心线程执行任务 addWorker(command, true)  **/
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        /** 进入此逻辑表示work线程数量大于corePoolSize或者前一步执行失败 **/

        /** 判断线程池是Running运行状态,将任务添加到workQueue尾部成功(队列满了返回false) **/
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            /** Double Check下当前线程状态是不是Running运行状态,不是就删除刚刚添加的任务,执行拒绝任务 **/
            if (! isRunning(recheck) && remove(command))
                reject(command);
            /** 异常情况 前面workerCountOf(c) < corePoolSize说明当时还存在大量work,说明线程池突然停止,为保证任务都能处理,
             * 创建一个临时work去处理当前workQueue中的任务  **/
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

        /** 队列满了,创建一个非核心work执行新添加任务 **/
        else if (!addWorker(command, false))
             /** 执行失败,执行拒绝任务 **/
            reject(command);
    }

创建work执行任务 addWorker

校验

addWorker 返回boolean说明addWorker存在校验的逻辑

校验线程池的状态

     /**
     * rs < SHUTDOWN 正常运行状态,通过校验
     * rs > SHUTDOWN情况下,线程池状态处于,STOP、TIDYING、TERMINAT* ED状态都不接收新任务 退出
     * 当rs == SHUTDOWN 需要 firstTask == null && !workQueue.isEmpty() 表示创建一个work执行空任务。就是去执行任务队列任务可以通过校验
     * 其他情况 退出
     * **/
    if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
                    firstTask == null &&
                    ! workQueue.isEmpty()))
        return false;

校验执行的线程数

      /** 使用CASwork数量+1 **/
        for (;;) {
            /** 获取当前work数量 **/
            int wc = workerCountOf(c);

            /** 核心work数量大于corePoolSize,总work大于maximumPoolSize直接返回 **/
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;

            /** worker + 1,成功跳出retry循环 **/
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();
            /** 如果状态不等于之前获取的state,跳出内层循环,继续去外层循环判断 **/
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }

创建work并执行任务

/** 创建work并执行任务 **/
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            /** 实例化:Worker,并分配任务firstTask **/
            w = new Worker(firstTask);
            final Thread t = w.thread;
            /** work中工作线程不为null **/
            if (t != null) {
                /** 获取主锁:mainLock **/
                final ReentrantLock mainLock = this.mainLock;
                /** 加锁  **/
                mainLock.lock();
                try {
                    /** 获取当前线程池状态 **/
                    int rs = runStateOf(ctl.get());

                    /** 当前线程池状态为运行,或当前线程池状态为SHUTDOWN,提交是null任务通过校验。
                     *  
                     * **/
                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //将创建的work添加到workers集合中    
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    /** 释放锁  **/
                    mainLock.unlock();
                }
                /** 创建成功,启动work执行任务 **/
                if (workerAdded) {
                    /** 启动work **/
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                /** 失败创建work只能当前线程池状态不是运行状态 **/
                addWorkerFailed(w);
        }
        return workerStarted;

添加work失败

在addWorker()方法中,如果线程t==null,或者在add过程出现异常,会导致workerStarted == false,那么在最后会调用addWorkerFailed()方法:

 /**
     * 失败创建work只能当前线程池状态不是运行状态
     */
    private void addWorkerFailed(Worker w) {
        /** 获取主锁:mainLock **/
        final ReentrantLock mainLock = this.mainLock;
        /** 加锁 **/
        mainLock.lock();
        try {
            /** 尝试从workers删除 **/
            if (w != null)
                workers.remove(w);
            /** 将work数量-1 **/
            decrementWorkerCount();
            /** 尝试将线程池状态设置为Terminate **/
            tryTerminate();
        } finally {
            /** 释放 **/
            mainLock.unlock();
        }
    }

尝试将线程池状态设置为Terminate

/**
     * 尝试将线程池状态设置为Terminate
     */
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            /**
             * 判断线程池能否进入TERMINATED状态
             * 如果以下3中情况任一为true,return,不进行终止
             * 1、还在运行状态
             * 2、状态是TIDYING、或 TERMINATED,已经终止过了
             * 3、SHUTDOWN 且 workQueue不为空
             * 4
             */
            if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;

            /** 线程池workQueue不为空 return,并中断workQueue其中一个work**/

            /**
             * 线程池为stop状态,且还存在work,中断唤醒一个正在等任务的空闲worker,
             * 再次调用getTask(),线程池状态发生改变,返回null,work工作线程退出循环
             */
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            /** 获取主锁:mainLock **/
            final ReentrantLock mainLock = this.mainLock;
            /** 加锁 **/
            mainLock.lock();
            try {
                /** 将线程池状态设置为TIDYING **/
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        /** 释放子类实现 **/
                        terminated();
                    } finally {
                        /** 将线程池状态设置为TERMINATED **/
                        ctl.set(ctlOf(TERMINATED, 0));
                        /** 释放锁 **/
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                /** 释放锁 **/
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
上一篇 下一篇

猜你喜欢

热点阅读