7. BlockingQueue

2019-03-20  本文已影响0人  shallowinggg

java.util.concurrent包中的Java BlockingQueue接口表示一个线程可以安全放入以及从中获取实例的队列。在本文中,我将向你展示如何使用BlockingQueue

BlockingQueue 使用

一个BlockingQueue通常用于在线程上生成对象,另一个线程消耗对象。这是一个说明这个原则的图表:

生产线程将一直生成新对象并将它们插入队列,直到达到队列的容量上限。如果阻塞队列达到其上限,则在尝试插入新对象时会阻止生产线程。它将一直被阻塞,直到消费线程将一个对象从队列中取出。

消费线程不断将对象从阻塞队列中取出并处理它们。如果消费线程试图将对象从空队列中取出实例,那么消费线程将被阻塞,直到生产线程向队列放入一个对象。

BlockingQueue 方法

BlockingQueue有4组不同的方法用于插入,删除和检查队列中的元素。当不能立即执行所请求的操作时,每组方法的行为会不同。这是一个方法表:

抛出异常 返回特殊值 阻塞 超时
插入 add(o) offer(o) put(o) offer(o, timeout, timeunit)
删除 remove(o) poll() take() poll(timeout, timeunit)
访问 element() peek()

这4种不同的行为意味着:

  1. 抛出异常:
    如果请求的操作现在无法完成,则抛出异常。
  2. 特殊值:
    如果请求的操作现在无法完成,则返回特殊值(一般为 true / false).
  3. 阻塞:
    如果请求的操作现在无法完成,则方法调用将阻塞,直到操作能够进行。
  4. 超时:
    如果请求的操作现在无法完成,则方法调用将阻塞直到它能够进行,但等待不超过给定的超时。返回一个特殊值,告知操作是否成功(通常为true / false)

无法插入nullBlockingQueue中。如果你尝试插入nullBlockingQueue则会抛出一个NullPointerException异常。

你可以访问BlockingQueue内的所有元素,而不仅仅是开头和结尾的元素。例如,假设你已将一个对象入队等待处理,但你的应用程序决定取消它。你可以调用remove(o)这样的操作来删除队列中的特定对象。但是,这是个效率很低的操作,所以除非你真的需要,否则你不应该使用Collection中的这些方法。

BlockingQueue 实现

由于BlockingQueue是一个接口,你需要使用它的一个实现来使用它。java.util.concurrent包中具有以下实现BlockingQueue接口的类:

Java BlockingQueue 示例

这是一个Java BlockingQueue示例。该示例使用实现BlockingQueue接口的ArrayBlockingQueue类。

首先, BlockingQueueExample类在不同的线程中启动ProducerConsumerProducer将一个字符串插入共享的BlockingQueue,而Consumer使用它们。

public class BlockingQueueExample {

    public static void main(String[] args) throws Exception {

        BlockingQueue queue = new ArrayBlockingQueue(1024);

        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        new Thread(producer).start();
        new Thread(consumer).start();

        Thread.sleep(4000);
    }
}

这是Producer类。注意每次put()调用之间它都会睡一秒钟。这将导致Consumer阻塞,为了等待获取队列中的对象。

public class Producer implements Runnable{

    protected BlockingQueue queue = null;

    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            queue.put("1");
            Thread.sleep(1000);
            queue.put("2");
            Thread.sleep(1000);
            queue.put("3");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

这是Consumer类。它从队列中取出对象,然后将它们打印到System.out

public class Consumer implements Runnable{

