ThreadPoolExecutor源码分析

2018-12-17  本文已影响0人  无敌小圈圈

由于工作需要,需要进行并行查询数据库,返回结果后进行计算,也就是说各个线程要全部运行完毕,才能进行下一步的计算,这时候要用到CountDownLatch。
先上代码,实现的比较简单,但是这不是重点,哈哈哈,主要是分析Executor相关的原理。
线程池相关参数及含义等可参考下面这篇文章,内部也有部分源码分析,但是个人认为不够详尽
https://blog.csdn.net/baidu_37107022/article/details/77415936

package com.gome.mars.utils;

import java.util.concurrent.*;

/**
 * @ClassName
 * @Description TODO
 * @Author oo
 * @Date 2018/12/10 18:00
 * @Version 1.0
 **/
public class SimpleParallelTaskExecutor {
    private CountDownLatch countDownLatch;
    // 此处实现了固定大小的线程池,可根据需要进行其他实现,每次不再新建线程池实例
    private static ExecutorService executor=Executors.newFixedThreadPool(20);
    //构造方法参数为并行线程的数量,并且每次new CountDownLatch对象,因为不可重复使用
    public SimpleParallelTaskExecutor(Integer nThreads) {
        this.countDownLatch = new CountDownLatch(nThreads);
    }
    //调用此方法向线程池中添加任务,此处对Callable进行了简单包装,为了执行完任务调用countDownLatch.countDown();
    public <V> Future<V> addTask(Callable<V> task) throws Exception {
        return executor.submit(new WrapperThread<V>(task,countDownLatch));
    }
    //可设置超时时间,检查任务是否运行完毕
    public boolean checkDone(long milliseconds) throws InterruptedException {
        return countDownLatch.await(milliseconds, TimeUnit.MILLISECONDS);
    }
    //Callable包装类,为了执行完任务调用countDownLatch.countDown();
    public class WrapperThread<V> implements Callable<V> {
        private Callable<? extends V> callable;
        private CountDownLatch countDownLatch;

        public WrapperThread(Callable callable, CountDownLatch countDownLatch) {
            this.callable = callable;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public V call() throws Exception {
            //此处直接调用callable.call();和直接调用thread.run()类似,没有起新的线程此处和加入的任务内部是同一个线程。
            V call = callable.call();
            this.countDownLatch.countDown();
            return call;
        }
    }
    public static void main(String[] args) throws Exception {
        SimpleParallelTaskExecutor simpleParallelTaskExecutor = new SimpleParallelTaskExecutor(2);
        Future<Integer> integerFuture = simpleParallelTaskExecutor.addTask(() -> {
            //此处模拟执行数据查询等任务
            Thread.sleep(2000);
            return 1;
        });
        Future<Integer> integerFuture1 = simpleParallelTaskExecutor.addTask(() -> {
            Thread.sleep(1000);
            return 2;
        });
        simpleParallelTaskExecutor.checkDone(3000);
        Integer integer = integerFuture.get();
        Integer integer1 = integerFuture1.get();
        System.out.println(integer);
        System.out.println(integer1); 
    }

}
Executor框架

我们在addTask方法处打断点,进入submit方法,可以看出我们进入的是AbstractExecutorService类的submit方法,此方法接受一个callable对象,返回Future<T>对象,我们可以在Future中获取执行结果。

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    } 
 /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

