java中的队列

2019-12-23  本文已影响0人  北海北_6dc3

一、队列是什么

队列是一种先进先出的数据结构。

二、队列的接口定义

方法名称 作用 队列满
boolean add(E e) 队尾添加 异常
boolean offer(E e) 队尾添加 返回false
E remove() 获取并移除队头 异常
E poll() 获取并移除队头 null
E element() 获取队头 异常
E peek() 移除队头 null
public interface Queue<E> extends Collection<E> {

    /**
     *队列满,抛出异常
     */
    boolean add(E e);

    /**
     *队列满,返回false
     */
    boolean offer(E e);

    /**
     *获取并移除一个队列头部元素. 这个方法与{@link #poll poll} 不同点是
     * 对列为空,会抛出异常,而poll方法,会返回null
     * @return the head of this queue
     * @throws NoSuchElementException if this queue is empty
     */
    E remove();

    /**
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    E poll();

    /**
     * 获取但不移除,对应remove
     * @return the head of this queue
     * @throws NoSuchElementException if this queue is empty
     */
    E element();

   /**
     * 获取但不移除,对应poll
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    E peek();

可以看到,队列简单来说就是在队头获取移除数据,在队尾插入数据

一、队列使用场景

参考资料:
消息队列的使用场景是怎样的? - 祁达方的回答 - 知乎
https://www.zhihu.com/question/34243607/answer/140732170
消息队列mq总结

四、java中已定义的队列

1、先看总体结构:
queue总体结构.png

Queue的实现,只有BlockingQueue和Deque【阻塞队列和双链队列】

2、BlockingQueue

BlockingQueue从字面意思就知道它是阻塞队列,它在以下两种情况会造成阻塞:
1.当队列满了的时候进行入队操作。
2.当队列空了的时候进行出队操作。
参考资料:
ArrayBlockingQueue源码-JUC阻塞队列
JAVA中的队列
https://www.cnblogs.com/silyvin/p/9106632.html

BlockingQueue接口定义如下:

public interface BlockingQueue<E> extends Queue<E> {
    boolean add(E e);
    boolean offer(E e);

    //插入元素,如果队列满,阻塞
    void put(E e) throws InterruptedException;

    //插入元素,如果队列满,指定超时阻塞
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    //获取并移除,如果队列为空,则阻塞
    E take() throws InterruptedException;

    //获取并移除,如果队列为空,指定超时阻塞
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    //队列剩余容量,(不可用于判定队列是否会插入阻塞,存在其他线程插入情况)
    int remainingCapacity();

    //移除指定元素,通过o.equals判定
    boolean remove(Object o);

    //是否包含指定元素,通过o.equals判定
    public boolean contains(Object o);

    //从该队列中删除所有可用元素并将它们添加到给定集合中。
    int drainTo(Collection<? super E> c);

    //从该队列中删除指定数量可用元素并将它们添加到给定集合中。
    int drainTo(Collection<? super E> c, int maxElements);
BlockingQueue基于Queue增加的方法
方法名称 作用 队列满
void put(E e) 队尾添加 阻塞
boolean offer(E e, long timeout, TimeUnit unit) 队尾添加 指定时间阻塞
E take() 获取并移除队头 阻塞
E poll(long timeout, TimeUnit unit) 获取并移除队头 指定时间阻塞
int remainingCapacity() 剩余容量
boolean remove(Object o) 移除指定元素 不阻塞
boolean contains(Object o) 是否存在元素
int drainTo(Collection<? super E> c) 批量获取移除所有元素 不阻塞
int drainTo(Collection<? super E> c, int maxElements) 批量获取移除指定数量元素 不阻塞

继承结构如下


BlockingQueue结构.png

继承自BlockingQueue的实现队列有9个。

队列名称 作用 是否有界
ArrayBlockingQueue 必须在创建的时候指定队列的大小,内部采用数组的实现方式 只有一个锁 有界
LinkedBlockingQueue 大小如果不指定默认为Integer的最大值,内部采用链表实现,链表队列通常比基于阵列的队列具有更高的吞吐量,但是在大多数并发应用程序中,可预测的性能较差。 读写分离锁,两个锁 可选有界
PriorityBlockingQueue 通过最小堆实现,内部为数组 只有一个锁,和cas扩容使用 无界,初始容量为11
SynchronousQueue 通过内部构建一个队列(公平锁)或者栈(非公平锁)实现多线程实时读取,生产和消费速度 要大体相似,否则会产生内存溢出 CAS 无容量
LinkedBlockingDeque 大小如果不指定默认为Integer的最大值,底层实现采用的链表,但是锁的方式,采用和ArrayBlockingQueue的一样,故首位插入获取都会锁住 只有一个锁 可选有界
DelayQueue 延时队列,每个元素需实现Delayed接口,延时到期后方可从队列去除,核心就是内置了个PriorityQueue,引入leader机制 只有一个锁 无界
DelayedWorkQueue 延时队列,每个元素实现一个Runnable接口,并再次实现了PriorityQueue接口相应方法,同时也引入leader 只有一个锁 无界
LinkedTransferQueue<E> LinkedTransferQueue是 SynchronousQueue 和 LinkedBlockingQueue 的合体,性能比 LinkedBlockingQueue 更高(没有锁操作),比 SynchronousQueue能存储更多的元素。 CAS 无界
ArrayBlockingQueue核心源码解析
    /** 存储队列容器 */
    final Object[] items;

