JUC并发包

JUC下的阻塞队列-ArrayBlockingQueue

2021-04-12  本文已影响0人  于情于你

    ArrayBlockingQueue是数组实现的线程安全的有界的阻塞队列。下面是主要结构。
    大概思想就是使用items数组存放元素,使用ReentrantLock和Condition实现线程安全。
    每次入队的时候都会通过putIndex找到新元素应该存放的下标,如果putIndex到头了,那么再从0开始,循环使用。入队完成后会通知取元素的线程拿元素。
    每次出队的时候都会通过takeIndex找到这次要取的元素的下标,如果takeIndex到头了,那么也从0开始,循环使用。出队完成后会通知存放元素的线程继续存元素。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -817911632652898426L;

    // 队列的数组
    final Object[] items;

   // 调用take, poll, peek or remove方法,元素的位置
    int takeIndex;

   // 调用 put, offer, or add方法,元素的位置
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    // 等待拿元素线程的等待条件队列
    private final Condition notEmpty;

     // 等待存元素线程的等待条件队列
    private final Condition notFull;

    /**
     * Shared state for currently active iterators, or null if there
     * are known not to be any.  Allows queue operations to update
     * iterator state.
     */
    transient Itrs itrs = null;

入队API:

add,如果队列满了,则会抛出IllegalStateException
public boolean add(E e) {
        return super.add(e);
    }


public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }


public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 队列满了,返回false
            if (count == items.length)
                return false;
            else {
            // 否则入队
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

  private void enqueue(E x) {
        // ArrayBlockingQueue存放元素的数组
        final Object[] items = this.items;
        // 把新元素放到该放的index
        items[putIndex] = x;
        // 如果加完这个元素队列满了,那么把putIndex置为0
        if (++putIndex == items.length)
            putIndex = 0;
        // 元素计数
        count++;
        // 把等待拿元素的线程,从AQS的等待条件队列拿到同步队列,也就是告诉消费线程,可以消费了
        notEmpty.signal();
    }
offer,如果队列满了,则返回false,入队成功返回true,不会抛IllegalStateException

源码同上

put,将指定的元素插入此队列的尾部,如果队列已满,则等待空间。
 public void put(E e) throws InterruptedException {
        checkNotNull(e);
        // 加一个可中断的锁
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
          // 如果队列满了,则等待,加入notFull等待条件队列。
            while (count == items.length)
                notFull.await();
          // 直到有空间了,入队
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

出队API:

poll,如果队列为空,直接返回null,不等待。并且删除元素
public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {  
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

 private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
      // 在可以拿元素的位置takeIndex,获取元素
        E x = (E) items[takeIndex];
        // 把takeIndex位置的元素置为null
        items[takeIndex] = null;
        // takeIndex到头了,那么再从0开始
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();

        // 通知存放元素的线程,可以存放元素了
        notFull.signal();
        return x;
    }
take,如果队列为空,则加入到条件等待队列。并且删除元素
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
peek,如果队列为空,返回null。不删除元素
public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }
上一篇下一篇

猜你喜欢

热点阅读