JavaJavaConcurrent

ThreadPoolExecutor(3) —— 干活的人 Wo

2019-11-30  本文已影响0人  若琳丶

一、前言

前一篇文章,大体说明了一下线程池如何添加一个新的Worker去执行任务。本篇来详细分析 Worker 本身。

二、Worker 的结构

2.1 Worker 整体结构

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;

        /** Worker所绑定的执行任务的线程. */
        final Thread thread;
        /** 初始化时需要执行的任务,有可能为空 */
        Runnable firstTask;
        /** 完成任务数 */
        volatile long completedTasks;

        /**
         * 通过给定的任务(有可能为空)来创建初始化,初始化时会创建一条线程进行绑定
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** 实现Runnable接口的run方法  */
        public void run() {
            runWorker(this);
        }

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

可以看到 Worker 继承了AbstractQueuedSynchronizer,并实现了Runnable。

2.2 runWorker 方法

runWorker 方法是ThreadPoolExecutor的方法。

/**
 * 执行 Worker
 */
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 调用unlock()是为了让外部可以中断
    w.unlock(); // allow interrupts
    // 这个变量用于判断是否进入过自旋(while循环)
    boolean completedAbruptly = true;
    try {
        // 这儿是自旋
        // 1. 如果firstTask不为null,则执行firstTask;
        // 2. 如果firstTask为null,则调用getTask()从队列获取任务。
        // 3. 阻塞队列的特性就是:当队列为空时,当前线程会被阻塞等待
        while (task != null || (task = getTask()) != null) {
            // 这儿对worker进行加锁,是为了达到下面的目的
            // 1. 降低锁范围,提升性能
            // 2. 保证每个worker执行的任务是串行的
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 如果线程池正在停止,则对当前线程进行中断操作
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            // 执行任务,且在执行前后通过`beforeExecute()`和`afterExecute()`来扩展其功能。
            // 这两个方法在当前类里面为空实现。
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                // 帮助gc
                task = null;
                // 已完成任务数加一 
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 自旋操作被退出,说明线程池正在结束
        processWorkerExit(w, completedAbruptly);
    }
}

总结一下runWorker方法的执行过程:

  1. while循环中,不断地通过getTask()方法从workerQueue中获取任务
  2. 如果线程池正在停止,则中断线程。否则调用
  3. 调用task.run()执行任务;
  4. 如果task为null则跳出循环,执行processWorkerExit()方法,销毁线程workers.remove(w);

这个流程图非常经典:


Worker 的执行

2.3 processWorkerExit 方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 这个变量用于表示是否进入过自旋。
    // 1. 如果没有进入过,该值为false
    // 2. 进入过,该值为true
    // 只有进入过自旋,worker的数量才需要减一
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    // 通过全局锁的方式移除worker
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // 尝试终止线程池
    tryTerminate();

    int c = ctl.get();
    // 如果线程池状态为`SHUTDOWN`或`RUNNING`,
    // 则通过调用`addWorker()`来创建线程,辅助完成对阻塞队列中任务的处理。
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

2.4 Worker 是如何被启动的

在上一篇 ThreadPoolExecutor(二) —— 线程池源码分析 中,ThreadPoolExecutor 成功将 Worker 添加到集合中后,调用的是 Worker 中 thread 的 start 方法(t.start())。我们知道 Thread 的 start 方法是会启动一个内存中的线程单元,并执行 run 方法:

public
class Thread implements Runnable {
    //此处的 target 将会是 Worker 实例
    public Thread(ThreadGroup group, Runnable target, String name,
                  long stackSize) {
        init(group, target, name, stackSize);
    }

    @Override
    public void run() {
        if (target != null) {
            target.run();
        }
    }
}

而在 Thread 执行 run 方法时,实际上调用的是它自身的 target 的run方法,此处的 target 就是与 Thread 绑定的 Worker 实例。我们再看一下两者绑定的过程。
在 Worker 初始化时,会通过 ThreadFactory 创建一个 Thread 实例:

   Worker(Runnable firstTask) {
       setState(-1); // inhibit interrupts until runWorker
       this.firstTask = firstTask;
       // 在创建 Thread 实例并赋值时,Worker 将自己作为参数传入线程工厂的方法内
       this.thread = getThreadFactory().newThread(this);
   }

此处 getThreadFactory 方法,返回的实际实例时 DefaultThreadFactory:

    /**
     * The default thread factory
     */
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

此处执行了它的 newThread 方法,其中 Runnable 对象 r,就是 Worker 对象,在此处将 Worker 对象传入 Thread 的构造方法中,与 Thread 完成绑定。

所以,在线程池直接调用 Thread 的 start 方法,可以直接启动 Worker,执行 Worker 的 run 方法。

三、线程是如何在线程池中运作的

1、在线程池成功的 addWorker ,并且成功启动了 Worker 对应的 Thread以后,这个Thread就开始运作,运作的第一个任务,是 Worker 对象中的firstTask。
2、当 firstTask运作结束,会通过 getTask() 方法从队列中获取任务。在这里获取到的 task,无需与当前的 Thread 对象有什么绑定关系,只需要在当前 Thread 中执行这个 task 的 run 方法即可。
3、getTask 是从队列中获取 task 的核心逻辑,其中包含对线程数的判断以及是否允许核心线程数超时的判断。这些判断会影响从队列中获取task等待的时长,当然还有些比较细的内容需要额外的去说。
4、当当前的 Thread 从 getTask 方法中获取的 task 为空时,就说明这个线程已经没用了,就会消亡

三、有关 Worker 的一些疑问

3.1 为什么 Worker 要继承 AQS

3.2 为什么 Worker 要实现 Runnable

3.3 为什么一定要与 Thread 绑定?为何在 ThreadPoolExecutor 启动 Worker 执行任务要调用 Worker 的 Thread,而不是 Worker 本身呢?

3.4 为什么不能启动 Worker 方法的run,然后 Worker 中 run 的内容,是启动 Worker 对象持有的 Thread 对象的 start 方法呢?这样做也可以启动一条新的线程单元啊?

3.5 在 runWorker 方法中,只要当前 Worker 完成了所有任务,就跳出了 while 循环,并执行 finally 中的移除过程,那核心的线程也会被移除吗?

3.6 Worker为什么不使用ReentrantLock来实现呢?

四、总结

本篇文章介绍了 ThreadPoolExecutor 中真正去执行任务的对象 —— Worker,Worker 与 Thread 之间的关系,以及 ThreadPoolExecutor 是如何去启动 Worker 的。再加上个人愚见,如果有理解错误或者缺失的地方,还请下方留言,大家一起交流,一起学习,一起成长。

上一篇下一篇

猜你喜欢

热点阅读