submit方法有几个重载方法,都是通过NewTaskFor方法,将任务包装成一个RunnableFuture对象,只不过Runnable没有返回结果,结果类型为null。
下面我们看看NewTaskFor都做了些什么事情。

     /**
     * Returns a {@code RunnableFuture} for the given runnable and default
     * value.
     *
     * @param runnable the runnable task being wrapped
     * @param value the default value for the returned future
     * @param <T> the type of the given value
     * @return a {@code RunnableFuture} which, when run, will run the
     * underlying runnable and which, as a {@code Future}, will yield
     * the given value as its result and provide for cancellation of
     * the underlying task
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    /**
     * Returns a {@code RunnableFuture} for the given callable task.
     *
     * @param callable the callable task being wrapped
     * @param <T> the type of the callable's result
     * @return a {@code RunnableFuture} which, when run, will call the
     * underlying callable and which, as a {@code Future}, will yield
     * the callable's result as its result and provide for
     * cancellation of the underlying task
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

可以看出,直接创建了FutureTask对象,并且返回。所以,加入线程池的任务都被包装成FutureTask对象,没有返回值的返回值为空。
下面主要看execute(ftask);方法
ThreadPoolExecutor中的execute方法。
用一个32位数的高3位表示线程池状态,低29位表示正在运行的线程数量

public class ThreadPoolExecutor extends AbstractExecutorService {
    //初始状态为Running状态且运行线程数为0,所以是-1和0按位取或
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //32-3为29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //1左移29位再减1,低29位全为1,高位位0;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    //低29位取反都为0,高三位都为1,再和c进行按位与,只留下高三位,从而获取线程池状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //获取工作线程数量,与上面类似,取得低29位。
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
}

在程序运行中,反复使用了这几个方法,用来获取工作线程数或线程池状态

/**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        //获取c的值,判断工作线程数是否小于设定的核心线程数
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            //如果小于核心线程数,直接新建线程,新建成功则返回
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果上变判断不成立,那么工作线程数量大于等于核心线程数,
        //或者新建线程没有成功,那么如果线程池接受新任务,则往队列里添加一个任务
        //如果线程池中工作线程数已经达到了核心线程数,并且没有设置空闲销毁时间
        //新任务到来就只是向阻塞队列里增加一条新任务,线程池是如何拿到这个任务并执行的呢?
        if (isRunning(c) && workQueue.offer(command)) {
            //再次检查,防止并发状态下,线程池状态等有变化
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

重点方法是addWorker方法下面进一步分析

 /**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    //第二个参数表示是否是核心线程,比较工作线程数目时,分别和corePoolSize 或者maximumPoolSize进行比较
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            //此处为何还要有第二个判断?
            //我们分析一下当rs==SHUTDOWN 时,什么情况会返回false
            //1.线程池shutdown, 此时firstTask为null 并且workQueue为空时
            //2.线程池shutdown,firstTask不为null这时 workQueue状态已经没有用了
            //SHUTDOWN状态虽然不接受新任务,但是队列里的任务会执行完,
            //也就是说当线程池为SHUTDOWN时,为了执行完队列中的任务,
            //会不断添加firstTask为null的任务,firstTask为null代表要取队列中的任务
            //第一种情况表示队列中的任务已经清空了,无需再循环了,线程池可能将要进入stop状态了
            //第二种情况表示新任务到来,线程池已经不再接受了,所以返回false
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            //内层循环主要工作就是cas为增加一个工作线程
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //增加成功就退出外层循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                //线程池状态状态有变化就继续执行外层循环
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //包含一个Thread对象,传入的是Worker对象本身,后边有详细解释。
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    //正常运行状态或者队列中还有未执行的任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //如果线程添加成功,就启动线程,执行任务。实际是执行runWorker方法。
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Worker实现了Runnable接口,继承AbstractQueuedSynchronizer类,所以本身就是一个线程类,有自己的run方法。

/**
     * Class Worker mainly maintains interrupt control state for
     * threads running tasks, along with other minor bookkeeping.
     * This class opportunistically extends AbstractQueuedSynchronizer
     * to simplify acquiring and releasing a lock surrounding each
     * task execution.  This protects against interrupts that are
     * intended to wake up a worker thread waiting for a task from
     * instead interrupting a task being run.  We implement a simple
     * non-reentrant mutual exclusion lock rather than use
     * ReentrantLock because we do not want worker tasks to be able to
     * reacquire the lock when they invoke pool control methods like
     * setCorePoolSize.  Additionally, to suppress interrupts until
     * the thread actually starts running tasks, we initialize lock
     * state to a negative value, and clear it upon start (in
     * runWorker).
     */
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //新建Thread对象,传入自身
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        //当调用Thread.start()时,新线程启动,调用runWorker方法,传入自身
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

runWorker方法是真正执行用户传进来的任务的地方,并且可以重写beforeExecute以及afterExecute方法,再任务执行前后加入自定义操作。

