ArrayBlockingQueue实现原理
概述
ArrayBlockingQueue是常用的的FIFO阻塞队列,实现了BlockingQueue接口,是线程安全的。内部主要通过数组(Array)、锁(ReentrantLock)实现。
注:此队列中不能存放null元素。
常用方法
数据写入
boolean add(E e)
插入成功返回true,失败时抛出异常,add方法调用了父类AbstractQueue的add方法:
public boolean add(E e){
return super.add(e);
}
AbstractQueue的add方法:
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
可见,add方法最终调用了本类的offer方法,当offer失败时,抛出"Queue full"异常,成功时返回true。
boolean offer(E e)
offer方法不会阻塞写入,写入成功返回true,当队列已满时,写入数据失败返回false,在写入过程中加锁,在使用offer写入数据是线程安全的。
public boolean offer(E e) {
//非空验证,如果为空时抛出空指针异常
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
//验证写入数量是否等于数组长度,如果相等,不进行写入
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
boolean offer(E e, long time, timeunit unit)
方法加入了超时等待功能,在设定时间内会进行自旋等待,在设定时间内如果没有写入成功,返回false,写入成功返回true,此方法会响应中断:
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
//非空验证,如果为空,会抛出异常
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
//可中断锁,响应中断
lock.lockInterruptibly();
try {
//自旋等待
while (count == items.length) {
//超时,返回false写入失败
if (nanos <= 0)
return false;
//使用condition进行等待,此时线程进行time-waiting状态
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
put(E e)
put方法在添加元素时,如果队列已满,会进入阻塞状态,此方法会响应中断:
public void put(E e) throws InterruptedException {
//非空验证,如果为空,会抛出异常
checkNotNull(e);
final ReentrantLock lock = this.lock;
//可响应中断锁
lock.lockInterruptibly();
try {
//自旋
while (count == items.length)
//无限等待,直到notFull.signal或signalAll方法唤醒
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
equeue(E e)
私有方法,equeue是写入底层方法,此方法直接对数组进行操作,将新元素写入到数组中:
private void enqueue(E x) {
//this.items即为存储元素的数组
final Object[] items = this.items;
//将元素按照顺序写入到数组的指定位置,并更新数组下标(加1)
items[putIndex] = x;
//++putIndex将putIndex加1并返回加1后的结果
if (++putIndex == items.length)
//当下标更新后达到数长度是,从数组下标为0开始写入,循环往复
putIndex = 0;
count++;
//写入成功后,通知消费线程可继续进行消费
notEmpty.signal();
}
数据读取
E take()
从阻塞队列中读取一个元素,如果队列中无元素,将进入阻塞状态,如果超时返回null,此方法可响应中断:
//此方法与put方法类似,dequeue方法是从数组中顺序取出一个元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
//此时线程状态是waiting
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
E poll(long timeout, TimeUnit unit)
在设定时间内取出元素,如果超时,返回为空:
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
//可响应中断锁
lock.lockInterruptibly();
try {
while (count == 0) {
//超时,则返回为空
if (nanos <= 0)
return null;
//进入超时等待
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
E peek()
获取队列头部元素,此方法不会阻塞,如果队列中无元素,返回为空,此方法不会移除队列中的元素:
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
E element()
此方法是AbstractQueue的的方法,ArrayBlockingQueue没有对其重写,element方法调用了子类的peek方法,所以不会删除队列中的元素,当队列为空时,抛出NoSuchElementException:
public E element() {
//调用子类的peek方法
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
boolean remove(Object o)
删除一个元素,如果要删除的元素为null或者不存在,返回false,否则返回true,此方法不会阻塞线程:
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
//不可中断锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
//循环比较元素并删除
do {
if (o.equals(items[i])) {
//删除指定元素,并重新对数组元素位置进行调整保证FIFO
removeAt(i);
//删除成功返回true
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
其它方法
boolean contains(Object o)
判断是否包含一个元素,是返回true,否返回false,此方法需要遍历队列中的元素,并比较,性能较低:
public boolean contains(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i]))
return true;
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
int drainTo(Collection<? super E> c)
一次性获取队列中所有元素,内部调用了int drainTo(Collection<? super E> c, int maxElements)方法,maxElements参数为Integer.MAX_VALUE,优点是一次获取队列中所有元素,此操作会移除队列中锁获取的元素,避免了多次加锁造成的性能开销。
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}
int drainTo(Collection<? super E> c, int maxElements)
从头部获取指定数量的元素,获取的元素将会被从队列中移除。