ThreadPoolExecutor线程池原理分析

2021-01-12  本文已影响0人  feifei_fly

ThreadPoolExecutor 我们在开发过程中经常用到,它的主要作用就是提前创建好若干个线程放在一个容器中。如果有任务需要处理,则将任务直接分配给线程池中的线程来执行就行,任务处理完以后这个线程不会被销毁,而是等待后续分配任务。

那它是如何实现线程复用的?今天我们就回答这个问题。

一、ThreadPoolExecutor的基本用法

1.1、TheadPooolExecutor的构造函数

public ThreadPoolExecutor(int corePoolSize,         //核心线程数量
   int maximumPoolSize, //最大线程数
   long keepAliveTime, //超时时间,超出核心线程数量以外的线程空余存活时间
   TimeUnit unit, //存活时间单位
   BlockingQueue<Runnable> workQueue, //保存执行任务的队列
  ThreadFactory threadFactory,//创建新线程使用的工厂
  RejectedExecutionHandler handler //当任务无法执行的时候的处理方式)

创建线程池

  var threadPool = ThreadPoolExecutor(1,2,60,TimeUnit.SECONDS,LinkedBlockingDeque<Runnable>())

执行线程任务

  threadPool.execute(Runnable { 
            
        })

ThreadPoolExecutor的主要逻辑是:

1.2、常用线程池

newFixedThreadPool

newFixedThreadPool 的核心线程数和最大线程数都是指定值,当线程池中的线程数超过核心线程数后,任务都会被放到阻塞队列中。这里选用的阻塞队列是LinkedBlockingQueue,使用的是默认容量 Integer.MAX_VALUE,相当于没有上限。

public class Executors {
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
}

用途:FixedThreadPool 用于负载比较大的服务器,为了资源的合理利用,需要限制当前线程数量。
缺点:任务队列没有上限,一直追加可能造成OOM

newCachedThreadPool

newCachedThreadPool 创建一个可缓存线程池。核心线程数为0,最大线程数为Integer.MAX_VALUE。如果线程池长度超过处理需要,可灵活回收空闲线程。
接收到新任务将被立即执行:若有空闲线程,则放到空闲线程执行;若无空闲线程,则创建新的非核心线程来执行。
创建的线程空闲60s 则会被回收。

public static ExecutorService newCachedThreadPool() {
 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
 60L, TimeUnit.SECONDS,
 new SynchronousQueue<Runnable>());
}

缺点:线程可以无限创建,当接收大量任务时 可能创建大量的线程,给JVM过大的负担。

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行

二、线程池实现原理

ThreadPoolExecutor的核心属性和方法

public class ThreadPoolExecutor extends AbstractExecutorService {
    
        //线程池任务队列
        private final BlockingQueue<Runnable> workQueue;
        //线程池工作线程
        private final HashSet<Worker> workers = new HashSet<>();

        //线程池状态
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;

        // Packing and unpacking ctl
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        private static int ctlOf(int rs, int wc) { return rs | wc; }

        public void execute(Runnable command) {
        
        }
        
        public void shutdown() {}
          
        public List<Runnable> shutdownNow() {}
}

2.1、workQueue;

workQueue 是线程池的任务队列,里面缓存待执行的Runnable任务

2.2、workers

workers 是当前正在执行的工作线程集合。

2.3、ctl 线程池状态

ctl是一个原子类,主要作用是用来保存线程数量和线程池的状态

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

一个 int 数值是 32 个 bit 位,这里采用高 3 位来保存运行状态,低 29 位来保存线程数量。

private static int ctlOf(int rs, int wc) { return rs | wc; }
private static final int COUNT_BITS = Integer.SIZE - 3; //32-3
private static final int CAPACITY = (1 << COUNT_BITS) - 1; //将 1 的二进制向右位移 29 位,再减 1 表示最大线程容量
//运行状态保存在 int 值的高 3 位 (所有数值左移 29 位)
private static final int RUNNING = -1 << COUNT_BITS;// 接收新任务,并执行队列中的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;// 不接收新任务,但是执行队列中的任务
private static final int STOP = 1 << COUNT_BITS;// 不接收新任务,不执行队列中的任务,中断正在执行中的任务
private static final int TIDYING = 2 << COUNT_BITS; //所有的任务都已结束,线程数量为 0,处于该状态的线程池即将调用 terminated()方法
private static final int TERMINATED = 3 << COUNT_BITS;// terminated()方法执行完成
状态转换

2.4、shundown() 和showdonwNow()

