Android开发程序员Android开发

重识 java 线程池

2021-02-03  本文已影响0人  __Y_Q

一. 什么是线程池

线程池就是提前创建若干个线程, 如果有任务需要处理, 线程池里的线程就会处理任务. 处理完之后线程并不会被销毁, 而是等待下一个任务.

 

二. 为什么要使用线程池

在开发中, 几乎所有需要异步或并发执行任务的程序都可以使用线程池. 合理使用线程池可以带来以下 3 个好处

  1. 降低资源消耗. 通过重复利用已创建的线程降低线程创建和销毁造成的消耗.
  2. 可以控制最大并发数. 避免大量线程之间因相互抢占系统资源而导致的阻塞现象.
  3. 提高线程的可管理性. 线程是稀缺资源, 如果无限制的创建, 不仅会消耗系统资源, 还会降低系统的稳定性. 使用线程池可以进行统一分配, 调优和监控.
     

三. Executor

Java 5之后引入了一堆新的启动, 调度和管理线程的API. Executor 框架便是 Java 5 中引入的, 其内部使用了线程池机制, 它在 java.util.cocurrent 包下. 通过该框架来控制线程的启动, 执行和关闭, 可以简化并发编程的操作. 因此, 在 Java 5 之后, 通过使用 Executor 框架来启动线程比使用Threadstart方法更好.

Executor 框架UML 图
提交任务
  1. Executorexecute() 方法用于提交不需要返回值的任务, 所以无法判断任务是否被线程执行成功.

  2. ExecutorServicesubmit() 方法用于提交需要有返回值的任务. 线程池会返回一个 Future 类型的对象, 可以调用 isDone() 方法查询 Future 是否已经完成. 当任务完成时, 它有一个结果. 可以调用 get() 方法来获取该结果. 也可以不用 isDone() 进行检查就直接调用 get() 获取结果, 在这种情况下, get() 将阻塞当前线程, 直至结果准备就绪. 还可以取消任务的执行, Future 提供了 cancel() 方法用来取消执行 pending 中的任务.
     

关闭线程池
  1. ExecutorServiceshutdown() 或者 shutdownNow() 方法用来关闭线程池. 它们的原理是遍历线程池中的工作线程, 然后逐个调用线程的 interrupt() 方法来中断线程, 所以无法响应中断的任务可能永远无法停止. 但是他们存在一定的区别.

    • shutdownNow() 首先将线程池的状态设置为 STOP, 然后尝试停止所有的正在执行或者暂停任务的线程, 并返回等待执行任务的列表.
    • shutdown() 只是将线程池的状态设置成 SHUTDOWN 状态. 然后中断所有没有正在执行任务的线程.
  2. 只要调用了这两个方法中的任意一个, ExecutorService.isShutdown() 方法就会返回 true. 当所有的任务都已关闭后, 才表示线程池关闭成功, 这时调用 ExecutorService.isTerminated() 方法会返回 true. 至于应该调用哪一种方法来关闭线程池, 应该由提交到线程池的任务特性决定, 通常调用 ExecutorService.shutdown() 来平滑的关闭线程池, 如果任务不一定要执行完, 则可以使用 ExecutorService.shutdownNow()

 
了解了如何提交任务到线程池与如何关闭线程池后, 那么接下来就来看一下 ThreadPoolExecutor 的构造函数中配置线程池的参数都有什么意义吧.

 

四. ThreadPoolExecutor

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

根据以上参数理解, 可以总结出当有新任务需要处理的时候 ThreadPoolExecuto 执行任务的流程大致如下

  1. 先看线程池中的线程数量是否小于核心线程 corePoolSize 的数量
    • 小于核心线程数, 那么会直接启动一个核心线程来执行任务.
  2. 大于或等于核心线程数, 接着看任务队列 workQueue是否满了.
    • 任务队列未满, 那么就直接插入到任务队列中等待执行
  3. 任务队列 workQueue 满了, 最后看线程池中的线程数量是否小于线程池最大线程数 maximumPoolSize.
    • 小于线程池最大线程数, 立刻启动一个非核心线程来执行任务.
  4. 大于或等于线程池最大线程数, 则执行拒绝策略.
     
ThreadPoolExecuto 执行任务流程图

下面以一个示例来使用一下 ThreadPoolExecuto

class MyTask implements Runnable {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "正在执行。。。");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class TestType5 {
    public static void main(String[] args) throws Exception {
        //定义一个容量为 2 的有界阻塞队列
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(2);
        //创建一个线程池, 核心线程 3 个, 最大线程数 5 个, 60 秒钟超时
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 5, 60, TimeUnit.SECONDS, queue);
        Runnable r1= new MyTask();
        Runnable r2= new MyTask();
        Runnable r3= new MyTask();
        Runnable r4= new MyTask();
        Runnable r5= new MyTask();
        Runnable r6= new MyTask();
        Runnable r7= new MyTask();
        Runnable r8= new MyTask();

