Android 线程池ThreadPoolExecutor 的使

2023-01-17  本文已影响0人  因为我的心

一、前言:

// 传统开启线程方式
Thread(Runnable {
     //to do异步请求

}).start()

1、使用new Thread()创建线程存在的问题

  • 1、如果在一个list每一个item都创建一个Thread,list量大的话会大量创建Thread,导致内存抖动,GC频繁的回收。要知道,GC的回收是在主线程的,这样会导致卡顿。
  • 2、线程过多,导致各个线程竞争抢夺CPU执行权,线程的频繁切换导致效率的降低。
  • 3、ListView的每一个item滑出窗口,线程无法停止也无法控制。

2、使用线程池的好处

  • 1、重用已经创建的好的线程,避免频繁创建进而导致的频繁GC
  • 2、控制线程并发数,合理使用系统资源,提高应用性能
  • 3、可以有效的控制线程的执行,比如定时执行,取消执行等

二、ThreadPoolExecutor线程池的使用:

1、创建线程池

 //创建线程池
  var  threadPoolExecutor = ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAXIMUM_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                ArrayBlockingQueue<Runnable>(QUEUE_SIZE),
                Executors.defaultThreadFactory(),
                RejectedExecutionHandler { _, _ ->
                    Log.d("ThreadPoolManager","$ThreadPoolManager  RejectedExecutionHandler----")
                }
            )

2、创建线程池 ThreadPoolExecutor 7个参数

3、 线程池 ThreadPoolExecutor 的方法

4、 线程池封装类

/***
 *  Created by lyy on 2023/01/17.
 * 线程池封装类
 *
 *   使用:ThreadPoolManager.getInstance().addTask("TAG", runnable)
 * */
class ThreadPoolManager private constructor() {

    private var threadPoolMap = hashMapOf<String, ThreadPoolExecutor>()

    /**
     * cpu数量
     * */
    private val CPU_COUNT = Runtime.getRuntime().availableProcessors()

    /**
     * 核心线程数为手机CPU数量+1
     * */
    private val CORE_POOL_SIZE = CPU_COUNT + 1

    /**
     * 最大线程数为手机CPU数量×2+1
     * */
    private val MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1

    /**
     * 线程活跃时间 秒,超时线程会被回收
     * */
    private val KEEP_ALIVE_TIME: Long = 60

    /**
     * 等待队列大小
     * */
    private val QUEUE_SIZE = 128

    companion object {
        fun getInstance() = SingleHolder.SINGLE_HOLDER
    }

    object SingleHolder {
        val SINGLE_HOLDER = ThreadPoolManager()
    }


    /**
     *   @param tag 针对每个TAG 获取对应的线程池
     *   @param corePoolSize  线程池中核心线程的数量
     *   @param maximumPoolSize  线程池中最大线程数量
     *   @param keepAliveTime 非核心线程的超时时长,
     *   当系统中非核心线程闲置时间超过keepAliveTime之后,则会被回收
     *   如果ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true,
    则该参数也作用于核心线程的超时时长
     *   @param unit 第三个参数的单位,有纳秒、微秒、毫秒、秒、分、时、天等
     *   @param queueSize 等待队列的长度 一般128 (参考 AsyncTask)
     *   workQueue 线程池中的任务队列,
    该队列主要用来存储已经被提交但是尚未执行的任务。
    存储在这里的任务是由ThreadPoolExecutor的execute方法提交来的。
     *   threadFactory  为线程池提供创建新线程的功能,这个我们一般使用默认即可
     *
     *   1.ArrayBlockingQueue:这个表示一个规定了大小的BlockingQueue,ArrayBlockingQueue的构造函数接受一个int类型的数据,
     *              该数据表示BlockingQueue的大小,存储在ArrayBlockingQueue中的元素按照FIFO(先进先出)的方式来进行存取。
     *   2.LinkedBlockingQueue:这个表示一个大小不确定的BlockingQueue,在LinkedBlockingQueue的构造方法中可以传
     *          一个int类型的数据,这样创建出来的LinkedBlockingQueue是有大小的,也可以不传,不传的话,
     *          LinkedBlockingQueue的大小就为Integer.MAX_VALUE
     * */
    private fun getThreadPool(tag: String): ThreadPoolExecutor {
        var threadPoolExecutor = threadPoolMap[tag]
        if (threadPoolExecutor == null) {
            threadPoolExecutor = ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAXIMUM_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                ArrayBlockingQueue<Runnable>(QUEUE_SIZE),
                Executors.defaultThreadFactory(),
                RejectedExecutionHandler { _, _ ->
                    Log.d("ThreadPoolManager", "$ThreadPoolManager  RejectedExecutionHandler----")
                }
            )
            //允许核心线程闲置超时时被回收
            threadPoolExecutor.allowCoreThreadTimeOut(true)
            threadPoolMap[tag] = threadPoolExecutor
        }
        return threadPoolExecutor
    }

    /**
     *  @param tag 针对每个TAG 获取对应的线程池
     *  @param runnable 对应的 runnable 任务
     * */
    fun removeTask(tag: String, runnable: Runnable) {
        getThreadPool(tag)?.queue?.remove(runnable)
    }

    /**
     *  @param tag 针对每个TAG 获取对应的线程池
     *  @param runnable 对应的 runnable 任务
     * */
    fun addTask(tag: String, runnable: Runnable) {
        getThreadPool(tag).execute(runnable)
    }

    /**
     *   @param tag 针对每个TAG 获取对应的线程池
     *   取消 移除线程池
     * */

    //shutDown():关闭线程池后不影响已经提交的任务
    //shutDownNow():关闭线程池后会尝试去终止正在执行任务的线程
    fun exitThreadPool(tag: String) {
        var threadPoolExecutor = threadPoolMap[tag]
        if (threadPoolExecutor != null) {
            threadPoolExecutor.shutdownNow()
            threadPoolMap.remove(tag)
        }
    }
}

