Java1.8-ArrayBlockingQueue源码学习(五

2019-02-28  本文已影响31人  骑着乌龟去看海

一、概述

  ArrayBlockingQueue是一个有界的阻塞队列,底层通过数组来实现,会按照常规的先进先出(FIFO)的原则来操作队列,元素从队尾入队,从队头出队。同样,如果在获取队列元素的时候队列为空,则会阻塞;在往队列添加元素的时候,如果队列已满,则会阻塞;

接下来我们来看一下该类的实现源码。

二、源码

1. 继承结构及构造方法
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

可以看到,继承结构都比较常规,继承自AbstractQueue,实现了BlockingQueue,并且实现了序列化接口。

然后来看一下它的构造方法:

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

先来看一下它前两个构造方法。因为是有界队列,所以要制定队列的容量大小;并且由于是通过ReentrantLock来实现锁,而ReentrantLock有公平锁和非公平锁之分,因此要制定对应的访问策略,默认是非公平锁;对于公平锁而言,先阻塞的会先获取到锁;而对于非公平锁,则是进行抢占式获取。

public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    // 构造方法
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        int i = 0;
        try {
            // 遍历集合
            for (E e : c) {
                // 校验集合中元素不能是null
                checkNotNull(e);
                // 往数组中添加元素
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            // 如果初始化容量小于传入集合容量,异常
            throw new IllegalArgumentException();
        }
        // 设置队列数量
        count = i;
        // 初始化入队的索引
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}

最后一个构造方法是通过一个给定的集合来创建队列,这里会把集合中的元素挨个添加到队列中,同时初始化的容量不能小于所给定集合的容量。

2. 属性

接下来,我们来看一下该类的一些属性:

/** 保存元素的数组 */
final Object[] items;

/** 获取元素(出队)的索引 */
int takeIndex;

/** 添加元素(入队)的索引 */
int putIndex;

/** 队列中元素的数量 */
int count;

/** 可重入锁 */
final ReentrantLock lock;

/** 获取元素(出队)的Condition条件 */
private final Condition notEmpty;

/** 添加元素(入队)的Condition条件 */
private final Condition notFull;

/**
 * 队列的迭代器
 */
transient Itrs itrs = null;

可以看到,ArrayBlockingQueue定义了一个可重入锁,并且定义了两个Condition条件,分别用于出队和入队的时候进行阻塞。

3. 方法
3.1 add方法

add方法表示入队,将数据插入到队尾,入队成功返回true;如果队列已满,抛出异常。最终还会间接通过offer方法来实现:

public boolean add(E e) {
    return super.add(e);
}

// 继承类AbstractQueue
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
3.2 offer方法

而对offer方法而言,如果队列已满,返回false,不会抛出异常:

public boolean offer(E e) {
    // 非空校验
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 判断队列是否已满
        if (count == items.length)
            return false;
        else {
            // 没满,入队
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

可以看到,ArrayBlockingQueue不允许存储null元素,这里会调用enqueue方法进行入队操作:

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    // 把元素放入到 数组的入队索引处
    items[putIndex] = x;
    // 元素放进去后,如果入队索引等于队列长度,表示已满
    // 重置入队索引为0
    if (++putIndex == items.length)
        putIndex = 0;
    // 数组元素加1
    count++;
    // 唤醒条件notEmpty上等待的线程
    notEmpty.signal();
}

可以看到该方法用于将元素保存到数组对应的入队索引处,并且唤醒条件notEmpty对应的线程,提醒线程该队列已经不为空了(not empty)。

这里重置入队索引为0,表示队列其实是一种循环队列,也就是环形队列,队尾不一定要是物理上的队列末尾,而是逻辑上的队尾,通过这种环形队列的用法,可以减少不必要的元素拷贝(元素出队以后,不用把元素整体往前移动)。

3.3 put方法

put方法是一个阻塞方法,在入队的时候,如果队列已满,会一直阻塞,直到队列可用,并且该方法在当前线程被中断时会抛出异常。

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 可中断线程
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            // 阻塞notFull条件对应的线程
            notFull.await();
        // 入队方法
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
3.4 offer(E, long, TimeUnit)方法

支持超时的offer方法,表示入队的时候,如果队列已满,则等待指定的超时时间,如果超时时间结束,队列仍然已满,则返回false;

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    checkNotNull(e);
    // 获取超时时间
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 如果队列已满,无限循环
        while (count == items.length) {
            // 如果超时(超时时间小于等于0),返回false
            if (nanos <= 0)
                return false;
            // 没有超时,等待
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}
3.5 poll方法

出队方法poll比较简单,表示获取并移除队头元素,如果队列为空,则返回null:

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 获取队头元素
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

这里,如果队列不为空的话,会调用出队方法dequeue方法,该方法和入队方法enqueue恰好相反:

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    // 获取出队元素索引
    E x = (E) items[takeIndex];
    // 原位置设置为空
    items[takeIndex] = null;
    // 如果队列为空(如果新的出队索引==数组长度),重置出队索引
    if (++takeIndex == items.length)
        takeIndex = 0;
    // 数组容量减1
    count--;
    // 迭代器维护
    if (itrs != null)
        itrs.elementDequeued();
    // 唤醒notFull条件上的线程
    notFull.signal();
    return x;
}

首先获取出队元素索引处的值,然后判断队列是否为空,如果队列为空,重置出队索引为0,然后队列容量减1,出队后,唤醒notFull条件上的线程,提示线程表示队列已经not full了。

这里通过++takeIndex == items.length来判断队列是否为空,和入队中的++putIndex == items.length来判断队列是否已满,可以看出ArrayBlockingQueue其实是一种环形队列。

3.6 take方法

出队方法take是一个阻塞方法,用于获取并移除队列中的队头元素,如果队列为空,则会一直阻塞,直到队列中元素可用;

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 如果队列为空,阻塞notEmpty条件对应的线程
        while (count == 0)
            notEmpty.await();
        // 否则,出队
        return dequeue();
    } finally {
        lock.unlock();
    }
}
3.7 poll(long, TimeUnit)

