阻塞队列和线程池

2020-06-06  本文已影响0人  仕明同学

队列

image.png

队列,又称为伫列(queue),是先进先出(FIFO, First-In-First-Out)的线性表。在具体应用中通常用链表或者数组来实现。队列只允许在后端(称为rear)进行插入操作,在前端(称为front)进行删除操作。
队列的操作方式和堆栈类似,唯一的区别在于队列只允许新数据在后端进行添加。

线程池

image.png

什么是阻塞队列

1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列补满。

2)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序整体处理数据的速度。

常用阻塞队列,其实在源码里面多次使用到 ReentrantLock ,显示锁,关系到了AQS原理

image.png

·ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
·LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
·PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
·DelayQueue:一个使用优先级队列实现的无界阻塞队列。
·SynchronousQueue:一个不存储元素的阻塞队列。
·LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
·LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

无界和有界

有限队列就是长度有限,满了以后生产者会阻塞,无界队列就是里面能放无数的东西而不会因为队列长度限制被阻塞,当然空间限制来源于系统资源的限制,如果处理不及时,导致队列越来越大越来越大,超出一定的限制致使内存超限,操作系统或者JVM帮你解决烦恼,直接把你 OOM kill 省事了。
无界也会阻塞,为何?因为阻塞不仅仅体现在生产者放入元素时会阻塞,消费者拿取元素时,如果没有元素,同样也会阻塞。

ArrayBlockingQueue

是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。初始化时有参数可以设置

LinkedBlockingQueue

是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。

Array实现和Linked实现的区别

1. 队列中锁的实现不同

ArrayBlockingQueue实现的队列中的锁是没有分离的,即生产和消费用的是同一个锁;
LinkedBlockingQueue实现的队列中的锁是分离的,即生产用的是putLock,消费是takeLock


image.png
Condition是AQS的内部类。每个Condition对象都包含一个队列(等待队列)。
等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,
该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,
那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。
等待队列的基本结构如下所示。

2. 在生产或消费时操作不同

ArrayBlockingQueue实现的队列中在生产和消费的时候,是直接将枚举对象插入或移除的;


image.png

LinkedBlockingQueue实现的队列中在生产和消费的时候,需要把枚举对象转换为Node<E>进行插入或移除,会影响性能


image.png

3. 队列大小初始化方式不同

ArrayBlockingQueue实现的队列中必须指定队列的大小;


image.png

LinkedBlockingQueue实现的队列中可以不指定队列的大小,但是默认是Integer.MAX_VALUE


image.png

PriorityBlockingQueue

PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。

DelayQueue

是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
DelayQueue非常有用,可以将DelayQueue运用在以下应用场景。
缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。

SynchronousQueue

是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。

LinkedTransferQueue

多了tryTransfer和transfer方法,
(1)transfer方法
如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。
(2)tryTransfer方法
tryTransfer方法是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法是必须等到消费者消费了才返回。

LinkedBlockingDeque

LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。
多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First单词结尾的方法,表示插入、获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入、获取或移除双端队列的最后一个元素。另外,插入方法add等同于addLast,移除方法remove等效于removeFirst。但是take方法却等同于takeFirst,不知道是不是JDK的bug,使用时还是用带有First和Last后缀的方法更清楚。在初始化LinkedBlockingDeque时可以设置容量防止其过度膨胀。另外,双向阻塞队列可以运用在“工作窃取”模式中。

线程池

为什么要用线程池? Rxjava的线程切换底层就是线程池,所以在牛逼,都是逃不出java的生态

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。

各个参数的含义

image.png

Java5.0版本开始,加入许多新特性,是Java历史中修改最大的版本,许多特点模仿自C#,因而被认为是为了与C#对抗。

image.png

但是但是 :自动装箱/拆箱的矛盾:为什么你比较大小就是 比数字的大小,但是!=,,你却比较的是内存地址??? 不科学呀

     Integer int1 = new Integer(1);
        Integer int2 = new Integer(1);
        //为什么你比较大小就是 比数字的大小,但是!=,,你却比较的是内存地址??? 不科学呀
        System.out.println(int1 >= int2); // 檢查兩者的值,     true
        System.out.println(int1 <= int2); // 檢查兩者的值,     true
        System.out.println(int1 != int2); // 檢查兩者的參考位置,true 我认为这里为false 1难道不等于1么 
        System.out.println(int1 == int2); // 檢查兩者的參考位置,false
image.png

ExecutorService executorService = Executors.newFixedThreadPool(1);

image.png

也就是调用了 toNanos方法,也就是 x的方法,其实都是返回0

image.png

线程池的工作机制

1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
3)如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务。
4)如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

提交任务

image.png

execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。
submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

关闭线程池

可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别,shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。
只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。

合理地配置线程池

要想合理地配置线程池,就必须首先分析任务特性
要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。
• 任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
• 任务的优先级:高、中和低。
• 任务的执行时间:长、中和短。
• 任务的依赖性:是否依赖其他系统资源,如数据库连接。
性质不同的任务可以用不同规模的线程池分开处理。

ThreadPoolExecutor 的类关系

public interface Executor
Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。

image.png

ExecutorService接口继承了Executor,在其上做了一些shutdown()、submit()的扩展,可以说是真正的线程池接口;

image.png

AbstractExecutorService抽象类实现了ExecutorService接口中的大部分方法;


image.png

ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务

image.png
image.png

ScheduledExecutorService接口继承了ExecutorService接口,提供了带"周期执行"功能ExecutorService;

ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。

image.png
ScheduledThreadPoolExecutor可以用来在给定延时后执行异步任务或者周期性执行任务,相对于任务调度的Timer来说,其功能更加强大,Timer只能使用一个后台线程执行任务,而ScheduledThreadPoolExecutor则可以通过构造函数来指定后台线程的个数。

线程池的内部使用的CAS的原理(比较并交换(compare and swap, CAS),是原子操作的一种,可用于在多线程编程中实现不被打断的数据交换操作,从而避免多线程同时改写某一数据时由于执行顺序不确定性以及中断的不可预知性产生的数据不一致问题。 该操作通过将内存中的值与指定数据进行比较,当数值一样时将内存中的数据替换为新的值。)

   private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

线程池工作的原理

1、一个新的任务会放到corePool,核心线程池中
2、当核心线程池满了,就会放到BlockingQueue,如果BlockingQueue是一个有界队列,而且也满了的话
3、就是在小于最大线程池里面创建新的任务,直到最大线程池也满了
4、就会去拒绝队列中,看是哪种的拒绝的方案,然后就这样


image.png
上一篇 下一篇

猜你喜欢

热点阅读