7. BlockingQueue

java.util.concurrent包中的Java BlockingQueue接口表示一个线程可以安全放入以及从中获取实例的队列。在本文中,我将向你展示如何使用BlockingQueue

BlockingQueue 使用




BlockingQueue 方法


抛出异常 返回特殊值 阻塞 超时
插入 add(o) offer(o) put(o) offer(o, timeout, timeunit)
删除 remove(o) poll() take() poll(timeout, timeunit)
访问 element() peek()


  1. 抛出异常:
  2. 特殊值:
    如果请求的操作现在无法完成,则返回特殊值(一般为 true / false).
  3. 阻塞:
  4. 超时:
    如果请求的操作现在无法完成,则方法调用将阻塞直到它能够进行,但等待不超过给定的超时。返回一个特殊值,告知操作是否成功(通常为true / false)



BlockingQueue 实现


Java BlockingQueue 示例

这是一个Java BlockingQueue示例。该示例使用实现BlockingQueue接口的ArrayBlockingQueue类。

首先, BlockingQueueExample类在不同的线程中启动ProducerConsumerProducer将一个字符串插入共享的BlockingQueue,而Consumer使用它们。

public class BlockingQueueExample {

    public static void main(String[] args) throws Exception {

        BlockingQueue queue = new ArrayBlockingQueue(1024);

        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        new Thread(producer).start();
        new Thread(consumer).start();



public class Producer implements Runnable{

    protected BlockingQueue queue = null;

    public Producer(BlockingQueue queue) {
        this.queue = queue;

    public void run() {
        try {
        } catch (InterruptedException e) {


public class Consumer implements Runnable{

    protected BlockingQueue queue = null;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;

    public void run() {
        try {
        } catch (InterruptedException e) {


import org.junit.Test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingExample {
    public void test() throws Exception {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        Thread thread1 = new Thread(producer);
        Thread thread2 = new Thread(consumer);


    private static class Producer implements Runnable {
        private BlockingQueue<String> queue;

        Producer(BlockingQueue<String> queue) {
            this.queue = queue;

        public void run() {
            try {
            } catch (InterruptedException e) {

    private static class Consumer implements Runnable {
        private BlockingQueue<String> queue = null;

        Consumer(BlockingQueue<String> queue) {
            this.queue = queue;

        public void run() {
            try {
            } catch (InterruptedException e) {


可以看出执行了2s 15ms才执行完

下面是BlockingQueue的源码。注意每个方法文档中提及的异常,同时此接口还提供了两个方法int drainTo(Collection<? super E> c)int drainTo(Collection<? super E> c, int maxElements),这两个方式的作用是将队列中的元素增加到参数指定的集合中。

public interface BlockingQueue<E> extends Queue<E> {
     * 将指定值插入到队列中,如果没有超出容量限制那么立刻完成并返回
     * {@code true} 成功
     * {@code IllegalStateException} 如果现在没有足够容量
     * 当使用一个容量限制的队列,使用{@link #offer(Object) offer}更好
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws IllegalStateException if the element cannot be added at this
     *         time due to capacity restrictions
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
    boolean add(E e);

     * Inserts the specified element into this queue if it is possible to do
     * so immediately without violating capacity restrictions, returning
     * {@code true} upon success and {@code false} if no space is currently
     * available.  When using a capacity-restricted queue, this method is
     * generally preferable to {@link #add}, which can fail to insert an
     * element only by throwing an exception.
     * @param e the element to add
     * @return {@code true} if the element was added to this queue, else
     *         {@code false}
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
    boolean offer(E e);

     * Inserts the specified element into this queue, waiting if necessary
     * for space to become available.
     * @param e the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
    void put(E e) throws InterruptedException;

     * Inserts the specified element into this queue, waiting up to the
     * specified wait time if necessary for space to become available.
     * @param e the element to add
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available
     * @throws InterruptedException if interrupted while waiting
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
    E take() throws InterruptedException;

     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element is available
     * @throws InterruptedException if interrupted while waiting
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

     * Returns the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking, or {@code Integer.MAX_VALUE} if there is no intrinsic
     * limit.
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting {@code remainingCapacity}
     * because it may be the case that another thread is about to
     * insert or remove an element.
     * @return the remaining capacity
    int remainingCapacity();

     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     * @throws ClassCastException if the class of the specified element
     *         is incompatible with this queue
     * (<a href="{@docRoot}/java/util/Collection.html#optional-restrictions">optional</a>)
     * @throws NullPointerException if the specified element is null
     * (<a href="{@docRoot}/java/util/Collection.html#optional-restrictions">optional</a>)
    boolean remove(Object o);

     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     * @throws ClassCastException if the class of the specified element
     *         is incompatible with this queue
     * (<a href="{@docRoot}/java/util/Collection.html#optional-restrictions">optional</a>)
     * @throws NullPointerException if the specified element is null
     * (<a href="{@docRoot}/java/util/Collection.html#optional-restrictions">optional</a>)
    boolean contains(Object o);

     * 从队列中删除所有能访问的元素并且将他们增加到指定的集合中。
     * 这个操作可能比重复的调用poll方法效率更高。当尝试将元素增加
     * 到集合中时如果失败啊,那么可能队列和集合中都包含或都不包含
     * 此元素。如果指定的集合是它自己,那么抛出一个IllegalArgumentException。
     * 当此操作进行时,如果指定的集合被修改,那么这个操作将是未定义的。
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws UnsupportedOperationException if addition of elements
     *         is not supported by the specified collection
     * @throws ClassCastException if the class of an element of this queue
     *         prevents it from being added to the specified collection
     * @throws NullPointerException if the specified collection is null
     * @throws IllegalArgumentException if the specified collection is this
     *         queue, or some property of an element of this queue prevents
     *         it from being added to the specified collection
    int drainTo(Collection<? super E> c);

     * Removes at most the given number of available elements from
     * this queue and adds them to the given collection.  A failure
     * encountered while attempting to add elements to
     * collection {@code c} may result in elements being in neither,
     * either or both collections when the associated exception is
     * thrown.  Attempts to drain a queue to itself result in
     * {@code IllegalArgumentException}. Further, the behavior of
     * this operation is undefined if the specified collection is
     * modified while the operation is in progress.
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws UnsupportedOperationException if addition of elements
     *         is not supported by the specified collection
     * @throws ClassCastException if the class of an element of this queue
     *         prevents it from being added to the specified collection
     * @throws NullPointerException if the specified collection is null
     * @throws IllegalArgumentException if the specified collection is this
     *         queue, or some property of an element of this queue prevents
     *         it from being added to the specified collection
    int drainTo(Collection<? super E> c, int maxElements);