支持超时的poll方法,表示获取并移除对头的元素,如果队列为空,则等待指定的超时时间,如果超时时间结束,队列仍然为空,返回false;

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 获取超时时间
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 如果队列为空
        while (count == 0) {
            // 超时,返回null
            if (nanos <= 0)
                return null;
            // 没有超时,等待
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}
3.8 peek方法

peek方法表示获取队列的对头元素,但不是个出队方法,也就是不移除元素:

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 如果队列为空,返回null
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}

final E itemAt(int i) {
    return (E) items[i];
}

从源码可以看出,peek方法直接获取数组出队索引处对应的元素。

3.9 remove(Object)方法

该方法表示从队列中删除指定的元素:

public boolean remove(Object o) {
   // 如果元素是null,直接返回
   if (o == null) return false;
   final Object[] items = this.items;
   final ReentrantLock lock = this.lock;
   lock.lock();
   try {
       // 如果队列中有元素
       if (count > 0) {
           // 入队索引
           final int putIndex = this.putIndex;
           // 出队索引
           int i = takeIndex;
           // 循环判断,从队头直到队尾
           do {
               // 如果出队索引处(这里的出队索引不是原来的takeIndex,而是自定义的i,一直自增)元素
               // 与要移除的元素相等,进行移除操作
               if (o.equals(items[i])) {
                   // 移除元素
                   removeAt(i);
                   return true;
               }
               if (++i == items.length)
                   i = 0;
           } while (i != putIndex);
       }
       return false;
   } finally {
       lock.unlock();
   }
}

这里会调用removeAt方法来删除对应索引处的元素:

void removeAt(final int removeIndex) {
    // assert lock.getHoldCount() == 1;
    // assert items[removeIndex] != null;
    // assert removeIndex >= 0 && removeIndex < items.length;
    final Object[] items = this.items;
    // 如果要移除元素就是出队索引处元素,按照一般出队方法移除即可
    if (removeIndex == takeIndex) {
        // removing front item; just advance
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
    } else {
        // an "interior" remove

        // slide over all others up through putIndex.
        final int putIndex = this.putIndex;
        for (int i = removeIndex;;) {
            // 循环中,会从要移除的下标处,向后递增,把数据依次前移,直到队尾
            int next = i + 1;
            if (next == items.length)
                next = 0;
            // 判断是否到了队尾的入队索引处
            if (next != putIndex) {
                // 往前移动元素
                items[i] = items[next];
                i = next;
            } else {
                // 到了队尾索引处,表示元素移动完成,再重新设置下对尾索引
                items[i] = null;
                this.putIndex = i;
                break;
            }
        }
        count--;
        if (itrs != null)
            itrs.removedAt(removeIndex);
    }
    // 最后唤醒notFull对应线程
    notFull.signal();
}

这里进行移除操作的时候,会先从要移除的下标处开始,后面的元素依次前移,最后直到队尾的入队索引,然后将队尾的入队索引前的最后一个元素设置为null,这样就移动完成,最后的时候唤醒notFull条件对应的线程,告诉该线程,队列已经not full了。

