ThreadPoolExecutor详解

2018-12-14  本文已影响0人  永远的太阳0123

1 ThreadPoolExecutor中的状态

(1)RUNNING:可以接受新任务,可以处理阻塞队列中的任务。
(2)SHUTDOWN:不接受新任务,可以处理阻塞队列中的任务,中断空闲Worker。
(3)STOP:不接受新任务,不处理阻塞队列中的任务,中断所有Worker。
(4)TIDYING:所有任务执行完毕,workerCount等于0,开始执行terminated方法。
(5)TERMINATED:terminated方法执行完毕。

2 ThreadPoolExecutor中的状态转换

(1)RUNNING到SHUTDOWN:调用了shutdown方法。
(2)(RUNNING或SHUTDOWN)到STOP:调用了shutdownNow方法。
(3)SHUTDOWN到TIDYING:阻塞队列中和线程池中没有任务。
(4)STOP到TIDYING:线程池中没有任务。
(5)TIDYING到TERMINATED:terminated方法执行完毕。

3 ThreadPoolExecutor中的部分字段

(1)ctl的高3位代表线程池的状态,低29位代表workerCount。
(2)COUNT_BITS=32-3=29。
(3)CAPACITY=2^29-1=536870911,二进制为00011111111111111111111111111111。
(4)RUNNING的二进制为11100000000000000000000000000000。
(5)SHUTDOWN的二进制为00000000000000000000000000000000。
(6)STOP的二进制为00100000000000000000000000000000。
(7)TIDYING的二进制为01000000000000000000000000000000。
(8)TERMINATED的二进制为01100000000000000000000000000000。
(9)runStateOf方法可以获取线程池的状态。
(10)workerCountOf方法可以获取workerCount。
(11)ctlOf方法可以根据线程池的状态和workerCount获取ctl。
(12)runStateLessThan方法可以比较线程池的两个状态。
(13)runStateAtLeast方法可以比较线程池的两个状态。
(14)isRunning方法可以判断线程池是否处于Running状态。

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

(15)workQueue:阻塞队列。
(16)mainLock:访问workers集合、访问largestPoolSize等线程池的统计型变量、执行shutdown方法、执行shutdownNow方法都需要获取该锁。
(17)workers:保存ThreadPoolExecutor.Worker对象的HashSet集合。只有获取mainLock,才能访问该集合。
(18)termination:和mainLock绑定的Condition对象,执行awaitTermination方法和tryTerminate方法时使用该Condition对象。
(19)threadFactory:线程工厂。
(20)handler:拒绝策略。
(21)keepAliveTime:空闲线程的存活时间。
(22)allowCoreThreadTimeOut:是否允许结束空闲的核心线程。如果allowCoreThreadTimeOut等于true,则keepAliveTime必须大于0。
(23)corePoolSize:核心线程数。
(24)maximumPoolSize:最大线程数。
(25)defaultHandler:默认的拒绝策略。

    private final BlockingQueue<Runnable> workQueue;
    private final ReentrantLock mainLock = new ReentrantLock();
    private final HashSet<Worker> workers = new HashSet<Worker>();
    private final Condition termination = mainLock.newCondition();
    private volatile ThreadFactory threadFactory;
    private volatile RejectedExecutionHandler handler;
    private volatile long keepAliveTime;
    private volatile boolean allowCoreThreadTimeOut;
    private volatile int corePoolSize;
    private volatile int maximumPoolSize;
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

4 ThreadPoolExecutor中的构造方法

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

4.1 Executors中的defaultThreadFactory方法

    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }

4.1.1 Executors.DefaultThreadFactory内部类

Executors.DefaultThreadFactory实现了ThreadFactory接口。

    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
4.1.1.1 ThreadFactory接口

ThreadFactory接口中只定义了newThread方法。

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

5 ThreadPoolExecutor.Worker内部类

ThreadPoolExecutor.Worker继承了AbstractQueuedSynchronizer,实现了Runnable接口。

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;
        // Worker中的线程对象
        final Thread thread;
        // 线程的第一个任务
        Runnable firstTask;
        volatile long completedTasks;

        // 传入的firstTask可以等于null
        Worker(Runnable firstTask) {
            // 将state设为-1
            // 执行runWorker方法之前,state一直等于-1
            // 查看interruptIfStarted方法可以发现,如果state小于0,不允许中断线程对象
            setState(-1);
            this.firstTask = firstTask;
            // 使用线程工厂创建线程对象
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            runWorker(this);
        }

        // state等于0代表Worker锁未被占用,state等于1代表Worker锁已被占用
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

