Java 中ArrayBlockingQueue 用的是循环数组

2022-01-30  本文已影响0人  多多的大白

我们都知道Queue是一种操作线性表,可以用数组、链表来实现。
Queue的特点是先进先出。

存在问题:
如果我们使用普通的数组来实现Queue,一定会存在一个问题,每次出对的时候,从数组的第一位开始取,那么只要出对完成,就会涉及到整个数组搬迁,会导致时间最差复杂度成为O(n) 。单纯从设计Queue来讲,我觉得是应该要避免这样的搬迁操作。

循环数组

我们都知道数组是一块连续的内存空间。但是循环数组,确实不多见。如何理解?

环形数组

如上图,是一个大神的画的,看着挺复杂的,实现起来并没有那么复杂,只要抓住核心tail和head 即可,还有一个很重要的一个算法 (tail + 1) % n ,如下代码。


public class CircularQueue {
  // 数组:items,数组大小:n
  private String[] items;
  private int n = 0;
  // head表示队头下标,tail表示队尾下标
  private int head = 0;
  private int tail = 0;

  // 申请一个大小为capacity的数组
  public CircularQueue(int capacity) {
    items = new String[capacity];
    n = capacity;
  }

  // 入队
  public boolean enqueue(String item) {
    // 队列满了
    if ((tail + 1) % n == head) return false;
    items[tail] = item;
    tail = (tail + 1) % n;
    return true;
  }

  // 出队
  public String dequeue() {
    // 如果head == tail 表示队列为空
    if (head == tail) return null;
    String ret = items[head];
    head = (head + 1) % n;
    return ret;
  }
}

这是一段使用循环数组实现简单Queue的代码,这样实现完全解决了上面的问题,出对的时候时间复杂度从O(n) 降成O(1)。

如果仔细看代码你会发现,在执行到临界点的时候,即队列满了,会存在有一个位置为空,内存如此宝贵怎么能忍。
有个简单操作,show you code。


public class CircularQueue {

  // 数组:items,数组大小:n
  private String[] items;
  private int n = 0;
  // 定义数组的长度
  private int size = 0;
  // head表示队头下标,tail表示队尾下标
  private int head = 0;
  private int tail = 0;

  // 申请一个大小为capacity的数组
  public CircularQueue(int capacity) {
    items = new String[capacity];
    n = capacity;
  }

  // 入队
  public boolean enqueue(String item) {

    if (size == n) {
      return false;
    }
    // 队列满了
    items[tail] = item;
    tail = (tail + 1) % n;
    size++;
    return true;
  }

  // 出队
  public String dequeue() {
    // 如果head == tail 表示队列为空
    if (size == 0) {
      return null;
    }
    String ret = items[head];
    head = (head + 1) % n;
    size--;
    return ret;
  }
}

上述简单介绍了循环队列,设计确认比较巧妙,对于一些底层的编码,什么都要节省,如果有方式方法能节省就节省吧,毕竟现在我们都是天天把节省成本挂在嘴边。

重点看下ArrayBlockingQueue

之前一直用ArrayBlockingQueue,也看过几次源码,看了也忘记了。一直没有抓住核心点。这次看到了循环数组突然对ArrayBlockingQueue有些好奇。

是不是也是循环数组的方式来实现Queue
我们看下ArrayBlockingQueue源码,其他方法可以先不着急去看,先去看核心的方法enqueue和dequeue

 /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
 private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

    /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

从上述代码来看,涉及到几个成员变量和ArrayBlockingQueue构造函数

    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;
  /** Condition for waiting puts */
    private final Condition notFull;

  /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and the specified access policy.
     *
     * @param capacity the capacity of this queue
     * @param fair if {@code true} then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if {@code false} the access order is unspecified.
     * @throws IllegalArgumentException if {@code capacity < 1}
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

其中takeIndex、putIndex、count 都是int 类型,默认为0。

notEmpty 表示队列为空的时候,出对操作的线程状态变成等待状态,并释放锁,入队操作的时候,会通知唤醒等待时间最长的线程。

notFull 表示队列为满的时候,入队操作线程状态变成等待状态,并释放锁,出对操作的时候,会通知唤醒等待时间最长的线程。

从ArrayBlockingQueue 中enqueue 和dequeue 源码看很容易理解使用的就是循环数组来实现的,如下代码

if (++putIndex == items.length)
    putIndex = 0;

if (++takeIndex == items.length)
  takeIndex = 0;

光看enqueue 和dequeue 源码发现其实整个逻辑非常不严谨,不严谨也没关系,主要看他们俩都是私有方法。更何况有个核心成员变量 count还没有涉及到。

ArrayBlockingQueue 对外提供的出对和入对操作都是成对,我们一一看来

 /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.  This method is generally preferable to method {@link #add},
     * which can fail to insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

offer和poll方法中 通过count 来 判断队列是否满了,或者是否为空队列,前提都加了Lock,为了保证线程安全,出入对都统一锁了。

到这里可能会有疑问阻塞去哪了?
我们再看另外一对的出入对方法

 /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

notFull.await() 和 notEmpty.await() 使用上了,表示这俩个操作是阻塞的,很容易理解。
这里面有一个细节点需要注意

 while (count == items.length)
          notFull.await();
 while (count == 0)
         notEmpty.await();

我刚开始看的时候,我也有疑问为什么使用while而非if判断,其实是做了双重保证,防止过早或者意外通知唤醒。

再看下最后一对

 /**
     * Inserts the specified element at the tail of this queue, waiting
     * up to the specified wait time for space to become available if
     * the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }
 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

理解了上面俩对出入对的方法,看到这对也是比较容易理解,加了阻塞时间而已。

总结

今天是辛丑年最后一天,除夕,回不了自己的家乡,只能待在杭城。
在此祝所有的亲朋好友除夕安康,来年虎虎生威!

上一篇下一篇

猜你喜欢

热点阅读