需要注意的是,这种删除元素的方式,和数组中删除非两端的元素一样,会移动数组中许多元素,从本质上来讲是一种缓慢且有破坏性的操作,因此官方建议我们只有在特殊情况下才进行这个操作。理想情况下,只有当已知队列不被其他线程访问时才应该这样做。

3.10 contains方法

contains方法用来判断队列中是否包含某元素,步骤和remove操作差不多,这里就不多说了。

public boolean contains(Object o) {
    // 非空判断
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 如果队列中有元素
        if (count > 0) {
            // 入队索引
            final int putIndex = this.putIndex;
            // 出队索引
            int i = takeIndex;
            do {
                // 循环判断,从队头到队尾
                if (o.equals(items[i]))
                    return true;
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
        }
        return false;
    } finally {
        lock.unlock();
    }
}
3.11 clear方法

clear方法表示清空队列中所有的元素,

public void clear() {
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 获取队列长度,如果队列中有元素
        int k = count;
        if (k > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            // 从队头开始遍历,直到队尾
            do {
                // 将对应索引处元素设置为null
                items[i] = null;
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
            takeIndex = putIndex;
            count = 0;
            // 迭代器处理
            if (itrs != null)
                itrs.queueIsEmpty();
            // 唤醒所有等待notFull条件的线程
            for (; k > 0 && lock.hasWaiters(notFull); k--)
                notFull.signal();
        }
    } finally {
        lock.unlock();
    }
}

该方法会从队列出队索引开始遍历,直到入队索引(也就是从队头到队尾),然后将对应索引处的值都设置为null,并且最后唤醒所有等待notFull条件处理的线程。

4. 代码示例

接下来简单看一个简单的生产者与消费者例子,例子参考自官方API文档及本文底部的链接。首先来看下生产者代码:

/**
 * 生产者
 */
static class Producer implements Runnable {
    private final BlockingQueue<Integer> queue;

    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                queue.put(produce(i));
                TimeUnit.MILLISECONDS.sleep(500);
            }
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    public int produce(int i) {
        System.out.println("=== put: " + i + ", thread: " + Thread.currentThread().getName());
        return i;
    }
}

然后再看下消费者代码:

/**
 * 消费者
 */
static class Consumer implements Runnable {
    private final BlockingQueue<Integer> queue;

    Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                TimeUnit.MILLISECONDS.sleep(500);
                consume(queue.take());
            }
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    public void consume(int x) {
        System.out.println("--- take: " + x + ", thread: " + Thread.currentThread().getName());
    }
}

最后,进行简单测试:

private static ExecutorService executorService = Executors.newFixedThreadPool(3);

public static void main(String[] args) {
    BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(10);
    Producer producer = new Producer(blockingQueue);
    Consumer consumer1 = new Consumer(blockingQueue);
    Consumer consumer2 = new Consumer(blockingQueue);
    executorService.submit(producer);
    executorService.submit(consumer1);
    executorService.submit(consumer2);
}

目的很简单,就是为了测试阻塞队列,先打印的肯定是put线程的语句,然后才会打印take线程的语句,来看下运行结果:

=== put: 0, thread: pool-1-thread-1
=== put: 1, thread: pool-1-thread-1
--- take: 0, thread: pool-1-thread-2
--- take: 1, thread: pool-1-thread-3
=== put: 2, thread: pool-1-thread-1
--- take: 2, thread: pool-1-thread-2
=== put: 3, thread: pool-1-thread-1
--- take: 3, thread: pool-1-thread-3
=== put: 4, thread: pool-1-thread-1
--- take: 4, thread: pool-1-thread-2
=== put: 5, thread: pool-1-thread-1
--- take: 5, thread: pool-1-thread-3
=== put: 6, thread: pool-1-thread-1
--- take: 6, thread: pool-1-thread-2
=== put: 7, thread: pool-1-thread-1
--- take: 7, thread: pool-1-thread-3
=== put: 8, thread: pool-1-thread-1
--- take: 8, thread: pool-1-thread-2
=== put: 9, thread: pool-1-thread-1
--- take: 9, thread: pool-1-thread-3

四、总结

到这,ArrayBlockingQueue中大部分的方法源码都学习过了,剩余一些源码,等用到的时候再来了解下,接下来来简单总结下ArrayBlockingQueue的一些特性:

本文参考自:
《Java并发编程实战》
Java 并发 --- 阻塞队列之ArrayBlockingQueue源码分析 - csdn.net
【JUC】JDK1.8源码分析之ArrayBlockingQueue(三)

上一篇下一篇

猜你喜欢

热点阅读