6 ThreadPoolExecutor中的execute方法

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // 获取ctl的数值
        int c = ctl.get();
        // 如果workerCount小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            // 尝试启动一个新线程,并且将这个线程的第一个任务设为command
            // 如果新线程启动成功,直接返回
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 运行到这里有两种可能,要么workerCount大于等于核心线程数,要么新线程启动失败
        // 如果线程池处于RUNNING状态,并且成功将当前任务command添加到阻塞队列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 成功将当前任务command添加到阻塞队列中之前,其它线程可能尝试关闭线程池,因此需要重新获取线程池的状态
            // 如果线程池已不处于RUNNING状态,并且成功从阻塞队列中移除了command
            if (! isRunning(recheck) && remove(command))
                // 执行拒绝策略
                reject(command);
            // 运行到这里有两种可能,要么线程池还处于RUNNING状态,要么线程池已不处于RUNNING状态但command已开始执行或执行完毕
            // 如果workerCount等于0
            else if (workerCountOf(recheck) == 0)
                // 尝试启动一个新线程,并且将这个线程的第一个任务设为null
                addWorker(null, false);
        }
        // 运行到这里有两种可能,要么线程池不处于RUNNING状态,要么阻塞队列已满
        // 尝试启动一个新线程,并且将这个线程的第一个任务设为command
        // 如果新线程启动失败,说明线程池不处于RUNNING状态或workerCount已达到最大线程数
        else if (!addWorker(command, false))
            // 执行拒绝策略
            reject(command);
    }

6.1 ThreadPoolExecutor中的addWorker方法

    // firstTask:线程的第一个任务,可以传入null
    // core:线程数量的界限。true代表以corePoolSize作为界限,false代表以maximumPoolSize作为界限
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // 如果线程池处于STOP或TIDYING或TERMINATED状态,直接返回false
            // 如果线程池处于SHUTDOWN状态,firstTask不等于null,直接返回false
            // 如果线程池处于SHUTDOWN状态,firstTask等于null,阻塞队列中没有任务,直接返回false
            // 如果线程池处于SHUTDOWN状态,firstTask等于null,阻塞队列中还有任务,可以创建Worker
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            for (;;) {
                int wc = workerCountOf(c);
                // 如果workerCount已达到线程数量的界限,直接返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 如果CAS操作成功,说明所有条件都已满足,可以开始创建Worker
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();
                // 如果CAS操作失败,说明还有其它线程正在创建Worker
                // 此时需要重新获取线程池的状态。如果状态不变,继续内层循环;如果状态改变,回到外层循环
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
        // 运行到这里,开始创建Worker
        // workerStarted代表是否已启动新创建的Worker中的线程对象
        boolean workerStarted = false;
        // workerAdded代表是否已将新创建的Worker添加到workers这个HashSet中
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 传入firstTask,创建Worker
            w = new Worker(firstTask);
            // 获取Worker中的线程对象
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    // 获取mainLock之前,其它线程可能尝试关闭线程池,因此需要重新获取线程池的状态
                    // 如果(线程池处于RUNNING状态)或(线程池处于SHUTDOWN状态并且firstTask等于null)
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 如果已启动Worker中的线程对象
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            // 更新largestPoolSize
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 如果已将新创建的Worker添加到workers这个HashSet中
                if (workerAdded) {
                    // 启动新创建的Worker中的线程对象
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 如果没有启动新创建的Worker中的线程对象
            if (! workerStarted)
                // 回滚操作
                addWorkerFailed(w);
        }
        return workerStarted;
    }

6.1.1 ThreadPoolExecutor中的addWorkerFailed方法

    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            // workerCount减1
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

6.1.2 ThreadPoolExecutor.Worker中的run方法

启动新创建的Worker中的线程对象,JVM会自动调用Worker中的run方法。

    public void run() {
        runWorker(this);
    }
