jdk提供的线程池ThreadPoolExecutor详解

2020-05-21  本文已影响0人  remax1

前言

线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。为解决资源分配问题,线程池采用了“池化思想”,即将所有线程统一管理。

构造方法

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

· corePoolSize:线程池中核心线程的个数。
· maximumPoolSize: 线程池中允许创建的最大的线程个数。
· keepAliveTime:保活时间。
· workQueue:阻塞队列,用来储存任务。
· RejectedExecutionHandler :拒绝策略,jdk提供四种,可以自定义拒绝策略。

任务调度

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
       
        int c = ctl.get();
        //注释①
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))//启动新线程
                return;
            c = ctl.get();
        }
    //注释② 此时不满足小于corePoolSize条件
     if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)//注释③
                addWorker(null, false);//false代表的是与maximumPoolSize比较,true则与corePoolSize
        }
      //注释④ 此时,workQueue.offer(command)返回值为false
        else if (!addWorker(command, false)) 
            reject(command);//拒绝策略
    }

·注释①:如果workerCount(runable个数) < corePoolSize,则创建并启动一个线程来执行新提交的任务。
注释②:跳出if循环,此时corePoolSize>=workerCount ,且阻塞队列元素不满,添加任务到该阻塞队列中。
注释③:corePoolSize>=workerCount ,且阻塞队列元素满了,且corePoolSize<maximumPoolSize,新建线程并启动。
注释④:阻塞队列满了,并且workerCount > maximumPoolSize,此时根据拒绝策略来处理该任务,默认的是AbortPolicy(),直接抛异常。

任务缓冲

在上面execute()方法中,通过BlockQueue来完成任务的缓冲,这里的任务实际是个runable接口的对象。线程池中是以生产者消费者模式,通过阻塞队列来完成的。生产者则是往队列里添加元素的线程,消费者是从队列里拿元素的线程,即线程池中的线程。这与阻塞队列的特性有关,在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。

任务申请

在上面的excute()方法中,我们可以得知,任务执行有两种可能:
1.由新创建的线程来执行,即addWork(Runable,true);
2.线程从任务队列中获取任务来执行,即addWorker(null, false),占大多数情况;
接下来去看看源码,任务申请主要通过getTask()来完成对runable的取出。

 private Runnable getTask() {
        boolean timedOut = false; //最后一个任务是否超时

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 判断是否运行状态或者阻塞队列为空
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // 判断是否所有的工作线程被回收
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                  //开始取出任务并返回
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

任务拒绝

再看excute()中, reject(command)则是我们的拒绝策略,,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。
拒绝策略实质上是一个接口:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

可以通过实现这个接口来自定义拒绝策略,也可以通过JDK提供的四种策略:
1.ThreadPoolExecutor.AbortPolicy():丢弃任务并抛出java.util.concurrent.RejectedExecutionException异常。
2.ThreadPoolExecutor.CallerRunsPolicy:由提交任务的线程取处理。这种情况是需要让所有任务都执行完毕。
3.ThreadPoolExecutor.DiscardPolicy:丢弃任务,不报出异常。
4.ThreadPoolExecutor.DiscardOldPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务。

Worker线程管理

先看看Worker类:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /*初始化任务,可以为空 */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * 注释①
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

       
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //注释②
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

注释①:Worker这个工作线程,实现了runable接口,并持有一个线程thread,一个初始化任务
firstTask,即第一个任务,可以为空。如果不为空,线程启动时则会执行这个任务,应对线程池初期的情况。
注释②:Worker通过继承AQS,使用AQS来实现独占锁的功能。AQS不可重入。
1.lock()方法一旦获取了锁,表示当前线程正在执行任务。
2.如果正在执行任务,则不应该中断线程。
3.如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。

线程增加

再前面的excute()方法中出现的addWorker()则是对线程的增加。去看看里面做了啥事

 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 一些判断
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                  //注释①
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //注释②
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

注释①:core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize。
注释②:设置可重入锁,完成worker线程的增加,添加成功后会启动这个工作线程。

Worker线程回收

 private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
          //记录一下完成任务的个数
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
}

线程池中线程的销毁依赖JVM自动的回收,线程池自己决定那些线程需要回收,消除一下引用即可。

Worker线程执行任务

在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的执行过程如下:
1.while()循环不断取出runable任务,通过getTask()取出。
2.执行任务
3.如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。

实际应用

在应用场景中主要分为IO密集型和CPU密集型。最主要还是设计核心线程个数和最大线程个数
的值。

上一篇下一篇

猜你喜欢

热点阅读