2.5、execute() 执行Runnable任务

 public void execute(Runnable command) {
   

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { //1、工作线程数 小于核心线程个数,则尝试直接创建一个新的核心线程来执行任务。
 
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

        if (isRunning(c) && workQueue.offer(command)) { //2、核心线程数已满,任务队列未满,则将任务添加到任务队列
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command)) //线程状态二次检查,如果当前线程池关闭了,则移除该任务
                reject(command);
            else if (workerCountOf(recheck) == 0) //线程状态二次检查,如果当前工作线程数为0,则新建非核心线程来执行剩余任务
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) //3、任务队列已慢,则尝试创建一个新的非核心线程 处理该任务。
            reject(command);
    }

2.5.1、addWorker

如果工作线程数小于核心线程数的话,会调用 addWorker,创建一个工作线程。
大致做了两件事情:

1、状态检查,不允许创建线程的情况,直接返回false; 条件符合,则工作线程计数+1
2、创建Worker实例,添加到workers集合,并开启线程
 private boolean addWorker(Runnable firstTask, boolean core) {
    retry: //goto 语句,避免死循环

   //状态检查,不允许创建线程的情况,直接返回false;  workerCount计数+1。
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        //(1)如果线程处于非运行状态,不允许新创建线程,直接返回false
        if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                        firstTask == null &&
                        ! workQueue.isEmpty()))
            return false;
        for (;;) { //自旋
            int wc = workerCountOf(c);//获得 Worker 工作线程数
            //(2)如果工作线程数大于默认容量大小或者大于核心线程数大小,则直接返回 false 表示不能再添加 worker,返回false
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))//(3)通过 cas工作线程数加1
                break retry;
            c = ctl.get(); // Re-read ctl 
            if (runStateOf(c) != rs) //这里如果不相等,说明线程的状态发生了变化,继续重试
                continue retry;
     
        }
    }

    //创建Worker实例,添加到workers集合,并开启线程
    boolean workerStarted = false; 
    boolean workerAdded = false; 
    Worker w = null;
    try {
        w = new Worker(firstTask); //构建一个 Worker,传入了一个 Runnable 对象
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock(); 
            try {
           
                int rs = runStateOf(ctl.get());
                //将新创建的Workder 添加到workers工作线程集合当中。
                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                  
                    workers.add(w); //(4)将新创建的 Worker 添加到 workers 集合中
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s; 
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock(); 
            }
            if (workerAdded) {
                t.start();//(5)如果 worker 添加成功,启动线程
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w); //如果添加失败,就需要做一件事,就是递减实际工作线程数(还记得我们最开始的时候增加了工作线程数吗)
    }
    return workerStarted;//返回结果
}
Worker 是什么?

Worker代表一个工作线程,内部持有一个thread和firstTask。

 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        final Thread thread;
        Runnable firstTask;
    
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker. */
        public void run() {
            runWorker(this);
        }

    }



runWorker 方法

runWorker 是工作线程 执行任务的主要场所。

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock();
        boolean completedAbruptly = true;

        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
        
               ...
                try {
                    beforeExecute(wt, task);//任务执行前回调
                    Throwable thrown = null;
                    try {
                        task.run(); //执行具体的任务
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//任务执行后回调
                    }
                } finally {
                    task = null; //task设置成null,保证下次进入while循环执行getTask() 
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);//线程执行完成,将当前worker 从wokers集合移除
        }
    }

整体看就是一个无限循环:

从队列提取任务->执行任务->队列提取任务->执行任务

当getTask() 返回空任务时,循环结束,该工作线程也就结束了。

getTask

线程池之所以可以复用线程,关键点就在getTask。

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {//自旋
            int c = ctl.get();
            int rs = runStateOf(c);
            
            //(1)对线程池状态的判断:
            //线程池处于SHUTDOWN状态 且任务队列为空, 或者 线程池处于STOP状态,应该销毁当前线程,此时返回null。

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;//返回 null,则当前 worker 线程会退出
            }
            int wc = workerCountOf(c);
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // timed 变量用于判断是否需要进行超时控制。
            // allowCoreThreadTimeOut 默认是 false,也就是核心线程不允许进行超时;
            // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
            // 对于超过核心线程数量的这些线程,需要进行超时控制
       

            //(2) 工作队列中有未执行的任务 且工作线程数量超过了maximumPoolSize 
           //或者上一次取任务已经超时,则 返回null,当前线程退出。

            if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                /*根据 timed 来判断,如果为 true,则通过阻塞队列 poll 方法进行超时控制,如果在
                keepaliveTime 时间内没有获取到任务,则返回 null.
                否则通过 take 方法阻塞式获取队列中的任务*/

                //(3) timed = true,表示需要进行超时控制,采用workQueue.poll() 从队列中取任务,会阻塞keepAliveTime时间,超时后 timedOut = true,下次循环会直接返回null。 - 线程被回收。
                      timed = false,表示不需要进行超时控制,采用workQueue.take(),该线程会一直阻塞,直到有新的任务。 - 线程不被回收
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null)//如果拿到的任务不为空,则直接返回给 worker 进行处理
                    return r;
                timedOut = true;//如果 r==null,说明已经超时了,设置 timedOut=true,在下次自旋的时候进行回收
            } catch (InterruptedException retry) {
                timedOut = false;// 如果获取任务时当前线程发生了中断,则设置 timedOut 为false 并返回循环重试
            }
        }
    }