        threadPoolExecutor.execute(r1);
        threadPoolExecutor.execute(r2);
        threadPoolExecutor.execute(r3);
        threadPoolExecutor.execute(r4);
        threadPoolExecutor.execute(r5);
        threadPoolExecutor.execute(r6);
        threadPoolExecutor.execute(r7);
        //threadPoolExecutor.execute(r8);
        threadPoolExecutor.shutdown();
    }
}

输出结果为:

pool-1-thread-1正在执行。。。
pool-1-thread-2正在执行。。。
pool-1-thread-3正在执行。。。
pool-1-thread-4正在执行。。。
pool-1-thread-5正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-5正在执行。。。

可是, 如果把 r8 也执行了, 输出结果就会变成下面这样

pool-1-thread-1正在执行。。。
pool-1-thread-3正在执行。。。
pool-1-thread-4正在执行。。。
pool-1-thread-2正在执行。。。
pool-1-thread-5正在执行。。。

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task org.study.MyTask@6f94fa3e rejected from java.util.concurrent.ThreadPoolExecutor@5e481248
[Running, pool size = 5, active threads = 5, queued tasks = 2, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at org.study.TestType5.main(TestType5.java:42)

pool-1-thread-2正在执行。。。
pool-1-thread-3正在执行。。。

可以看到执行了拒绝策略, 抛出了默认的异常. 为什么呢? 我们来屡屡. 一共创建了 8 个Runnable, 分别是 r1 r2 r3 ... r8 .
创建的线程池核心线程为 3 个, 最大线程总数为 5 个. 阻塞队列长度为 2.

  1. r1, r2, r3 这三个分别在开始执行时发现当前线程池内线程总数(0)小于核心线程总数(3), 那么就会启动三个核心线程来执行r1 - r3.
  2. 执行 r4 的时候, 发现线程池内的线程总数已经等于了核心线程数, 那么看到任务队列未满, 就放入到阻塞队列中.
  3. r5的执行与 r4 流程相同., r5 执行完后, 此时线程池中线程数量为 3, 阻塞队列也满了.
  4. 执行 r6 的时候发现当前线程池内的线程总数等于核心线程, 并且阻塞队列也满了, 最后拿当前线程池中的线程总数(3)与我们设置的线程池最大线程数(5)做对比, 发现小于我们设置的最大线程数, 那么就新启动一个线程来执行 r6.
  5. r7 的执行与 r6 相同, 也是新启动一个线程来执行 r7. 这时候, 核心线程总数为 3, 阻塞队列满了, 线程池中的线程总数为 5 个.
  6. 当要执行 r8 的时候, 发现核心线程数满了, 阻塞队列也满了, 同时线程池中的线程总数也达标了, 那么就会执行拒绝策略, 抛出默认的异常.
  7. 目前正在执行的任务有 r1, r2, r3, r6, r7, 阻塞队列中的任务是 r4, r5. 当 r1, r2, r3, r6, r7 中任何一个任务执行完了, 就会从阻塞队列中取出任务来执行, 所以最后又打印了两个 log. 执行的就是阻塞队列中的 r4r5.

看到这里的同学是不是有点蒙圈了, 通过 ThreadPoolExecutor 配置的线程池, 那些参数要如何填写呢? 核心线程要分配几个呢? 怎么配置线程池算是合理呢? 其实想要合理的配置线程池, 需要先分析任务的特性. 而任务的特性可以从以下几个角度来分析

CPU 密集型任务的特点是要进行大量的计算, 消耗 CPU 资源, 比如计算圆周率, 对视频进行高清解码等, 全靠 CPU 的运算能力.
IO 密集型是指设计到网络, 磁盘 IO 的任务 都是 IO 密集型任务, 这类任务特点是 CPU 消耗很少, 任务的大部分时间都在等待 IO 操作完成 (因为 IO 的速度远远低于 CPU 和内存的速度).

性质不能同的任务可以用不同规模的线程池分开处理.

Ps: 通过 Runtime.getRuntime().availableProcessors() 方法获得当前设备的 CPU 个数
Ps: CPU+1 是为了防止页缺失.也叫硬中断. 当计算密集型的线程偶尔由于缺失故障或者其他原因而暂停时, 这个额外的线程也能确保 CPU 的时钟周期不会被浪费.


这里了解了通过 ThreadPoolExecutor 实现线程池方式, 而在 Java 中, 使用 Executors 类创建线程池也是一种常用的创建线程池的方式. 下面再来了解一下 Executors类.
 

五. Executors

Executors 类, 提供了一系列工厂方法用于创建线程池,返回的线程池都实现了 ExecutorService 接口.
通过 Executors 类创建的 4 类线程池都是直接或间接的通过配置ThreadPoolExecutor 来实现自己的功能特性, 下面分别来了解一下通过 Executors 创建的这 4 类线程池.
 

1. newCachedThreadPool 缓存型线程池
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

newCachedThreadPool 创建出的线程池特性来看, 这类线程池比较适合执行大量的耗时较少的任务, 当整个线程池都处于闲置状态时, 线程池中的线程都会因为超时而被停止, 这时候线程池内部是没有任何线程的, 几乎不占用任何系统资源.
 

2. newFixedThreadPool 全核心型线程池
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

这类型的线程池多数针对一些很稳定很固定的正规并发线程, 多用于服务器.
 

3. newScheduledThreadPool 调度型线程池
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    // 上面的 Executor 框架 UML 图 中 ScheduledThreadPoolExecutor  继承了 ThreadPoolExecutor 类.
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
    }

