阻塞队列(BlockingQueue)

2017-08-15  本文已影响0人  千释炎

  BlockingQueue是java.until.concurrent包下的一个重要的数据结构,其继承于Queue接口,Queue继承于Collection。BlockingQueue提供了对数据的线程安全操作,其方法如下:

/**
  * Inserts the specified element into this queue if   it is possible to do so
  * immediately without violating capacity  restrictions, returning
  * {@code true} upon success and throwing an {@code  IllegalStateException}
  * if no space is currently available.
  *往队列中插入一个特定元素,如果空间允许,一旦成功就返回true,如果当前没有 
  可用空间,则抛出IllegalStateException异常
  ...
 */
boolean add(E e);

/**
 * Inserts the specified element into this queue if it is possible to do
 * so immediately without violating capacity restrictions.
 * When using a capacity-restricted queue, this method is generally
 * preferable to {@link #add}, which can fail to insert an element only
 * by throwing an exception.
 *往队列中插入一个元素,如果没有空间限制,在使用有容量限制的队列时,该方法比
 add()方法更可取,因为add方法失败时只是抛出异常,而offer会返回false
 ...
 */
boolean offer(E e);


/**
 * Inserts the specified element into this queue, waiting if necessary
 * for space to become available.
 *往队列中插入一个元素,与前两个方法不同的是,如果当前没有空间可用,则阻塞当  
 *前线程,直至有有空间
...
 */
void put(E e) throws InterruptedException;


/**
 * Inserts the specified element into this queue, waiting up to the
 * specified wait time if necessary for space to become available.
 往队列中插入元素,可以设置等待时间,如果无空间可用,则等待指定时间。若指定   
 时间后还没有空间可用,则返回false
 *
...
 */
boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException;


/**
 * Retrieves and removes the head of this queue,   waiting if necessary
 * until an element becomes available.
 *获取并从队列中移除排在首位的元素,如果队列为空,则阻塞当前线程直到有新的元素加入
 * @return the head of this queue
 * @throws InterruptedException if interrupted while waiting
 */
E take() throws InterruptedException;


/**
 * Retrieves and removes the head of this queue, waiting up to the
 * specified wait time if necessary for an element to become available.
 *获取并移除队列中处在首位的数据,如果队列为空,则等待指定时间,等待指定时间后,若不为空,则返回对首数据,否则返回null
 * @param timeout how long to wait before giving up, in units of
 *        {@code unit}
 * @param unit a {@code TimeUnit} determining how to interpret the
 *        {@code timeout} parameter
 * @return the head of this queue, or {@code null} if the
 *         specified waiting time elapses before an element is available
 * @throws InterruptedException if interrupted while waiting
 */
E poll(long timeout, TimeUnit unit)throws InterruptedException;

boolean remove(Object o);  //从队列中移除指定元素


/**
 * Returns the number of additional elements that this queue can ideally
 * (in the absence of memory or resource constraints) accept without
 * blocking, or {@code Integer.MAX_VALUE} if there is no intrinsic
 * limit.
 *返回队列中元可容纳元素的最大数量,如果没有内部限制,则返回Integer.MAX_VALUE
 *
 * @return the remaining capacity
 */
int remainingCapacity();


public boolean contains(Object o);   
//判断队列中是否包含某特定元素


int drainTo(Collection<? super E> c);  
//移除队列中的所有元素并加入到指定集合中,返回传输的元素个数


int drainTo(Collection<? super E> c, int maxElements);   
//从这个队列中最多删除指定数量的可用元素,并将它们添加到给定的集合中

  BlockingQueue是一个接口,java并发包下提供了其多种实现:
1.ArrayBlockingQueue

ArrayBlockingQueue是一个基于数组的先进先出有界阻塞队列,其内部维护一个定长数组:

/** The queued items */  
final Object[] items;  //存放元素的数组

/** items index for next take, poll, peek or remove */
int takeIndex;   //队首元素的索引

/** items index for next put, offer, or add */
int putIndex;  //下一个加入的元素存放的索引

  由于ArrayBlockingQueue放入数据和获取数据使用的是同一个锁对象,因此两者无法真正并行运行,创建ArrayBlockingQueue对象时,可以控制对象的内部所是否采用公平锁,默认采用非公平锁.
2.LinkedBlockingQueue

LinkedBlockingQueue是一个基于单向链表的阻塞队列,其内部也维护一个数据缓冲队列(由链表组成)。只有缓冲队列区容量达到最大 缓存容量时,才会阻塞生产者队列,直到消费者线程从队列中消费一份数据。与ArrayBlockingQueue不同的是,LinkedBlockingQueue内部采用不同的锁来放入数据和获取数据,因此在高并发场合可以更高效地处理数据。
需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

/**
 * Creates a {@code LinkedBlockingQueue} with a capacity of
 * {@link Integer#MAX_VALUE}.
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

3.DelayQueue

DelayQueue是一个无界的阻塞队列,队列中的元素只有当其指定的延迟时间到了,才能从队列中获取该元素。因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。其内部实现使用一个优先队列,调用offer方法时,将delay对象加入到优先队列中:

/**
 * Inserts the specified element into this delay queue.
 *
 * @param e the element to add
 * @return {@code true}
 * @throws NullPointerException if the specified element is null
 */
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

调用take方法从优先队列中将first移出,如果没有达到延迟时间,则进行await处理:

/**
 * Retrieves and removes the head of this queue, waiting if necessary
 * until an element with an expired delay is available on this queue.
 *
 * @return the head of this queue
 * @throws InterruptedException {@inheritDoc}
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

4.PriorityBlockingQueue

PriorityBlockingQueue是一个基于优先级的无界阻塞队列,里面的对象必须实现Comparable接口,队列通过这个接口的compare方法确定对象的优先级,内部控制线程同步的锁采用的是公平锁。
该队列offer方法如下:

/**
 * Inserts the specified element into this priority queue.
 * As the queue is unbounded, this method will never return {@code false}.
 * 由于是无界的,故该方法始终返回true,且不会阻塞
 * @param e the element to add
 * @return {@code true} (as specified by {@link Queue#offer})
 * @throws ClassCastException if the specified element cannot be compared
 *         with elements currently in the priority queue according to the
 *         priority queue's ordering
 * @throws NullPointerException if the specified element is null
 */
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;

该队列take方法如下:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

当队列为空时,会阻塞当前线程,知道有新的数据加入队列

5.SynchronousQueue

SynchronousQueue是一种无缓冲的等待队列,内部使用公平锁,其容量为0,不允许存放null元素。任何一个对SynchronousQueue的写需要等待一个对SynchronousQueue的读,类似于在CSP和Ada中使用的会合通道。与其说他是一个队列,不如说他是一个数据交换通道。

使用场景:
SynchronousQueue非常适合于切换设计,其中一个运行在一个线程中的对象必须与在另一个线程中运行的对象同步,以便传递一些信息、事件或任务

上一篇下一篇

猜你喜欢

热点阅读