多线程与并发

Java - Queue

2019-02-28  本文已影响0人  齐晋

简介

Queue,翻译成队列,是一种先进先出(FIFO, First In First Out)的数据结构。最先放进去的,取的时候也就最先取出来。最形象的比喻就是我们常见的排队就是一个队列。排队时,新来的人进入队尾,先到的人首先接收服务。

Queue大多数是单向队列,即只能从一端取数据,另一端放入数据。就像把羽毛球放入球筒,从一端放入,从另一端取出。

image.png

Queue体系

public interface Queue<E> extends Collection<E> {
    boolean add(E e);
    boolean offer(E e);
    E remove();
    E poll();
    E element();
    E peek();
}

上面的6个方法分别表示了3种操作,每一种操作都有两种类型,抛出异常的类型有特定返回值的类型。下面用表格来描述上述方法的异同:

方法 作用 返回值 成功 失败
add(E e) 插入元素 boolean true IllegalStateException
offer(E e) 同add() boolean true false
remove(E e) 返回第一个元素,并删除 E E NoSuchElementException
pool(E e) 同remove boolean E null
element() 返回第一个元素,不删除 E E NoSuchElementException
peak() 同element E E null

Queue接口常见的扩展和实现有:

image.png

AbstractQueue

AbstractQueue类似于很多JDK源码中的Abstract*类,实现了部分通用方法,做为具体实现类的基类。这里主要讲一下PriorityQueueConcurrentLinkedQueue

PriorityQueue

基于堆实现的优先队列。获取数据时是有序的。默认是升序。
主要特点:

public class PriorityQueue<E> extends AbstractQueue<E>{
    public PriorityQueue(int initialCapacity, Comparator<? super E> comparator);
}
PriorityQueue<Integer> test = new PriorityQueue<>();
test.add(10);
test.add(4);
test.add(7);
test.add(2);
test.add(9);

//可保证有序输出,输出 2 4 7 9 10
while (!test.isEmpty()) {
    System.out.println(test.poll());
}

//不保证有序输出,输出 2 4 7 10 9
for(Integer e : test){
    System.out.println(e);
}

//不保证有序输出,输出 2 4 7 10 9
Iterator it = test.iterator();
while (it.hasNext()) {
    System.out.println(it.next());
}

ConcurrentLinkedQueue

ConcurrentLinkedQueue是一个无锁线程安全队列。
常见的线程安全实现都是通过加锁(在“线程安全的实现”这一节就能看到),而加锁的成本是很高的。如果能找到一种方法,既不用加锁,又能保证线程安全,很大可能能极大提升系统性能。ConcurrentLinkedQueue就是使用了这种无锁方法的一种队列。
其实现思想借鉴了很通用,也很重要的CAS操作。CAS简介
详情请见无锁队列 - ConcurrentLinkedQueue

BlockingQueue

BlockingQueue名为阻塞队列
何为阻塞?以从队列中取数据为例,当队列为空时,Queue提供方法的表现为:

BlockingQueueQueue的基础上主要扩展了以下几个阻塞方法:

public interface BlockingQueue<E> extends Queue<E> {
    void put(E e) throws InterruptedException;
    E take() throws InterruptedException;
    boolean offer(E e, long timeout, TimeUnit unit)
       throws InterruptedException;
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;
}
异常行为 插入数据 取数据
抛出异常 add(E e) remove()
返回null offer(E e) poll()
阻塞 put(E e)/offer(E e, long timeout) take()/poll(long timeout)

BlockingQueue的具体实现

BlockingQueue只是个接口,其常用的具体实现有哪些呢?主要有以下这些:

image.png

ArrayBlockingQueue

主要特点:

public class ArrayBlockingQueue<E> extends AbstractQueue<E> 
    implements BlockingQueue<E>, java.io.Serializable {
    final Object[] items;  //基于数组
    final ReentrantLock lock;  //线程同步锁
    private final Condition notEmpty;  //条件变量,用于取数据同时队列为空时阻塞线程
    private final Condition notFull;  //条件变量,用户插入数据同时队列满时阻塞线程
}

LinkedBlockingQueue

