《Java核心技术系列二》ThreadPoolExecutor

2020-03-08  本文已影响0人  逍遥无极

该系列统一使用java8的源码进行讲解

上一篇中对ThreadPoolExecutor的源码以及工作原理进行了讲解。今天来讲解一下在使用的过程中我们应该注意哪些问题

一. 参数冲突问题

在上一篇文章中,我们对ThreadPoolExecutor的构造函数的参数进行了讲解。其中

  1. 当corePoolSize==maximumPoolSize,且核心线程数不允许超时时,设置keepAliveTime与unit是没有意义的(因为没有需要超时的Worker)
  2. 当workQueue为无界队列时,其永远也放不满,这种情况maximumPoolSize就不会生效

二. Exectors 提供的三种线程池能不能用

2.1. 固定大小的线程池
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

看实现可知,线程数固定,且采用的是无界阻塞队列。当任务提交的速度快与Worker消耗任务的速度时,任务会在workQueue中不断挤压,最终撑垮整个进程的内存空间,造成OOM,因此线上谨慎使用

2.2 单个线程的线程池
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

与1相比都是固定大小的线程池,唯一不同的是该线程数为1,存在于1一样的问题。

2.3 Cached线程池
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

与固定大小的线程池正好相反,核心线程数为0,阻塞队列容量为1,有任务就开辟新的Worker执行,空闲时Worker会自动销毁。如果任务提交的任务非常的多,应用进程达到开辟线程的最大上限开辟不了线程抛出OOM,或者因为线程占用太多的内存空间,导致内存被耗尽抛出OOM(一个线程栈的默认大小为1m),或者因为线程太多,线程上下文切换的开销不可忽略,导致任务执行缓存从而拖垮整个应用进程。

综上:上面三种线程池都不推荐使用,而是应该由我们自己制定线程池参数,保证线程个数与任务队列的长度都在可控的范围内,而不是任由其无限制增长,在线上应用中,是不推荐存在这种不可控的情况出现的。资源都可控制了,那就延伸出另一种问题,达到了线程池的最大处理能力了怎么办?

三. 拒绝策略 RejectedExecutionHandler

当线程池达到自己最大处理能力时(任务队列满,每个Worker都在不停工作),再添加新任务时就会触发拒绝策略。JDK提供了4中拒绝策略

3.1. AbortPolicy 默认

直接拒绝,就是会抛出异常

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }

注意是谁抛出异常,是执行提交任务的线程抛出异常。如果提交任务的线程正在循环的处理一批数据,且没有对异常进行捕获处理,那么在执行拒绝策略抛出异常后,后面的任务都不会被执行。如果提交任务的线程是一个用户请求线程,且业务中没有捕获该异常进行处理,用户请求会因为该异常而请求失败。并且这是默认的拒绝策略,初学者很容易掉进这个坑

3.2 DiscardPolicy

直接丢弃,实现就是什么都不做

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}

针对不能对任务的场景是不能使用的

3.3 DiscardOldestPolicy

丢弃最老的一个任务

       public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }

先从workQueue中弹出一个任务,然后再提交任务。同样会丢失数据,在不允许丢数据的场景不能使用

3.4 CallerRunsPolicy

调用者线程执行,谁提交任务,谁执行

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }

通过直接调用任务run方法的方式自己执行,虽然会延长线程的执行时间,但是这样不会丢失数据

四. 线程池如何关闭

4.1 shutdown

将线程池的状态修改为SHUTDOWN状态,在该状态不允许提交新任务

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            //修改线程池的状态
            advanceRunState(SHUTDOWN);
            //终端空闲的worker
            interruptIdleWorkers();
            //空方法
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

首先将线程池状态修改为shutdown状态,然后中断所有空闲的Worker。为什么这里需要中断线程呢,因为调用线程的interrupt方法,能够让阻塞在getTask中的take方法与poll方法上的Worker唤醒,唤醒之后,重新检查线程池的状态为shutdown状态,getTash方法就会返回null,从而Thread的run方法,线程销毁。最后调用tryTerminate尝试终止,只要有任务没有执行完成,有Worker在运行就不会转为terminate状态。看下tryTerimate的源码:

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            // stop 状态 或者shutdown状态,且workQueue没有内容
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
           // stop状态 或者shutdown状态,worker都已经销毁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //将状态修改为tidying状态
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        //将状态修改为terminate状态
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
  1. 首先判断状态,如果是running, tidying,terminate或者shutdown状态且workQueue不为空,就直接返回。总结就是:只有状态为stop 或者 shutdown且workQueue为空了,才能往下走。
  2. 如果Worker还存在,则中断一个空闲的worker,然后返回。
  3. 将状态修改为tidying状态,接着修改为terminate状态,在ThreadPoolExecutor中由于terminated方法时空方法,所以这两个状态可以认为是一样的。

什么样的状态才能转化为tidying状态, 首先前提是Worker都关闭了,其次是 stop,或者 shutdown且队列中没有任务了

4.2 shutdownNow

不同于shutdown的是,这里将状态修改为stop状态

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

首先将线程池状态修改为stop状态,然后唤醒所有的worker,然后调用tryTerminate方法,该方法已经讲过了。shutdownNow方法会返回阻塞队列中所有的任务。这些任务都没有被执行。

首先我们了解了shutdown与shutdownNow两个方法一个将状态修改为shutdown,一个将状态修改为stop,一个没有返回值,一直返回了workQueue中的任务列表。为了能够更加清晰的认清这两个方法,我们还是有必要再看一次getTask方法对于shutdown状态与stop状态有什么不同的处理操作。

// Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

如果是shutdown状态,且任务队列中有任务,则继续执行。如果是stop状态,直接返回null,销毁worker。
总结:两个方法修改任务状态后都不允许再提交新的任务,shutdown方法将线程池的状态修改为shutdown状态,会等待所有已经提交的任务全部执行完成然后各个worker自动退出,shutdownNow不会执行完成所有的任务,但是会将没有执行完成的任务列表返回,其在getTask中会直接返回null也会引导各个worker自动退出。最终Worker全部销毁,整个线程池关闭

4.3 线程池都是事后自行退出,那是谁在修改线程池的状态到最终terminate状态?

这个问题的答案隐藏在Worker的run方法中,run方法调用了runWorker方法,在runWorker方法中,退出循环后会执行finnaly语句,在这里会调用processWorkerExit方法,而在这个方法中会调用tryTerminate方法,也就是说每有一个Worker执行完成,都会去执行下tryTerminate方法,尝试将线程池的状态修改为terminate状态。代码如下:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
         ... 省略
        tryTerminate();
       ... 省略
}

在我们调用shutdown或者shutdownNow方法后,可以调用awaitTerminate方法,等待线程池关闭。

总结

针对ThreadPoolExecutor的使用事项,就先总结到这,大家如果有好的意见也可以提出来,我们一起探讨。

上一篇下一篇

猜你喜欢

热点阅读