关键代码如下;

  boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();

默认 allowCoreThreadTimeOut为false

至此 ThreadPoolExecutor 可以服用线程的原理也就清除了。

processWorkerExit

runWorker 的 while 循环执行完毕以后,在 finally 中会调用 processWorkerExit,将当前线程从worker中移除,并执行tryTerminate()

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

2.6、拒绝策略

当工作线程数超过corePoolSize,且工作队列已满,且线程总数也达到了maximumPoolSize(非核心线程也无法创建),此时就会拒绝新任务:

final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

1、AbortPolicy:直接抛出异常,默认策略;
2、CallerRunsPolicy:用调用者所在的线程来执行任务;
3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4、DiscardPolicy:直接丢弃任务;

三、其他

3.1、如何取消任务

ThreadPoolExecutor.execute() 提交一个任务是无法取消该任务的。

那提交了一个任务之后,想取消执行该任务 有没有途径呢?

答案是submit()方法

   public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

submit方法返回一个Future对象

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future.cancel(true) 可以取消Runnable任务。

  • Attempts to cancel execution of this task. This attempt will
    • fail if the task has already completed, has already been cancelled,
    • or could not be cancelled for some other reason. If successful,
    • and this task has not started when {@code cancel} is called,
    • this task should never run. If the task has already started,
    • then the {@code mayInterruptIfRunning} parameter determines
    • whether the thread executing this task should be interrupted in
    • an attempt to stop the task.

测试代码


fun testThreadPool(){
        var threadPool = ThreadPoolExecutor(1,5,60,
            TimeUnit.SECONDS,
            LinkedBlockingDeque<Runnable>()
        );

        var fu =  threadPool.submit(MyRunaable())

        Thread.sleep(10000)
        fu.cancel(true)
        Log.d("feifei","fu cancel")
    }
    
  class MyRunaable():Runnable{
        var index = 0;
        override fun run() {

            while(true){
                Thread.sleep(1000)
                Log.d("feifei","MyRunaable run():${index++}")
            }
        }
    }

输出结果:

2021-01-12 15:12:29.925 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():0
2021-01-12 15:12:30.925 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():1
2021-01-12 15:12:31.926 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():2
2021-01-12 15:12:32.926 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():3
2021-01-12 15:12:33.927 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():4
2021-01-12 15:12:34.928 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():5
2021-01-12 15:12:35.928 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():6
2021-01-12 15:12:36.929 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():7
2021-01-12 15:12:37.929 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():8
2021-01-12 15:12:38.925 6847-6847/com.sogou.iot.myapplication D/feifei: fu cancel

3.2、线程池的监控

如果在项目中大规模的使用了线程池,那么必须要有一套监控体系,来指导当前线程池的状态,当出现问题的时候可以快速定位到问题。而线程池提供了相应的扩展方法,我们通过重写线程池的 beforeExecute、afterExecute 和 shutdown 等方式就可以实现对线程的监控,简单给大家演示一个案例

3.3、使用线程池注意事项

3.3.1、 使用ThreadPoolExecutor而非Executors创建线程池

用 Executors创建线程池 使得用户不需要关心线程池的参数配置,意味着大家对于线程池的运行规则也会慢慢的忽略。这会导致一个问题。
比如我们用 newFixdThreadPool 或者 singleThreadPool.允许的队列长度为Integer.MAX_VALUE,如果使用不当会导致大量请求堆积到队列中导致 OOM 的风险而 newCachedThreadPool,允许创建线程数量为 Integer.MAX_VALUE,也可能会导致大量线程的创建出现 CPU 使用过高或者 OOM 的问题。

3.3.2、合理设置线程池大小。

需要分析线程池执行的任务的特性: CPU 密集型还是 IO 密集型

四、参考文章

https://www.jianshu.com/p/a977ab6704d7

https://www.jianshu.com/p/7b2da1d94b42

上一篇下一篇

猜你喜欢

热点阅读