这类型的线程池主要用于执行定时任务和具有固定周期的重复任务.

 

4. newSingleThreadExecutor 单例型线程池
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

下面只挑出两个具有代表性的来看一下是如何使用的, newCachedThreadPoolnewScheduledThreadPool

带有返回值的任务提交与 newCachedThreadPool 的使用.
class MyTask implements Callable<String>{

    private int id;

    public MyTask(int id) {
        this.id = id;
    }

    @Override
    public String call() throws Exception {
        System.out.println("id:"+ id + " - threadName:"+Thread.currentThread().getName() + "调用 call 方法");
        //这里返回的结果, 会被 Future 的 get 方法得到.
        return "任务返回结果为:" +  id +" - "+ Thread.currentThread().getName();
    }
}

public class TestType5 {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        //存储返回的结果
        ArrayList<Future<String>> list = new ArrayList<>();

        //提交 10 个任务, 并将返回的 future 存储
        for (int i = 0; i < 10; i++) {
            Future<String> future = pool.submit(new MyTask(i));
            list.add(future);
        }

        for (Future<String> fs : list) {
            //Future 返回如果没有完成, 则一直循环,
            while (!fs.isDone());
            System.out.println(fs.get());
            pool.shutdown();
        }

    }
}

输出结果

id:0 - threadName:pool-1-thread-1调用 call 方法
id:3 - threadName:pool-1-thread-4调用 call 方法
id:2 - threadName:pool-1-thread-3调用 call 方法
id:1 - threadName:pool-1-thread-2调用 call 方法
id:5 - threadName:pool-1-thread-6调用 call 方法
id:4 - threadName:pool-1-thread-5调用 call 方法
id:6 - threadName:pool-1-thread-7调用 call 方法
id:7 - threadName:pool-1-thread-6调用 call 方法
id:9 - threadName:pool-1-thread-5调用 call 方法
id:8 - threadName:pool-1-thread-7调用 call 方法
任务返回结果为:0 - pool-1-thread-1
任务返回结果为:1 - pool-1-thread-2
任务返回结果为:2 - pool-1-thread-3
任务返回结果为:3 - pool-1-thread-4
任务返回结果为:4 - pool-1-thread-5
任务返回结果为:5 - pool-1-thread-6
任务返回结果为:6 - pool-1-thread-7
任务返回结果为:7 - pool-1-thread-6
任务返回结果为:8 - pool-1-thread-7
任务返回结果为:9 - pool-1-thread-5
newScheduledThreadPool 调度型线程池的使用
class ThreadPoolUtil implements Runnable{

    private Integer index;

    public ThreadPoolUtil(Integer index) {
        this.index = index;
    }

    @Override
    public void run() {
        try {
            System.out.println(index+"开始处理线程!");
            Thread.sleep(5000);
            System.out.println("线程标识是:"+this.toString());
            System.out.println(index+"处理结束!");
        }
        catch(InterruptedException e) {
            e.printStackTrace();
        }
    }

}

public class TestType5 {
    public static void main(String[] args) throws Exception {
        //核心线程为 2. 一次执行 2 个任务.剩下的放入到队列
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        for (int i = 0; i < 4; i++) {
            //延迟 2 秒执行
            pool.schedule(new ThreadPoolUtil(i), 2 , TimeUnit.SECONDS);
        }
        pool.shutdown();
    }
}

输出结果

0开始处理线程!
1开始处理线程!
线程标识是:org.study.ThreadPoolUtil@30f68028
线程标识是:org.study.ThreadPoolUtil@37c3a03e
0处理结束!
1处理结束!
2开始处理线程!
3开始处理线程!
线程标识是:org.study.ThreadPoolUtil@34c912ee
线程标识是:org.study.ThreadPoolUtil@390458b6
2处理结束!
3处理结束!

 
线程池到这里就结束了, 我们在 Android 中有时也会需要用到线程池. 所以了解类之间的关系以及线程池的优化也是必不可少的.

上一篇 下一篇

猜你喜欢

热点阅读