JAVA中的队列

2017-09-27  本文已影响498人  娃娃要从孩子抓起

前言

最近写一个简单版的网络框架,遇到一个这样的场景,如果有多个请求,那么我们需要把请求都放入一个队列当中。队列正常来说是FIFO(先进先出),在java中也定义了一个Queue接口,但是一看JAVA API文档发现,有一堆的实现类,那么我们该使用哪个类来维护用户的请求呢?

结构图

队列.png

以上是我对照JAVA API文档画的一个类图。把常用的类和接口都标了出来。首先BlockingQueue和Deque接口都继承了Queue,它们一个是阻塞队列,一个是双端队列。接着就是它们各自的实现类,我们看看每一个的特点。

BlockingQueue

BlockingQueue从字面意思就知道它是阻塞队列,它在以下两种情况会造成阻塞:
1.当队列满了的时候进行入队操作。
2.当队列空了的时候进行出队操作。
也就是说,当一个线程对已经满了的队列进行入队操作时,会被阻塞,除非另外一个线程进行了出队操作。或者当一个线程对一个空的队列进行出队操作的时候,会被阻塞,除非另外一个线程进行了入队的操作。

阻塞队列是线程安全的,主要用于生产者/消费者的场景。比如一个线程在队尾进行put操作,另外一个线程在队头进行take操作。需要注意的是BlockingQueue不能插入Null值,否则会报NullPointerException异常。

LinkedBlockingQueue

LinkedBlockingQueue阻塞队列的大小如果不指定默认为Integer的最大值。

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

 public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

内部采用链表实现

  /**
     * Linked list node class.
     */
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

从入队出队的代码中我们可以发现,LinkedBlockingQueue是采用先进先出的方式存储数据,也就是在队尾入队,在队头出队。

  /**
     * Links node at end of queue.
     *
     * @param node the node
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

    /**
     * Removes a node from head of queue.
     *
     * @return the node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

ArrayBlockingQueue

ArrayBlockingQueue阻塞队列必须在创建的时候指定队列的大小,内部采用数组的实现方式,并且跟LinkedBlockingQueue一样,都是采用先进先出的方式。

 public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c)
                    items[i++] = Objects.requireNonNull(e);
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

PriorityBlockingQueue

PriorityBlockingQueue默认的初始化大小只有11,允许分配的最大size为integer的最大值-8,内部采用数组实现,可以插入NULL值,所有插入PriorityBlockingQueue的值都必须实现Comparable接口,因为队列的优先顺序就是按照这个规则来实现的。

  /**
     * Default array capacity.
     */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    /**
     * The maximum size of array to allocate.
     * Some VMs reserve some header words in an array.
     * Attempts to allocate larger arrays may result in
     * OutOfMemoryError: Requested array size exceeds VM limit
     */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * Priority queue represented as a balanced binary heap: the two
     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
     * priority queue is ordered by comparator, or by the elements'
     * natural ordering, if comparator is null: For each node n in the
     * heap and each descendant d of n, n <= d.  The element with the
     * lowest value is in queue[0], assuming the queue is nonempty.
     */
    private transient Object[] queue;

SynchronousQueue

SynchronousQueue是一种没有数据缓冲的BlockingQueue,生产者的put操作必须等待消费者的take操作,反过来也一样。(
producer waits until consumer is ready, consumer waits until producer is ready。)
SynchronousQueue使用了一种无锁算法“Dual stack and Dual queue“来实现阻塞操作,在创建SynchronousQueue对象的时候可以选择竞争机制,是公平竞争还是非公平竞争,如果不指定默认为非公平竞争。

 /**
     * Creates a {@code SynchronousQueue} with nonfair access policy.
     */
    public SynchronousQueue() {
        this(false);
    }

    /**
     * Creates a {@code SynchronousQueue} with the specified fairness policy.
     *
     * @param fair if true, waiting threads contend in FIFO order for
     *        access; otherwise the order is unspecified.
     */
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

