程序员

[Java源码][并发J.U.C]---阻塞队列Priority

2018-10-25  本文已影响0人  nicktming

前言

PriorityBlockingQueue是一个支持优先级排序的无界阻塞队列.

实现思路

无并发情况

针对没有并发情况下,其实就是我们熟悉的PriorityQueue, 对此请参考我的另外一篇博客 PriorityQueue 源码解析 .

并发情况

PriorityBlockingQueuePriorityQueue的一个升级版, 可以处理多线程情况.
PriorityBlockingQueue 是通过重入锁ReentrantLock和一个Condition notEmpty来实现的, 在操作过程需要用加锁来解决, 当队列为空需要消费元素时可以使用notEmpty.await()来使得该线程休眠等待, 另外由于该BlockingQueue是无界的, 所以并没有看到notFull.

源码

理解了基本思路后, 其实就是在PriorityQueue PriorityQueue 源码解析 中加上了锁的操作.

属性

/**
     * Default array capacity.
     * 默认的数组空间
     */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    /**
     *
     * 数组可申请的最大空间
     */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * 存储数据的数组
     */
    private transient Object[] queue;

    /**
     * 当前队列的元素个数
     */
    private transient int size;

    /**
     * 优先队列的比较器
     */
    private transient Comparator<? super E> comparator;

    /**
     * 重入锁
     */
    private final ReentrantLock lock;

    /**
     * 当队列为空时需要用到
     */
    private final Condition notEmpty;

    /**
     * Spinlock for allocation, acquired via CAS.
     *
     * 用来申请空间 注意是volatile
     */
    private transient volatile int allocationSpinLock;

构造方法

public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }

    /**
     * Creates a {@code PriorityBlockingQueue} with the specified
     * initial capacity that orders its elements according to their
     * {@linkplain Comparable natural ordering}.
     *
     * @param initialCapacity the initial capacity for this priority queue
     * @throws IllegalArgumentException if {@code initialCapacity} is less
     *         than 1
     */
    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }

    /**
     *
     * 如果初始化空间小于1 抛出运行时异常IllegalArgumentException
     *
     * @param initialCapacity the initial capacity for this priority queue
     * @param  comparator the comparator that will be used to order this
     *         priority queue.  If {@code null}, the {@linkplain Comparable
     *         natural ordering} of the elements will be used.
     * @throws IllegalArgumentException if {@code initialCapacity} is less
     *         than 1
     */
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator; // 如果为null则使用对象的natural ordering
        this.queue = new Object[initialCapacity];
    }

辅助函数