5、使用:

// 添加线程任务
 ThreadPoolManager.Companion.getInstance().addTask("google", new Runnable() {
           @Override
           public void run() {
               //执行子线程任务...
               //Thread.currentThread().getName().equals("main") //判断是否是主线程
           }
       });

这个要先说明一点,系统的API ThreadPoolExecutor 并没有提供接口移除任务,removeTask()方法里面是通过ThreadPoolExecutor 获取其对应的队列 BlockingQueue,通过队列 移除 线程任务 Runnable。因为这个BlockingQueue 就是我们创建 ThreadPoolExecutor 实例时通过构造方法 传进去的 ArrayBlockingQueue ,我们后续通过线程池添加任务,都会放进这个队列的。线程池执行任务,都会从这个队列里面取出来。

6、场景一 核心线程数大于 任务数

我们把 核心线程数 CORE_POOL_SIZE 设置成20 ,最大线程数 MAXIMUM_POOL_SIZE 设置成30,等待队列维持128

然后添加10个任务。很明显,任务都是一下子执行完。

// 创建十个任务
        for (i in 1..10) {
            val runnable = Runnable {
                val sdf = SimpleDateFormat("yyyy-MM-dd HH:mm:ss:ms")
                val time = sdf.format(Date(java.lang.Long.parseLong(System.currentTimeMillis().toString())))
                Logger.d("ThreadPoolManager  Runnable ---- $i time: $time" )
            }
            ThreadPoolManager.getInstance().addTask("TAG", runnable)
        }

看下log 而且是按顺序执行的,看下时间,几乎在同一时间执行的

图片.png

7、场景二 核心线程数 小于 任务数

我们把 核心线程数 CORE_POOL_SIZE 设置成3 ,最大线程数 MAXIMUM_POOL_SIZE 设置成5,等待队列维持128。为模拟线程执行的耗时操作,每个线程执行完功能后睡眠5秒。

// 创建十个任务
        for (i in 1..10) {
            val runnable = Runnable {
                val sdf = SimpleDateFormat("yyyy年MM月dd日 HH时mm分ss秒")
                val time = sdf.format(Date(java.lang.Long.parseLong(System.currentTimeMillis().toString())))
                Logger.d("ThreadPoolManager  Runnable ---- 任务$i   执行时间是: $time")
                //模拟耗时操作,睡眠5秒
                Thread.sleep(5000)
            }
            ThreadPoolManager.getInstance().addTask("TAG", runnable)
        }

看打印的结果:前3(1-3)个先执行,过5秒再执行3(4-6)个,再过5秒再执行三个(7-9),最后执行第10个

图片.png

分析线程池的流程:

  • 1、因为任务数是10,最大线程数是5,核心线程只有3,等待队列维持128.
  • 2、所以当向任务队列添加前3个任务时,都会先创建3个核心线程出来执行前面三个任务.所以任务1,2,3会最先执行。
  • 3、当向线程池添加到第4个任务时,因为核心线程已经达到上限了,但任务队列里面(最大128)还没填满 ,所以不会开启新的非核心线程去执行任务,而是把任务放进等待队列里面等待,等有空闲的核心线程再去执行等待队列的任务。
  • 4、所以都是3个任务3个任务的执行。

注意:特别重要-至于其他场景,是遵循以下规律的:

  • 1、execute一个线程之后,如果线程池中的线程数未达到核心线程数,则会立马启用一个核心线程去执行
  • 2、execute一个线程之后,如果线程池中的线程数已经达到核心线程数,且workQueue未满,则将新线程放入workQueue中等待执行
  • 3、execute一个线程之后,如果线程池中的线程数已经达到核心线程数但未超过非核心线程数,且workQueue已满,则开启一个非核心线程来执行任务
  • 4、execute一个线程之后,如果线程池中的线程数已经超过非核心线程数,则拒绝执行该任务