    /** items index for next take, poll, peek or remove ,获取索引*/
    int takeIndex;

    /** items index for next put, offer, or add ,插入索引*/
    int putIndex;

    /** Number of elements in the queue,总数 */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access,锁 */
    final ReentrantLock lock;

    /** Condition for waiting takes,锁条件,不为空释放 */
    private final Condition notEmpty;

    /** Condition for waiting puts,锁条件,队列不满释放 */
    private final Condition notFull;

    /**
     * Shared state for currently active iterators, or null if there
     * are known not to be any.  Allows queue operations to update
     * iterator state.
     */
    transient Itrs itrs = null;

    //插入元素,数据位置设置值,更改指针位置,发送信号量。
    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }


   /**
     * 删除元素,数据位置置空,更改指针位置,发送信号量。
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

    /**
     * while里面优雅的阻塞,插入值不可为null
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }
LinkedBlockingQueue核心源码解析

参考资料:
LinkedBlockingQueue 源码分析 (基于Java 8)

/** Linked list node class */
/**
 * Linked 的数据节点, 这里有个特点, LinkedBlockingQueue 开始构建时会创建一个dummy节点(类似于 ConcurrentLinkedQueue)
 * 而整个队列里面的头节点都是 dummy 节点
 * @param <E>
 */
static class Node<E>{
    E item;

    /**
     * One of:
     * - the real successor Node
     * - this Node, meaning the successor is head.next
     * - null, meaning there is no successor (this is the last node)
     */
    /**
     * 在进行 poll 时 会将 node.next = node 来进行 help gc
     * next == null 指的是要么是队列的 tail 节点
     */
    Node<E> next;
    Node(E x){
        item = x;
    }
}

/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

/**
 * Head of linked list
 * Invariant: head.item == null
 * Head 节点的不变性 head.item == null <- 这是一个 dummy 节点(dummy 节点什么作用呢, 主要是方便代码, 不然的话需要处理一些 if 的判断, 加大代码的复杂度, 尤其是非阻塞的实现)
 */
transient Node<E> head;

/**
 * Tail of linked list
 * Invariant: last.next == null
 * Tail 节点的不变性 last.next == null <- 尾节点的 next 是 null
 */
private transient Node<E> last;

/** ReentrantLock Condition 的常见使用方式 */
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

