Android进阶之路Android开发经验谈Android技术知识

深入浅出Java(Android )线程池ThreadPoolE

2020-03-30  本文已影响0人  唠嗑008

前言

关于线程池
在Java/Android开发中,设计到并发的请求,那基本上是离不开线程池了。用线程池的好处:

要不要深度学习线程池

如何深度学习线程池
这也是我们今天的重点,本文将从下面几点带大家快速掌握线程池的要点:

预览

线程池类图.png

先对线程池的部分核心类/接口做个简介,大家有个印象就好。
Executor接口

public interface Executor {

    /**
     * 就一个方法,用来执行线程任务的,类似于Thread的start()方法
     */
    void execute(Runnable command);
}

由于Executor是一个接口,所以execute是由具体的实现类来完成的,调用这个方法,可能会出现如下情况:

ExecutorService接口
继承自Executor接口,我们常用的很多方法就是在这个接口中定义的。主要涉及到:提交任务关闭线程获取结果


public interface ExecutorService extends Executor {

    /**
     * 关闭线程池,新提交的任务会被拒绝,但是已经提交的任务会继续执行
     */
    void shutdown();

    /**
     * 关闭线程池,新提交的任务会被拒绝,并且尝试关闭正在执行的任务
     */
    List<Runnable> shutdownNow();

    /**
     * 线程池是否已关闭
    */
    boolean isShutdown();

    /**
     * 如果调用了shutdown或者shutdownNow之后,所有的任务都结束了,那么返回true,否则返回false
     */
    boolean isTerminated();

    /**
     * 当调用shutdown 或 shutdownNow之后,再调用这个方法,会
     *等待所有的任务执行完成,直到超时(超过timeout)或者说当前的线程被中断了
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;


    /**
     * 提交一个Runnable 任务
     */
    Future<?> submit(Runnable task);