/**
     * Main worker run loop.  Repeatedly gets tasks from queue and
     * executes them, while coping with a number of issues:
     *
     * 1. We may start out with an initial task, in which case we
     * don't need to get the first one. Otherwise, as long as pool is
     * running, we get tasks from getTask. If it returns null then the
     * worker exits due to changed pool state or configuration
     * parameters.  Other exits result from exception throws in
     * external code, in which case completedAbruptly holds, which
     * usually leads processWorkerExit to replace this thread.
     *
     * 2. Before running any task, the lock is acquired to prevent
     * other pool interrupts while the task is executing, and then we
     * ensure that unless pool is stopping, this thread does not have
     * its interrupt set.
     *
     * 3. Each task run is preceded by a call to beforeExecute, which
     * might throw an exception, in which case we cause thread to die
     * (breaking loop with completedAbruptly true) without processing
     * the task.
     *
     * 4. Assuming beforeExecute completes normally, we run the task,
     * gathering any of its thrown exceptions to send to afterExecute.
     * We separately handle RuntimeException, Error (both of which the
     * specs guarantee that we trap) and arbitrary Throwables.
     * Because we cannot rethrow Throwables within Runnable.run, we
     * wrap them within Errors on the way out (to the thread's
     * UncaughtExceptionHandler).  Any thrown exception also
     * conservatively causes thread to die.
     *
     * 5. After task.run completes, we call afterExecute, which may
     * also throw an exception, which will also cause thread to
     * die. According to JLS Sec 14.20, this exception is the one that
     * will be in effect even if task.run throws.
     *
     * The net effect of the exception mechanics is that afterExecute
     * and the thread's UncaughtExceptionHandler have as accurate
     * information as we can provide about any problems encountered by
     * user code.
     *
     * @param w the worker
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //如果task!=null 就执行当前任务(工作线程数小于核心现场数),
            //如果为null(队列中有任务)就在队列中获取一个任务,
            //此处可以看出,如果队列中有任务,会一直while循环,直到队列为空,
            //队列为空时,由于是阻塞队列,线程将阻塞在这里,直到又有任务添加进队列
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //空方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //真正执行我们的任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //空方法
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //此方法用于处理当前工作线程退出的相关事宜
            processWorkerExit(w, completedAbruptly);
        }
    }

当此工作线程退出以后,相关清理及记录工作,当程序抛出异常,或队列不为空而没有工作线程时或工作线程数少于核心线程数时,会继续addWorker(null, false);替换当前工作线程。

/**
     * Performs cleanup and bookkeeping for a dying worker. Called
     * only from worker threads. Unless completedAbruptly is set,
     * assumes that workerCount has already been adjusted to account
     * for exit.  This method removes thread from worker set, and
     * possibly terminates the pool or replaces the worker if either
     * it exited due to user task exception or if fewer than
     * corePoolSize workers are running or queue is non-empty but
     * there are no workers.
     *
     * @param w the worker
     * @param completedAbruptly if the worker died due to user exception
     */
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        //一个工作线程退出后会尝试终止线程池,通过判断当前线程池的状态,如果终止成功则不会进入下边的if判断,否则进入if判断
        tryTerminate();

        int c = ctl.get();
        //判断线程池是否是running或shutdown状态,再次判断completedAbruptly,
        //这个变量表示是否被打断,正常执行完毕一般为false,如果满足是就继续判断是否继续addWorker
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                //1.如果设置了allowCoreThreadTimeOut 参数且队列为非空,则工作线程数为0的时候才addWorker
                //如果设置了allowCoreThreadTimeOut 且队列为空,则直接返回,不addWorker
                //如果没有设置allowCoreThreadTimeOut ,只要工作线程数小于核心线程数,都addWorker
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            //关键这在这,加入一条firstTask为null的非核心线程任务,
            addWorker(null, false);
        }
    }

总结

线程池执行过程:

  1. submit(Callable<T> task)方法后,将task包装成一个FutureTask对象;
  2. 执行execute(ftask)方法
    if(工作线程数<核心线程数){
    addWorker(Runnable firstTask, boolean core)
    添加成功 return;
    }
    核心线程数已经达到最大
    if(线程池是running状态){
    向队列中添加一个任务
    workQueue.offer(command)
    }
  3. addWorker(Runnable firstTask, boolean core)
    自旋尝试改变workerCount数量
    compareAndIncrementWorkerCount(c)
    成功
    new Worker(firstTask);
    并启动线程
  4. runWorker(Worker w)
    while(如果当前Worker中task!=null 执行此任务
    否则从队列中task = getTask())
    调用task.run();
    队列为空时getTask()方法中根据参数设定判断何时返回null何时阻塞
  5. 当一个工作线程退出后执行processWorkerExit(w, completedAbruptly);方法中还会尝试终止线程池,如果线程池终止成功,则直接return
    否则判断是否继续addWorker(null, false)替换当前线程

over
有不正确的地方欢迎指正!

上一篇下一篇

猜你喜欢

热点阅读