8、场景三 移除已经提交的任务

这个场景是,我提交了一个任务到线程池,但是在未执行时想取消这个任务。这个场景在RecycleView的每个item快速滑动时很常见。item进入窗口时,要执行一个异步任务,但还没执行到,这个item就快速滑出屏幕。这个时候,就可以通过线程池的队列里把这个任务移除掉。废话少说,上模拟代码。

我们把 核心线程数 CORE_POOL_SIZE 设置成3 ,最大线程数 MAXIMUM_POOL_SIZE 设置成5,等待队列维持128。

var runnable6: Runnable? = null
        var runnable7: Runnable? = null
        var runnable8: Runnable? = null
        var runnable9: Runnable? = null
        for (i in 1..10) {
            val runnable = Runnable {
                val sdf = SimpleDateFormat("yyyy年MM月dd日 HH时mm分ss秒")
                val time = sdf.format(Date(java.lang.Long.parseLong(System.currentTimeMillis().toString())))
                Logger.d("ThreadPoolManager  Runnable ---- 任务$i   执行时间是: $time")
                //模拟耗时操作,睡眠5秒
                Thread.sleep(5000)
            }
            ThreadPoolManager.getInstance().addTask("TAG", runnable)
            if (i == 6) {
                runnable6 = runnable
            }
            if (i == 7) {
                runnable7 = runnable
            }
            if (i == 8) {
                runnable8 = runnable
            }
            if (i == 9) {
                runnable9 = runnable
            }
        }
        //添加后,立马移除
        ThreadPoolManager.getInstance().removeTask("TAG", runnable6!!)
        ThreadPoolManager.getInstance().removeTask("TAG", runnable7!!)
        ThreadPoolManager.getInstance().removeTask("TAG", runnable8!!)
        ThreadPoolManager.getInstance().removeTask("TAG", runnable9!!)

运行结果:


图片.png

分析线程池的流程:

  • 1、因为任务数是10,最大线程数是5,核心线程只有3,等待队列维持128.
  • 2、所以当向任务队列添加前3个任务时,都会先创建3个核心线程出来执行前面三个任务.所以任务1,2,3会最先执行。
  • 3、当向线程池添加到第4-10个任务时,把任务放进等待队列里面等待。
  • 4、所以通过当前线程池,获取对应队列 queue,然后remove掉这个任务。

这个removeTask()就是这么实现的:

/**
     *  @param tag 针对每个TAG 获取对应的线程池
     *  @param runnable 对应的 runnable 任务
     * */
    fun removeTask(tag: String, runnable: Runnable) {
        getThreadPool(tag)?.queue?.remove(runnable)
    }

这个queue,就是我们创建线程池实例时,通过构造放传进去的ArrayBlockingQueue(),这个队列用来存放等待执行的任务Runnable。通过这个队列删除了就不会执行。

这个源码通过源码亦可以验证的:

ThreadPoolExecutor的构造方法:

图片.png

通过queue获取的

图片.png

通过定位代码,明显是同一个变量。

三、Java的四种线程池封装

1、newCachedThreadPool:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。示例代码如下:
  /**
     * newCachedThreadPool线程池
     */
    fun cachedThreadPool() {
        val newCachedThreadPool = Executors.newCachedThreadPool()
        newCachedThreadPool.execute {
            //执行子线程功能
        }
    }

底层源码:

 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
2、 newFixedThreadPool:创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。示例代码如下:
    /**
     * newFixedThreadPool线程池
     */
    fun fixedThreadPool() {
        val newFixedThreadPool = Executors.newFixedThreadPool(3)
        newFixedThreadPool.execute {
            //执行子线程功能
        }
    }

底层源码:

   public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
3、newScheduledThreadPool:创建一个定长线程池,支持定时及周期性任务执行。延迟执行示例代码如下:
 /**
     * newScheduledThreadPool线程池
     */
    fun scheduledThreadPool() {
        val newScheduledThreadPool = Executors.newScheduledThreadPool(3)
        newScheduledThreadPool.execute {
            //执行子线程功能
        }
    }

底层源码:

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
4、newSingleThreadExecutor:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。示例代码如下:
 /**
     * newSingleThreadExecutor线程池
     */
    fun singleThreadExecutor() {
        val newSingleThreadExecutor = Executors.newSingleThreadExecutor()
        newSingleThreadExecutor.execute {
            //执行子线程功能
        }
    }

底层源码:

 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

参考:https://blog.csdn.net/Leo_Liang_jie/article/details/90263072

上一篇 下一篇

猜你喜欢

热点阅读