6.1.2.1 ThreadPoolExecutor中的runWorker方法
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 将Worker中的state设为0,允许中断线程对象
        w.unlock();
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                // 执行任务前,获取Worker锁
                w.lock();
                // 如果线程池处于STOP或TIDYING或TERMINATED状态,确保当前线程已被中断
                // 如果线程池处于RUNNING或SHUTDOWN状态,确保当前线程未被中断
                // interrupted方法会清除当前线程的中断状态
                // 如果线程池处于RUNNING或SHUTDOWN状态,interrupted方法返回true,说明在此之前当前线程已被中断,现在当前线程的中断状态已被清除。其它线程可能调用shutdownNow方法,因此需要重新判断线程池是否处于RUNNING或SHUTDOWN状态
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    // 中断当前线程
                    wt.interrupt();
                try {
                    // 空方法,子类中可以重写该方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行Runnable对象中的run方法不会抛出Checked Exception
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 空方法,子类中可以重写该方法
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    // 执行任务后,释放Worker锁
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 运行到这里,需要结束当前线程
            // 运行到这里有两种可能,要么getTask方法返回null,要么执行任务时抛出了异常或错误
            // 第一种情况,completedAbruptly等于false,执行getTask方法时workerCount已减1
            // 第二种情况,completedAbruptly等于true,workerCount未减1
            processWorkerExit(w, completedAbruptly);
        }
    }
6.1.2.1.1 ThreadPoolExecutor中的getTask方法
    private Runnable getTask() {
        // timedOut代表上一次调用poll方法获取任务是否超时
        boolean timedOut = false;
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // 如果(线程池处于STOP或TIDYING或TERMINATED状态)或(线程池处于SHUTDOWN状态并且阻塞队列中没有任务)
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // workerCount减1
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            // timed代表如果获取任务超时是否允许结束Worker
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // 如果workerCount大于最大线程数,尝试CAS操作
            // 如果(上一次调用poll方法获取任务超时)并且(获取任务超时允许结束Worker)并且(workerCount大于1),尝试CAS操作
            // 如果(上一次调用poll方法获取任务超时)并且(获取任务超时允许结束Worker)并且(阻塞队列中没有任务),尝试CAS操作
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                // 如果CAS操作成功,workerCount减1
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                // 如果获取任务超时允许结束Worker,调用poll方法获取任务
                // 如果获取任务超时不允许结束Worker,调用take方法获取任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // 如果成功获取任务
                if (r != null)
                    return r;
                // 运行到这里,说明调用poll方法获取任务超时
                timedOut = true;
            } catch (InterruptedException retry) {
                // 调用poll方法或take方法获取任务时,当前线程可能被中断,此时需要捕获InterruptedException,重置timedOut,重新开始循环
                timedOut = false;
            }
        }
    }
6.1.2.1.2 ThreadPoolExecutor中的processWorkerExit方法
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 如果completedAbruptly等于true
        if (completedAbruptly)
            // workerCount减1
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            // 从workers这个HashSet中移除当前Worker
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        // 如果线程池处于RUNNING或SHUTDOWN状态
        if (runStateLessThan(c, STOP)) {
            // 如果completedAbruptly等于false,说明getTask方法返回null,需要判断是否尝试启动一个新线程
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 如果允许结束空闲的核心线程,并且阻塞队列中还有任务
                if (min == 0 && ! workQueue.isEmpty())
                    // 至少保留一个线程
                    min = 1;
                if (workerCountOf(c) >= min)
                    return;
            }
            // 尝试启动一个新线程,并且将这个线程的第一个任务设为null
            addWorker(null, false);
        }
    }

6.2 ThreadPoolExecutor中的reject方法

    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

6.2.1 ThreadPoolExecutor中的四种拒绝策略

(1)CallerRunsPolicy:如果线程池处于Running状态,直接在提交任务的线程中执行任务;如果线程池不处于Running状态,直接丢弃任务。

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
    public boolean isShutdown() {
        return ! isRunning(ctl.get());
    }

(2)AbortPolicy:抛出异常。ThreadPoolExecutor默认使用该拒绝策略。

    public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

(3)DiscardPolicy:直接丢弃任务。

    public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

(4)DiscardOldestPolicy:如果线程池处于Running状态,从阻塞队列中移除头部任务并重新调用execute方法。

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

6.3 ThreadPoolExecutor中的remove方法

    public boolean remove(Runnable task) {
        // 从阻塞队列中移除指定任务
        boolean removed = workQueue.remove(task);
        tryTerminate();
        return removed;
    }
上一篇 下一篇

猜你喜欢

热点阅读