java

深入理解 Java线程池

2021-03-14  本文已影响0人  小王_min

前言

线程池之前需要是[多线程知识:https://www.jianshu.com/p/1b2daac373d5]

什么是线程池

顾名思义,线程池就是有一个容器[底层数据结构HashSet<Worker>],容器用于存放多个线程。线程池中存在多个线程,如果需要执行任务的话,则从这个池子中取得一个线程对象用于执行此任务。[只是一个大概的粗略的介绍,具体细节请接着往下看!!!]

怎么使用线程池[对于CPU/IO密集型]

为什么要使用线程池

  1. 由于线程池的创建和销毁的过程会涉及到用户态和内核态的切换等一些消耗计算机资源的操作(此处针对的是内核线程模型,可见https://www.jianshu.com/p/39d2a4c050f8),导致效率的降低。线程池的作用就是利用一个数据结构容器维系一些线程,用于执行Application的Task,以此达到复用线程提高效率(提高并发、降低RT)。
  2. 有些可以实现与时间相关的任务,如定时任务、周期性任务等。

Java线程池Demo

public class ThreadPoolDemo {
    public static void main(String[] args) throws Exception {
        // 创建Cached线程池,运行速度最快、线程数=理论上可以理解为无穷大
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        // 创建混合线程池,运行速度中等、线程数=传入的 nThreads
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
        // 创建单个线程的线程池,运行速度最慢,但是可以实现顺序执行,线程数=1
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
        Future<Double> future1 = cachedThreadPool.submit(() -> doTask((e1, e2) -> e1*1.0 + e2, 1, 9));
        Future<Double> future2 = cachedThreadPool.submit(() -> doTask((e1, e2) -> e1*1.0 - e2, 1, 9));
        Future<Double> future3 = cachedThreadPool.submit(() -> doTask((e1, e2) -> e1*1.0 * e2, 1, 9));
        Future<Double> future4 = cachedThreadPool.submit(() -> doTask((e1, e2) -> e1*1.0 / e2, 1, 9));
        System.out.println(future1.get());
        System.out.println(future2.get());
        System.out.println(future3.get());
        System.out.println(future4.get());
        // 关闭资源
        close(cachedThreadPool);
        close(fixedThreadPool);
        close(singleThreadExecutor);
        close(scheduledThreadPool);
    }

    public static double doTask(BiFunction<Integer, Integer, Double> biFunction, int arg1, int arg2) {
        return biFunction.apply(arg1, arg2);
    }
    
    public static void close(ExecutorService executorService) {
        executorService.shutdown();
    }
}

Executors类是线程池的工具类(不推荐使用),有一些创建线程池的方法,最主要的方法如下:

底层调用截图

ThreadPoolExecutor

如果是使用Java原生的线程池,推荐使用ThreadPoolExecutor

先来看看类层级结构

类层级结构 Executor源码 ExecutorService源码 AbstractExecutorService源码 ThreadPoolExecutor源码

构造方法

构造方法一

public ThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler)

构造方法二

public ThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    RejectedExecutionHandler handler)

构造方法三

public ThreadPoolExecutor
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory)

构造方法四

public ThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue)

阻塞队列

主要的阻塞队列如下:

拒绝策略

拒绝策略抽象接口RejectedExecutionHandler,其所有的实现类都在ThreadPoolExecutor类中,当然你也可以自己自己的拒绝策略。

拒绝策略抽象接口及其实现类

自定义拒绝策略使用Demo

public class ThreadPoolRejectedExecutionHandlerDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(
                    Runnable r,                 // 被拒绝的任务Task
                    ThreadPoolExecutor executor // 线程池对象
            ) {

                // do something
            }
        });
        // 执行Task
        executor.execute(() -> doTask());
        // 关闭资源
        close(executor);
    }

    public static void doTask() {
        System.out.println("do Task");
    }

    public static void close(ExecutorService executorService) {
        executorService.shutdown();
    }
}

线程池详解

ThreadPoolExecutor类中有一个ctl的原子整数的属性,该属性巧妙的表示了两个属性值(workerCountrunState),workerCount指示线程的有效数量,runState指示线程池的生命周期(是否运行,关闭等)。

为什么要使用一个ctl的原子整数来表示这些值呢?

线程池模型(生命周期)

线程池模型
  1. RUNNING:运行中,此状态可以接受新任务并处理排队的任务。
  2. RUNNING->SHUTDOWN:执行shutdown()方法,将会导致线程池状态又RUNNING->SHUTDOWN的变迁,具体的shutdown()下面会有介绍。
  3. RUNNING->STOP:执行shutdownNow()方法,将会导致线程池状态又RUNNING->STOP的变迁,具体的shutdown()下面会有介绍。
  4. SHUTDOWN/STOP->TIDYING:所有任务已终止,workerCount为零,状态转换为TIDYING。
  5. TIDYING->TERMINATED:状态TIDYING的线程将运行terminated()方法,会导致状态变迁为TERMINATED

shutdown VS shutdownNow

线程池执行流程

线程提交顺序 VS 线程执行顺序

提交顺序
提交顺序
  1. 应用程序调用execute方法向线程池中提交Task执行。
  2. 首先会向corePool中提交Task,如果corePool中有空闲的线程或者数量<maximumPoolSize,则选择/创建一个线程执行Task。
  3. 否则,会将Task提交到Queue中,如果Queue队列未满,那么增将Task任务添加至Queue等待。
  4. 否则,会将Task提交到临时Pool中(如果有的话,临时Pool=maximumPoolSize-corePoolSize),如果临时Pool有空闲或者数量<maximumPoolSize,则选择/创建一个线程执行Task。
  5. 否则,将会调用拒绝策略(默认是抛出异常),当然还有其他的实现或者自定义实现。
执行顺序
执行顺序
  1. 执行顺序与提交顺序有一些差别
  2. 首先会执行corePool和临时Pool中的线程任务,执行顺序不固定(抢占式)。
  3. 等待Queue中的任务,是处于等待状态。只有当corePool和临时Pool中线程有空闲的时候才会被执行。

submit VS execute

上源码

  1. public Future<?> submit(Runnable task)
submit源码 newTaskFor源码

可见本质上还是调用的execute方法,但是在执行execute方法之前对任务进行了额外的操作,将Runnable的Task通过newTaskFor方法转为RunnableFutureTask,目的是为了能够拿到执行的结果,此处返回结果默认为null。

  1. public <T> Future<T> submit(Runnable task, T result)
submit源码

1中类似,T result是指定的任务的返回值

  1. public <T> Future<T> submit(Callable<T> task)
submit源码

1中类似,不同的是,此次提交的Task是Callable<T>类型的,也就是有返回值的。

后续待更新,如果您觉得有帮助,麻烦点个三连,给个关注。

上一篇 下一篇

猜你喜欢

热点阅读