主要特点:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private transient Node<E> head;  //队列头
    private final ReentrantLock takeLock = new ReentrantLock();  //取数据线程同步锁
    private final Condition notEmpty = takeLock.newCondition(); //条件变量,用于取数据同时队列为空时阻塞线程
    private final ReentrantLock putLock = new ReentrantLock(); //插入数据线程同步锁
    private final Condition notFull = putLock.newCondition(); //条件变量,用户插入数据同时队列满时阻塞线程
}

注:put(E e)方法只有在队列满时才组设,因此,如果是无界队列,put(E e)永远不会阻塞。

PriorityBlockingQueue

优先队列PriorityQueue的线程安全版本,同时提供阻塞方法。
主要特点同PriorityQueue

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private transient Object[] queue;
    
}

SynchronousQueue

比较特殊的队列,没有任何存储空间。
举个简单的例子:
A是个快递员,要送快递给用户B,如果使用ArrayBlockingQueue或者LinkedBlockingQueue,会是这样:

  1. A把快递放到快递柜的箱子里(假设快递柜有20个箱子)
  2. 如果有空箱子,可以直接把快递放到空箱子中。
  3. 如果所有的箱子都满了,那么A等着,直到B取了任意快递,空出了箱子,A再把快递放到空箱子中。
  4. 如果所有的箱子都空了,取快递的人B会一直等待,直到有快递投递到箱子中。

那么如果使用SynchronousQueue,情况就不同了

  1. 首先没有能临时存放快递的柜子和箱子
  2. A要送快递,会一直拿着快递等,直到B来取。
  3. B要取快递,也会一直等,直到A来送。
    SynchronousQueue就是类似这种“手到手”的交付方式,不经过任何媒介缓存。

主要特点:

DelayQueue

DelayQueue的继承结构如下:

image.png
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
    private final PriorityQueue<E> q = new PriorityQueue<E>();  //存储数据
}

由类的声明可知,插入到DelayQueue中的元素必须实现Delayed接口。Delayed接口比较简单,只有一个getDelay(TimeUnit unit)方法。

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);  //剩余时间
}

同时,DelayQueue存储数据既不像ArrayBlockingQueue使用数组,也不像LinkedBlockingQueue使用链表,而是使用现成的PriorityQueue。因此DelayQueue取数据的规则跟PriorityQueue类似。

何谓延迟?
延迟主要体现在取数据的时候。通过查看poll()的源码可知:

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

在取数的时候,getDelay()必须小于等于0才能把数取出来。通过实现getDelay()方法,就能实现过多长时间以后才能取出数据这种延迟效果。

主要特点:

BlockingQueue的应用

实现生产者-消费者
生产者-消费者模式在工程之中应用广泛。BlockingQueue可极大简化实现生产者-消费者的难度。
伪代码:

//生产者
public class Producer{
    private BlockingQueue queue;
    Producer(BlockingQueue queue){this.queue = queue}
    public void produce(E e){
        //生产者调用put()方法,队列满时阻塞等待。
        queue.put(e);
    }
}

//消费者
public class Consumer{
    private BlockingQueue queue;
    Consumer(BlockingQueue queue){this.queue = queue}
    public E consume(){
        //消费者调用take()方法,队列空时阻塞等待
        queue.take(e);
    }
}

做为线程池的等待队列

求Top K大/小的元素
比如有1亿个随机数字,找出最大的10个数。这种类似的求Top K的问题很常见。
由于PriorityQueue/PriorityBlockingQueue底层结构是堆(大顶堆/小顶堆),而解决Top K问题的最好办法就是使用堆,因此PriorityQueue/PriorityBlockingQueue是解决该问题的不二选择。
代码摘要:

public class FixSizedPriorityQueue<E extends Comparable> {
    private PriorityQueue<E> queue;
    private int maxSize; // 堆的最大容量

    public FixSizedPriorityQueue(int maxSize) {
        if (maxSize <= 0)
            throw new IllegalArgumentException();
        this.maxSize = maxSize;
        this.queue = new PriorityQueue(maxSize, new Comparator<E>() {
            public int compare(E o1, E o2) {
                // 生成最大堆使用o2-o1,生成最小堆使用o1-o2, 并修改 e.compareTo(peek) 比较规则
                return (o2.compareTo(o1));
            }
        });
    }