public KLinkedBlockingQueue(int capacity){
    if(capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity; // 指定 queue 的容量
    last = head = new Node<E>(null); // 默认的在 queue 里面 创建 一个 dummy 节点
}
PriorityBlockingQueue<E>核心源码解析

我们需要先了解PriorityQueue
参考资料:
https://en.wikipedia.org/wiki/Binary_heap
https://www.jianshu.com/p/f86851ee6b96

public class PriorityQueue<E> extends AbstractQueue<E>
    implements java.io.Serializable {
    //默认容量
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    /**
     * 优先级队列表示为平衡二叉树堆:queue[n]有子树queue[2 N + 1]和queue[2(N + 1)]。
     * queue[0]为最小,每个父级节点均不大于子树
     */
    transient Object[] queue; // non-private to simplify nested class access
    //队列大小
    private int size = 0;
    private final Comparator<? super E> comparator;
    transient int modCount = 0; // non-private to simplify nested class access
    //构造函数
    public PriorityQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }
    public PriorityQueue(int initialCapacity,
                         Comparator<? super E> comparator) {
        // Note: This restriction of at least one is not actually needed,
        // but continues for 1.5 compatibility
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.queue = new Object[initialCapacity];
        this.comparator = comparator;
    }

    /**
     * Initializes queue array with elements from the given Collection.
     *
     * @param c the collection
     */
    private void initFromCollection(Collection<? extends E> c) {
        initElementsFromCollection(c);
        heapify();
    }

    /**
     * Establishes the heap invariant (described above) in the entire tree,
     * assuming nothing about the order of the elements prior to the call.
     */
    @SuppressWarnings("unchecked")
    private void heapify() {
        for (int i = (size >>> 1) - 1; i >= 0; i--)
            siftDown(i, (E) queue[i]);
    }

    private void siftDown(int k, E x) {
        if (comparator != null)
            siftDownUsingComparator(k, x);
        else
            siftDownComparable(k, x);
    }

    //核心下沉代码
    private void siftDownUsingComparator(int k, E x) {
        int half = size >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            Object c = queue[child];
            int right = child + 1;
            if (right < size &&
                comparator.compare((E) c, (E) queue[right]) > 0)
                c = queue[child = right];
            if (comparator.compare(x, (E) c) <= 0)
                break;
            queue[k] = c;
            k = child;
        }
        queue[k] = x;
    }

     //核心上浮代码
    private void siftUpComparable(int k, E x) {
        Comparable<? super E> key = (Comparable<? super E>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (key.compareTo((E) e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = key;
    }
}

我们可以看到,优先级队列,里面就是使用数组进行了完全二叉树的存取,没有任何锁的操作,所以是个线程不安全的。
我们再看看PriorityBlockingQueue源码

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {

    private static final int DEFAULT_INITIAL_CAPACITY = 11;
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    private transient Object[] queue;
    private transient int size;
    private transient Comparator<? super E> comparator;


    /**
     * Lock used for all public operations
     */
    private final ReentrantLock lock;

    /**
     * Condition for blocking when empty
     */
    private final Condition notEmpty;

    /**
     * Spinlock for allocation, acquired via CAS.
     */
    private transient volatile int allocationSpinLock;
    /**
     * A plain PriorityQueue used only for serialization,
     * to maintain compatibility with previous versions
     * of this class. Non-null only during serialization/deserialization.
     */
    private PriorityQueue<E> q;

    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }

}

可以看到,跟PriorityQueue比较,多2个锁,和持有了PriorityQueue<E> q用于序列化。锁的方式和ArrayBlockQueue相似。

        //这里cas自旋锁
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                .....
            } finally {
                allocationSpinLock = 0;
            }

SynchronousQueue<E> 源码解析

参考资料:
https://www.jianshu.com/p/cb5e09facfee
https://www.cnblogs.com/leesf456/p/5560362.html
https://javadoop.com/post/java-concurrent-queue
https://www.cnblogs.com/dwlsxj/p/Thread.html
核心是高并发下使用链表通过CAS控制并发来实现队列和栈存取数据。

LinkedBlockingDeque 源码解析