非公平竞争使用的是先进后出,公平竞争使用的是先进先出。但是两种模式内部都是使用链表来实现。

 /** Node class for TransferQueue. */
        static final class QNode {
            volatile QNode next;          // next node in queue
            volatile Object item;         // CAS'ed to or from null
            volatile Thread waiter;       // to control park/unpark
            final boolean isData;

            QNode(Object item, boolean isData) {
                this.item = item;
                this.isData = isData;
            }

            boolean casNext(QNode cmp, QNode val) {
                return next == cmp &&
                    U.compareAndSwapObject(this, NEXT, cmp, val);
            }

            boolean casItem(Object cmp, Object val) {
                return item == cmp &&
                    U.compareAndSwapObject(this, ITEM, cmp, val);
            }

            /**
             * Tries to cancel by CAS'ing ref to this as item.
             */
            void tryCancel(Object cmp) {
                U.compareAndSwapObject(this, ITEM, cmp, this);
            }

            boolean isCancelled() {
                return item == this;
            }

            /**
             * Returns true if this node is known to be off the queue
             * because its next pointer has been forgotten due to
             * an advanceHead operation.
             */
            boolean isOffList() {
                return next == this;
            }

            // Unsafe mechanics
            private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
            private static final long ITEM;
            private static final long NEXT;

            static {
                try {
                    ITEM = U.objectFieldOffset
                        (QNode.class.getDeclaredField("item"));
                    NEXT = U.objectFieldOffset
                        (QNode.class.getDeclaredField("next"));
                } catch (ReflectiveOperationException e) {
                    throw new Error(e);
                }
            }
        }


/** Node class for TransferStacks. */
        static final class SNode {
            volatile SNode next;        // next node in stack
            volatile SNode match;       // the node matched to this
            volatile Thread waiter;     // to control park/unpark
            Object item;                // data; or null for REQUESTs
            int mode;
            // Note: item and mode fields don't need to be volatile
            // since they are always written before, and read after,
            // other volatile/atomic operations.

            SNode(Object item) {
                this.item = item;
            }

            boolean casNext(SNode cmp, SNode val) {
                return cmp == next &&
                    U.compareAndSwapObject(this, NEXT, cmp, val);
            }

            /**
             * Tries to match node s to this node, if so, waking up thread.
             * Fulfillers call tryMatch to identify their waiters.
             * Waiters block until they have been matched.
             *
             * @param s the node to match
             * @return true if successfully matched to s
             */
            boolean tryMatch(SNode s) {
                if (match == null &&
                    U.compareAndSwapObject(this, MATCH, null, s)) {
                    Thread w = waiter;
                    if (w != null) {    // waiters need at most one unpark
                        waiter = null;
                        LockSupport.unpark(w);
                    }
                    return true;
                }
                return match == s;
            }

            /**
             * Tries to cancel a wait by matching node to itself.
             */
            void tryCancel() {
                U.compareAndSwapObject(this, MATCH, null, this);
            }

            boolean isCancelled() {
                return match == this;
            }

            // Unsafe mechanics
            private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
            private static final long MATCH;
            private static final long NEXT;

            static {
                try {
                    MATCH = U.objectFieldOffset
                        (SNode.class.getDeclaredField("match"));
                    NEXT = U.objectFieldOffset
                        (SNode.class.getDeclaredField("next"));
                } catch (ReflectiveOperationException e) {
                    throw new Error(e);
                }
            }
        }

DelayQueue

DelayQueue=BlockingQueue+PriorityQueue+Delayed。从这个等式中可以看出所有DelayQueue的特性:阻塞,有序,延迟。DelayQueue中的元素必须实现java.util.concurrent.Delayed这个接口。

public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}

这个接口的定义非常简单,getDelay返回值就是队列中元素延迟释放的时间。如果返回值是0或者是一个负值,那么就说明该元素到了要释放的时间,就会通过take方法释放该元素。

 /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

DelayQueue的使用场景很多,例如:
a) 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。
b) 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。
c) 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。

Deque

Deque的含义是”double ended queue“,即双端队列。
双端队列是一种具有队列和栈性质的数据结构。元素可以从队列的两端进行插入,弹出和删除操作。在java的util包中定义为接口。

LinkedList

LinkedList同时实现了List和Deque接口。所以它既可以看作是一个顺序容器,又可以看作是一个队列,还可以看作是一个栈。当你想使用栈的时候,可以考虑一下LinkedList,因为官方已经不建议使用Stack。

private static class Node<E> {
        E item;
        Node<E> next;
        Node<E> prev;

        Node(Node<E> prev, E element, Node<E> next) {
            this.item = element;
            this.next = next;
            this.prev = prev;
        }
    }

LinkedList内部使用双向链表来实现。

ArrayDeque

ArrayDeque是用数组实现的双端队列,创建的时候可以指定队列的容量,队列最小容量为8。ArrayDeque的效率比LinkedList的效率高,所以优先考虑使用。

总结

看了这么多类的介绍,那么我们在网络框架中到底使用哪一个来维护用户的请求比较好呢?窃以为如果用户的请求有优先级的话,那么可以考虑使用PriorityBlockingQueue,如果有过期的需求可以考虑DelayQueue。如果想更灵活的话可以使用双端队列中的ArrayDeque也是很高效的。

如果你觉得本篇文章帮助到了你,希望大爷能够给瓶买水钱。
本文为原创文章,转载请注明出处!

上一篇 下一篇

猜你喜欢

热点阅读