    /**
     * 执行所有任务,返回 Future 类型的一个 list
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
}

AbstractExecutorService
抽象类,实现了ExecutorService接口。主要封装了通过submit方式提交任务的一些操作。

注意: 不需要获取结果,可以用 execute 方法;需要获取结果(FutureTask)用 submit 方法。

由于篇幅有限,本文只针对execute方式做讲解,想了解submit 方式的同学可以参考深度解读 java 线程池设计思想及源码实现

Executors
这是大多数人最常用的一个类,实质上就是一个工具类。可以快速的构建一个线程池对象,常见的操作有如下:

   /**
     * 创建一个固定大小的线程池,而且全是核心线程,
     * 会一直存活,除非特别设置了核心线程的超时时间
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

   /**
     * 创建了一个没有大小限制的线程池,全是非核心线程;如果线程
     * 空闲的时间超过60s就会被移除
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

   /**
     * 这个线程池只有1个唯一的核心线程
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

   /**
     * 创建一个定长的线程池,可以执行周期性的任务
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

可以看出这几种方式最后都是通过ThreadPoolExecutor来实现的,所以下面就来研究一下今天的主角ThreadPoolExecutor,等理解了这个类,也就可以掌握线程池等工作原理,甚至可以根据自己的策略来自定义线程池。

ThreadPoolExecutor(关键类)

继承与抽象方法AbstractExecutorService,也就间接实现了ExecutorServiceExecutor等接口。

从构造方法谈起

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

一些重要属性和方法

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

这里很关键,一定要认真看,后面分析任务执行execute()方法就需要用到这些基础。

Integer.SIZE =32,这代表了java中,int最大是32位,所以COUNT_BITS等于29;CAPACITY等于1*2^29 -1,用它来表示线程池的最大容量是足够了的。

从线程池的生命周期来看,线程池有5种状态:

关于状态转换

线程状态转换.png

关于状态转换就讲完了,特别是前2个状态转换,更是常用。还有一个关键的属性ctl需要讲一下,初学者可能不太好理解,需要一点计算机基础。

首先ctl是一个AtomicInteger类型的对象,它其实是对int的包装,可以在多线程并发的情况下保证原子性,它传入的参数就是它表示的值。这里是通过ctlOf()方法来计算的。

在计算之前先补充2个小知识点:

1、 <<:是移位运算符,具体俩说是左移;右移用>>表示。左移的意思是将一个二进制数向左边移动1位,那么左移1位就等于这个数2,左移n位的话就是2^n;右移的话就是除以;

2、 由于10进制数有正负之分,所以转换成二进制数的时候,需要在最高位加上0/1来表示正负,正数用0表示;负数用1来表示。

3、原码:加上符号为之后的二进制数;反码:正数的反码是其本身,负数的反码:符号位不变,其余各位取反;补码:正数的补码就是其本身;负数的补码:即在反码的基础上+1。

4、针对二进制数的&、|、~。与(&)运算:相同位0,不同位1;或(|)运算:只要有1个为1就是1,否则为0;非(~)运算:取反,1变0,0变1。

首先RUNNING = -1 << COUNT_BITS;其中-1的二进制数1001,那转换成补码就是1111,然后左移29位就变成1110 0000 0000 0000 0000 0000 0000 0000,因为int最多32位,所以高位的1没了;然后再和0做或运算,所以结果还是它本身,所以ctl初始值为1110 0000 0000 0000 0000 0000 0000 0000,其中高3位存放线程状态,后面29位存放线程数量。

还有几个方法

  • runStateOf:获取运行状态;
  • workerCountOf:取出ctl低29位,来表示当前线程数;
  • ctlOf:获取运行状态和活动线程数的值。

execute(关键方法)

这是线程池的关键方法,用来提交任务的。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //表示 “线程池状态” 和 “线程数量” 的整数
        int c = ctl.get();
        /*
         * 如果当前活跃线程数小于核心线程数,就会添加一个worker来执行任务;
         * 具体来说,新建一个核心线程放入线程池中,并把任务添加到该线程中。
         */
        if (workerCountOf(c) < corePoolSize) {
            /*
             * addWorker()如果返回true表示添加成功,线程池会执行这个任务,那么本方法可以结束了,返回 false 代表线程池不允许提交任务,那么就会执行后面的方法。
             */
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

      //程序执行到这里,说明要么活跃线程数大于核心线程数;要么addWorker()失败

        /*
         * 如果当前线程池是运行状态,会把任务添加到队列
         */
        if (isRunning(c) && workQueue.offer(command)) {
            /*
            *这里的逻辑比较有意思,又重新检查了线程状态和数量;
            *如果线程不处于 RUNNING 状态,就会移除刚才添加到队列中的任务;
            *如果线程池还是 RUNNING 状态,并且线程数为 0,那么开启新的线程;
            * addWorker(null, false)参数分析:
            * 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动;
            * 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;
         
            /
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

 //程序执行到这里,说明要么线程状态不是RUNNING;要么workQueue队列已经满了

         /*
          * 这时,再调用addWorker方法去创建线程,
          * 会把线程池的线程 数量的上限设置为maximum;
          * 如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
          */
        else if (!addWorker(command, false))
            reject(command);
    }

为什么当任务添加到队列后,内部还执行了那么复杂的判断?

因为担心任务提交到队列中了,但是线程池却关闭了。

当执行execute方法提交一个任务的时候,如果线程池一直处于RUNNING状态,那流程如下:

注意:
addWorker(null, false);也是创建一个线程,但并没有传入任务,因为任务已经被添加到workQueue中了,当worker在执行的时候,会直接从workQueue中获取任务。在workerCountOf(recheck) == 0时执行addWorker(null, false);也是为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。

关于Worker类和addWorker方法

addWorker()是尝试在线程池中创建一个线程并执行任务,firstTask表示作为新创建的线程的第一个任务,core参数为true的时候,会用核心线程数做创建线程的边界;如果为false,会用最大线程数maximumPoolSize做为边界。如果addWorker()返回true,表示创建线程成功

private boolean addWorker(Runnable firstTask, boolean core) {}
任务执行流程.png

业务场景(分析2种常用的线程池)

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

固定大小的线程池。最大线程数与核心线程数相等,keepAliveTime的设置无效,因为核心线程默认不会销毁,阻塞队列为LinkedBlockingQueue,它是无界队列。

工作流程:

虽然线程数量是固定的,但是由于使用了无界队列LinkedBlockingQueue,如果线程的并发量比较大,任务的执行时间比较长,那还是可能会OOM的。适用于CPU密集型的任务,也就是那种长期的任务。

newCachedThreadPool

   public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

核心线程数为 0,最大线程数为 Integer.MAX_VALUE,所有线程空闲时间 为 60 秒,任务队列采用 SynchronousQueue。

用它去处理那种并发量很大的任务就不合适,由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。适用于那种任务可以快速完成的任务。

总结

线程池的内容其实是很多的,绝不是1,2篇文章就能讲完的。本文也主要是针对提交任务之后线程池的工作原理以及线程状态变化来做讲解。核心要义如下:
1、Executors这个工具类下创建的几种线程池的工作原理。

newFixedThreadPool、newCachedThreadPool等,需要注意每一个的优缺点和使用场景。

2、ThreadPoolExecutor这个类的构造方法和一些关键成员属性

Executors创建的多种线程池都是通过它的构造方法来实现的,读者需要熟悉它的参数的意义,这样的话,就可以自定义满足个性化需求的线程池。在文中列举出的一些成员属性也很重要,后面对线程池的各种操作离不开它们。

3、理解深刻线程池中的线程创建时机
主要是那个execute()方法和addWorker()方法,主要是根据线程池状态、当前线程数、核心线程数、队列大小、线程池最大线程数来结合来判断。

4、拒绝策略

添加任务到线程池,不一定会被接受。主要看一下哪些情况会执行reject(command)方法;还有几种不同的拒绝策略,默认是抛异常。

5、异常处理

如果某个任务执行出现异常,那么执行任务的线程会被关闭。

感谢以下作者

优雅的使用Java线程池
深入理解 Java 线程池:ThreadPoolExecutor
面试必备:Java线程池解析

上一篇 下一篇

猜你喜欢

热点阅读