    public void add(E e) {
        if (queue.size() < maxSize) { // 未达到最大容量,直接添加
            queue.add(e);
        } else { // 队列已满
            E peek = queue.peek();
            if (e.compareTo(peek) < 0) { // 将新元素与当前堆顶元素比较,保留较小的元素
                queue.poll();
                queue.add(e);
            }
        }
    }
}

延时需求
如:考试时间为120分钟,30分钟后才可交卷。这种情况下,就需要使用到DelayQueue了。
案例参考

Deque

Deque发音为'deck',为双向队列。
Queue默认是单向队列,数据只能从一头放入,从另一头取出,这跟羽毛球的放取原理一样。双向队列就像放取乒乓球,可以从两端放入,也可以从两端取出。

image.png

Deque接口的部分组成

public interface Deque<E> extends Queue<E> {
    //新增方法
    void addFirst(E e);
    void addLast(E e);
    boolean offerFirst(E e);
    boolean offerLast(E e);
    E removeFirst();
    E removeLast();
    E pollFirst();
    E pollLast();
    E getFirst();
    E getLast();
    E peekFirst();
    E peekLast();
    void push(E e);
    E pop();

    //Queue接口方法
    boolean add(E e);
    boolean offer(E e);
    E remove();
    E poll();
    E element();
    E peek();
}

从扩展的函数名字就能看出来,Deque分别针对插入和取出接口提供了对队列头和尾的操作。
同时,从Queue继承过来方法的与扩展方法有如下对应关系:

add(E e) == addLast(E e)
offer(E e) == offerLast(E e)
remove() == removeFirst()
poll() == pollFirst()
element() == getFirst()
peek() == peekFirst()

同时,Deque还扩展出了push(E e)pop()方法。其中:

push(E e)等同于addFirst(E e)
pop()等同于removeFirst()

因此,当Deque只通过push(E e)pop()操作队列头时,Deque就演化成了一个栈Stack

[图片上传失败...(image-f152e6-1551348671199)]

扩展接口或实现类

线程安全的实现

引申
Java线程安全...

BlockingQueue中的方法都是线程安全的,都使用了ReentrantLock做为锁。如:
ArrayBlockingQueue
ArrayBlockingQueue中有公共变量count来计数元素个数,因此需要一个全局的锁来保护。

public ArrayBlockingQueue(int capacity, boolean fair) {
    lock = new ReentrantLock(fair);
}
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //do something
    } finally {
        lock.unlock();
    }
}

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //do something
    } finally {
        lock.unlock();
    }
}

LinkedBlockingQueue
LinkedBlockingQueue中计数元素个数使用的是AtomicInteger类型,本身是线程安全的。因此没有使用全局锁,而是针对插入和获取分别创建了两个锁。

private final ReentrantLock takeLock = new ReentrantLock();
private final ReentrantLock putLock = new ReentrantLock();

public boolean offer(E e) {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        //do something
    } finally {
        putLock.unlock();
    }
}

public E poll() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        //do something
    } finally {
        takeLock.unlock();
    }
}

引申
上面实现线程安全的方式都是通过锁。而我们知道,锁是一个比较重的操作,在高并发系统中,能少用就少用。因此,在此介绍一个不用锁就能实现线程安全的队列:
无锁队列 - ConcurrentLinkedQueue...
无锁并且保证线程安全的思想是使用CAS...

阻塞算法的实现

引申
Java线程通信...

BlockingQueue的功能是个标准的生产者-消费者模式。线程间通信使用的是条件变量Condition。伪代码如下:

ReentrantLock lock = new ReentrantLock(fair);
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();

public boolean put(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //如果队列已经满了,那么等待队列notFull
        while(count == container.size){
            notFull.await()
        }
        notEmpty.signal();
        //do other things
    } finally {
        lock.unlock();
    }
}

public E take() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //如果队列为空,那么等待队列notEmpty
        while (count == 0){
            notEmpty.await();
        }
        notFull.signal();
    } finally {
        lock.unlock();
    }
}

非阻塞型
ConcurrentLinkedQueue: 无锁线程安全队列

参考

上一篇下一篇

猜你喜欢

热点阅读