一些收藏

灯下黑?原来线程池是最典型生产消费者模式

2021-08-11  本文已影响0人  小胖学编程

线程池:是生产者-消费者模型的典型应用。应用程序将任务交给线程池,线程池将其放入到BlockingQueue中。线程池中的已经开启的worker线程会循环监听BlockingQueue中的任务来进行消费。

1. 简述:线程池的设计理念

使用Thread开启线程:

当然,一般我们会执行start方法。

若使用Thread开启线程:每一次子线程均需要经历创建和销毁的生命周期,性能不好。

为了解决这个问题,JDK设计出线程池。

1.1 生产消费者模型

JDK的BlockingQueue,天生实现的是生产者-消费者模型,即队列满了put队列会被阻塞;队列空了后get方法会被阻塞。

work线程将开启while循环的监听

线程池中Worker线程开启后。使用while循环消费BlockingQueue中的Runnable。若while循环时得到的任务为空,那么便去关闭Worker线程。这也就是最大线程数超过空闲时间的核心线程数能自动回收的原因。

image.png

核心线程最大空闲时间:

核心线程数的空闲时间KeepAliveTime。其实就是阻塞队列获取Running的最大等待时间。当配置了该参数且超过最大等待时间,那么返回一个null任务。而worker线程的run方法中的while循环得到null时,便会结束这个worker线程。

poll获取到的元素为空,那么便会结束监听,worker线程的run方法执行完毕。

image.png

当返回null时,由“work线程将开启while循环的监听”可值,将结束worker线程的监听操作。

生产者和拒绝策略

生产者向BlockingQueue中放入消息时,并没有使用put方法,而是使用的offer方法。

拒绝策略:因为offer方法返回false,说明阻塞队列已经满了,若最大线程数大于核心线程数,那么继续开启Worker线程。若开启的是最大线程数的Worker线程,那么就会执行配置的拒绝策略。

image.png

本质上就是一个内存级别的RabbitMQ

1.2 线程池的原理简述

  1. 线程池里面的线程为消费者线程(worker线程)【消费者循环去处理任务】,默认情况下,必须有任务的到来才会触发创建消费者。也可以使用prestartAllCoreThreads方法,在线程池初始化时,将所有的消费者全部创建出来。

  2. 消费者监听的是BlockingQueue队列(默认实现生产消费者模式),消费者调用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)方法拉取任务,若是在指定时间内获取不到任务,便会将自己注销掉(也就是结束监听)。

  3. 生产者向BlockingQueue存入任务时,使用的是offer方法,即队列满了之后返回false,然后判断最大线程数是否在开启Worker线程以及任务的拒绝策略

1.3 线程池任务执行流程

  1. 当线程池中线程数量小于corePoolSize,每来一个任务,就会创建一个线程执行这个任务。
  2. 当前线程池线程数量大于等于corePoolSize,则每来一个任务。会尝试将其添加到任务缓存队列中,若是添加成功,则该任务会等待线程将其取出去执行;若添加失败(一般来说任务缓存队列已满),则会尝试创建新的线程执行。
  3. 当前线程池线程数量等于maximumPoolSize,则会采取任务拒绝策略进行处理。
  4. 当前线程池线程数量大于corePoolSize,如果某线程空闲时间超过keepAliveTime,线程将会被终止,直到不大于corePoolSize数量。如果允许为核心池(corePoolSize)中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

1.4 拒绝策略:一个也不放弃

有时候,我们不想丢弃任务,且将生产者(主线程)阻塞,那么如何实现拒绝策略?

上文我们得知,生产者放入queue时使用的是offer方法,当阻塞队列满了之后,直接返回false。我们可以采用put的方式,将主线程阻塞。不在向BlockingQueue中填充任务。

    ThreadPoolExecutor executor2 = new ThreadPoolExecutor(1, 1, 100
            , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(50),((r, executor) -> {
        try {
            //阻塞主线程
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
        }     
    }));

详细源码分析

JAVA并发(12)— Lock实现生产者消费者
JAVA并发(13)— ThreadPoolExecutor的实现原理(源码分析)

上一篇下一篇

猜你喜欢

热点阅读