public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
   static final class Node<E> {
        /**
         *与LinkedBlockingQueue比,增加一个祖先节点
         */
        Node<E> prev;

        E item;
        Node<E> next;
        Node(E x) {
            item = x;
        }
    }

    /**
     * Pointer to first node.不变式很重要,在添加和移除时在维护
     * Invariant: (first == null && last == null) ||
     *            (first.prev == null && first.item != null)
     */
    transient Node<E> first;

    /**
     * Pointer to last node..不变式很重要,在添加和移除时在维护
     * Invariant: (first == null && last == null) ||
     *            (last.next == null && last.item != null)
     */
    transient Node<E> last;

    /** Number of items in the deque,因为用了一个锁,所以不需要原子操作,同ArrayBlockQueue */
    private transient int count;

  /** Maximum number of items in the deque,总容量控制  */
    private final int capacity;

    /** Main lock guarding all access,同ArrayBlockQueue  */
    final ReentrantLock lock = new ReentrantLock();

    /** Condition for waiting takes,同ArrayBlockQueue  */
    private final Condition notEmpty = lock.newCondition();

    /** Condition for waiting puts ,同ArrayBlockQueue */
    private final Condition notFull = lock.newCondition();

    //构造函数,只需要初始化容量
    public LinkedBlockingDeque(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }


  /**
     * 头部写入,核心方法,外面加锁,当单线程使用
     */
    private boolean linkFirst(Node<E> node) {
        // assert lock.isHeldByCurrentThread();
        if (count >= capacity)
            return false;
        Node<E> f = first;
        node.next = f;
        first = node;
        if (last == null)
            last = node;
        else
            f.prev = node;
        ++count;
        notEmpty.signal();
        return true;
    }

    /**
     * 尾部写入,核心方法,外面加锁,当单线程使用
     */
    private boolean linkLast(Node<E> node) {
        // assert lock.isHeldByCurrentThread();
        if (count >= capacity)
            return false;
        Node<E> l = last;
        node.prev = l;
        last = node;
        if (first == null)
            first = node;
        else
            l.next = node;
        ++count;
        notEmpty.signal();
        return true;
    }

    /**
     * 头部读取,核心方法,外面加锁,当单线程使用
     */
    private E unlinkFirst() {
        // assert lock.isHeldByCurrentThread();
        Node<E> f = first;
        if (f == null)
            return null;
        Node<E> n = f.next;
        E item = f.item;
        f.item = null;
        f.next = f; // help GC
        first = n;
        if (n == null)
            last = null;
        else
            n.prev = null;
        --count;
        notFull.signal();
        return item;
    }

    /**
     * 尾部读取,核心方法,外面加锁,当单线程使用
     */
    private E unlinkLast() {
        // assert lock.isHeldByCurrentThread();
        Node<E> l = last;
        if (l == null)
            return null;
        Node<E> p = l.prev;
        E item = l.item;
        l.item = null;
        l.prev = l; // help GC
        last = p;
        if (p == null)
            first = null;
        else
            p.next = null;
        --count;
        notFull.signal();
        return item;
    }

    //写入方法,粗暴的一把锁。
    public boolean offerFirst(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return linkFirst(node);
        } finally {
            lock.unlock();
        }
    }

}

DelayQueue<E extends Delayed> 源码详解

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    //锁,同ArrayBlockingQueue作用及用法
    private final transient ReentrantLock lock = new ReentrantLock();
    private final Condition available = lock.newCondition();
    //存储数据,使用最小堆
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    /**
     * Thread designated to wait for the element at the head of
     * the queue.  This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting.  When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely.  The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim.  Whenever the head of the queue is replaced with
     * an element with an earlier expiration time, the leader
     * field is invalidated by being reset to null, and some
     * waiting thread, but not necessarily the current leader, is
     * signalled.  So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     */
    private Thread leader = null;

    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    if (nanos <= 0)
                        return null;
                    first = null; // don't retain ref while waiting
                    if (nanos < delay || leader != null)
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
}



其中leader这个解释下
https://stackoverflow.com/questions/48493830/what-exactly-is-the-leader-used-for-in-delayqueue

DelayedWorkQueue源码

    static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {
        //初始容量
        private static final int INITIAL_CAPACITY = 16;
        private RunnableScheduledFuture<?>[] queue =
            new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition available = lock.newCondition();
        //队列中元素
        private int size = 0;
        private Thread leader = null;
}
上一篇 下一篇

猜你喜欢

热点阅读