    protected BlockingQueue queue = null;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

下面是一个测试:

import org.junit.Test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingExample {
    @Test
    public void test() throws Exception {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        Thread thread1 = new Thread(producer);
        Thread thread2 = new Thread(consumer);

        thread1.start();
        thread2.start();
        
        thread1.join();
        thread2.join();
    }

    private static class Producer implements Runnable {
        private BlockingQueue<String> queue;

        Producer(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                queue.put("1");
                Thread.sleep(1000);
                queue.put("2");
                Thread.sleep(1000);
                queue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static class Consumer implements Runnable {
        private BlockingQueue<String> queue = null;

        Consumer(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                System.out.println(queue.take());
                System.out.println(queue.take());
                System.out.println(queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

结果如下:


可以看出执行了2s 15ms才执行完

下面是BlockingQueue的源码。注意每个方法文档中提及的异常,同时此接口还提供了两个方法int drainTo(Collection<? super E> c)int drainTo(Collection<? super E> c, int maxElements),这两个方式的作用是将队列中的元素增加到参数指定的集合中。

public interface BlockingQueue<E> extends Queue<E> {
    /**
     * 将指定值插入到队列中,如果没有超出容量限制那么立刻完成并返回
     * {@code true} 成功
     * {@code IllegalStateException} 如果现在没有足够容量
     * 当使用一个容量限制的队列,使用{@link #offer(Object) offer}更好
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws IllegalStateException if the element cannot be added at this
     *         time due to capacity restrictions
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    boolean add(E e);

    /**
     * Inserts the specified element into this queue if it is possible to do
     * so immediately without violating capacity restrictions, returning
     * {@code true} upon success and {@code false} if no space is currently
     * available.  When using a capacity-restricted queue, this method is
     * generally preferable to {@link #add}, which can fail to insert an
     * element only by throwing an exception.
     *
     * @param e the element to add
     * @return {@code true} if the element was added to this queue, else
     *         {@code false}
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    boolean offer(E e);

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * for space to become available.
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    void put(E e) throws InterruptedException;

    /**
     * Inserts the specified element into this queue, waiting up to the
     * specified wait time if necessary for space to become available.
     *
     * @param e the element to add
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available
     * @throws InterruptedException if interrupted while waiting
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    E take() throws InterruptedException;

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element is available
     * @throws InterruptedException if interrupted while waiting
     */
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * Returns the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking, or {@code Integer.MAX_VALUE} if there is no intrinsic
     * limit.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting {@code remainingCapacity}
     * because it may be the case that another thread is about to
     * insert or remove an element.
     *
     * @return the remaining capacity
     */
    int remainingCapacity();

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     * @throws ClassCastException if the class of the specified element
     *         is incompatible with this queue
     * (<a href="{@docRoot}/java/util/Collection.html#optional-restrictions">optional</a>)
     * @throws NullPointerException if the specified element is null
     * (<a href="{@docRoot}/java/util/Collection.html#optional-restrictions">optional</a>)
     */
    boolean remove(Object o);

    /**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     * @throws ClassCastException if the class of the specified element
     *         is incompatible with this queue
     * (<a href="{@docRoot}/java/util/Collection.html#optional-restrictions">optional</a>)
     * @throws NullPointerException if the specified element is null
     * (<a href="{@docRoot}/java/util/Collection.html#optional-restrictions">optional</a>)
     */
    boolean contains(Object o);

    /**
     * 从队列中删除所有能访问的元素并且将他们增加到指定的集合中。
     * 这个操作可能比重复的调用poll方法效率更高。当尝试将元素增加
     * 到集合中时如果失败啊,那么可能队列和集合中都包含或都不包含
     * 此元素。如果指定的集合是它自己,那么抛出一个IllegalArgumentException。
     * 当此操作进行时,如果指定的集合被修改,那么这个操作将是未定义的。
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws UnsupportedOperationException if addition of elements
     *         is not supported by the specified collection
     * @throws ClassCastException if the class of an element of this queue
     *         prevents it from being added to the specified collection
     * @throws NullPointerException if the specified collection is null
     * @throws IllegalArgumentException if the specified collection is this
     *         queue, or some property of an element of this queue prevents
     *         it from being added to the specified collection
     */
    int drainTo(Collection<? super E> c);

    /**
     * Removes at most the given number of available elements from
     * this queue and adds them to the given collection.  A failure
     * encountered while attempting to add elements to
     * collection {@code c} may result in elements being in neither,
     * either or both collections when the associated exception is
     * thrown.  Attempts to drain a queue to itself result in
     * {@code IllegalArgumentException}. Further, the behavior of
     * this operation is undefined if the specified collection is
     * modified while the operation is in progress.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws UnsupportedOperationException if addition of elements
     *         is not supported by the specified collection
     * @throws ClassCastException if the class of an element of this queue
     *         prevents it from being added to the specified collection
     * @throws NullPointerException if the specified collection is null
     * @throws IllegalArgumentException if the specified collection is this
     *         queue, or some property of an element of this queue prevents
     *         it from being added to the specified collection
     */
    int drainTo(Collection<? super E> c, int maxElements);
}
上一篇下一篇

猜你喜欢

热点阅读