private static <T> void siftUpComparable(int k, T x, Object[] array) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (key.compareTo((T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }

    private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                                  Comparator<? super T> cmp) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (cmp.compare(x, (T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = x;
    }

    /**
     * Inserts item x at position k, maintaining heap invariant by
     * demoting x down the tree repeatedly until it is less than or
     * equal to its children or is a leaf.
     *
     *
     * @param k the position to fill
     * @param x the item to insert
     * @param array the heap array
     * @param n heap size
     */
    private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
        if (n > 0) {
            Comparable<? super T> key = (Comparable<? super T>)x;
            int half = n >>> 1;           // loop while a non-leaf
            while (k < half) {
                int child = (k << 1) + 1; // assume left child is least
                Object c = array[child];
                int right = child + 1;
                if (right < n &&
                        ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                    c = array[child = right];
                if (key.compareTo((T) c) <= 0)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = key;
        }
    }

    private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
                                                    int n,
                                                    Comparator<? super T> cmp) {
        if (n > 0) {
            int half = n >>> 1;
            while (k < half) {
                int child = (k << 1) + 1;
                Object c = array[child];
                int right = child + 1;
                if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
                    c = array[child = right];
                if (cmp.compare(x, (T) c) <= 0)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = x;
        }
    }

这些函数在PriorityQueue 源码解析 中已经详细分析过.

加入元素

/**
     * 将元素e加入到优先队列中
     * true  表示加入成功
     * false 表示加入失败
     *
     *  以下情况下会抛出运行时异常:
     *  1. e为null时会抛出NullPointerException异常
     *  2. 如果元素e不能被比较 会抛出ClassCastException异常
     *  3. 无法扩容是 抛出OutofMemoryException异常
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        // 扩容
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1; // size加1
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

    /**
     * 因为默认情况下该优先队列是无界
     * 当无法扩容时 会抛出OutofMemoryException
     * 所以直接调用offer方法即可
     *  以下情况下会抛出运行时异常:
     *  1. e为null时会抛出NullPointerException异常
     *  2. 如果元素e不能被比较 会抛出ClassCastException异常
     *  3. 无法扩容是 抛出OutofMemoryException异常
     * @param e the element to add
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public void put(E e) {
        offer(e); // never need to block
    }

    /**
     *
     * 因为无界 所以也不存在无法增加情况 所以直接调用offer方法即可
     * 另外以下情况下会抛出运行时异常:
     *  1. e为null时会抛出NullPointerException异常
     *  2. 如果元素e不能被比较 会抛出ClassCastException异常
     *  3. 无法扩容是 抛出OutofMemoryException异常
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return {@code true} (as specified by
     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e); // never need to block
    }

可以看到在put方法中获得锁后最后调用siftUpComparable或者siftUpUsingComparator方法,这两个方法在单线程中已经有所讲解. 可以看到的是PriorityQueue是无界的,所以putoffer都是在不发生异常的情况下始终可以加入元素的.

获取元素

/**
     * Mechanics for poll().  Call only while holding lock.
     *
     * 出队列
     * 跟正常堆操作出堆一样
     *
     */
    private E dequeue() {
        int n = size - 1;
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            E result = (E) array[0];
            E x = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            return result;
        }
    }
/**
     * 从优先队列中取优先级最高的元素 如果队列为空会一直等待
     * 以下情况会出现异常:
     * 1. 获得锁或者在因为队列空而休眠等待的过程中被其他线程中断
     *
     * @return
     * @throws InterruptedException
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }

最终还是到了siftDownComparable或者siftDownUsingComparator方法. poll如果队列空的时候会立即返回null不会等待.就不多说了,

删除元素

/**
     * Removes the ith element from queue.
     * 删除下标为i的元素
     */
    private void removeAt(int i) {
        Object[] array = queue;
        int n = size - 1;
        if (n == i) // removed last element
            array[i] = null;
        else {
            E moved = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftDownComparable(i, moved, array, n);
            else
                siftDownUsingComparator(i, moved, array, n, cmp);
            if (array[i] == moved) {
                if (cmp == null)
                    siftUpComparable(i, moved, array);
                else
                    siftUpUsingComparator(i, moved, array, cmp);
            }
        }
        size = n;
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.  Returns {@code true} if and only if this queue contained
     * the specified element (or equivalently, if this queue changed as a
     * result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = indexOf(o);
            if (i == -1)
                return false;
            removeAt(i);
            return true;
        } finally {
            lock.unlock();
        }
    }

都是获得锁后执行的就是PriorityQueue 源码解析 单线程的方法.

遍历元素

PriorityQueue遍历有所不同的是, PriorityQueue会事先copy一份当前队列中的数据给此iterator,即使在遍历过程中删除某个元素也不会影响此iterator中的元素.

public Iterator<E> iterator() {
        return new Itr(toArray());
    }

    /**
     * Snapshot iterator that works off copy of underlying q array.
     */
    final class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements
        int cursor;           // index of next element to return
        int lastRet;          // index of last element, or -1 if no such

        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }

        public boolean hasNext() {
            return cursor < array.length;
        }

        public E next() {
            if (cursor >= array.length)
                throw new NoSuchElementException();
            lastRet = cursor;
            return (E)array[cursor++];
        }

        public void remove() {
            if (lastRet < 0)
                throw new IllegalStateException();
            removeEQ(array[lastRet]);
            lastRet = -1;
        }
    }

例子如下

package com.priorityblockingqueue;

import java.util.Comparator;
import java.util.Iterator;

public class Test01 {
    public static void main(String[] args) {
        PriorityBlockingQueue<Integer> pq = new PriorityBlockingQueue<>(10, new Comparator<Integer>() {
            @Override
            public int compare(Integer o1, Integer o2) {
                return o1 - o2;
            }
        });
        pq.add(1);
        pq.add(3);
        pq.add(12);
        pq.add(4);
        pq.add(6);
        pq.add(17);
        pq.add(13);
        pq.add(8);
        pq.add(5);
        pq.add(10);
        pq.add(11);
        pq.add(19);
        pq.add(23);
        pq.add(14);

        System.out.println(pq);

        Iterator<Integer> iter = pq.iterator();
        while (iter.hasNext()) {
            int val = iter.next();
            if (val == 23) iter.remove();
            System.out.print(val + " ");
        }
    }
}

输出如下:

[1, 3, 12, 4, 6, 17, 13, 8, 5, 10, 11, 19, 23, 14]
1 3 12 4 6 17 13 8 5 10 11 19 23 14 

参考

1. Java 1.8

上一篇 下一篇

猜你喜欢

热点阅读