Java1.8-LinkedBlockingQueue源码学习(
一、概述
上文我们学习了ArrayBlockingQueue的源码,本篇我们接着来学习LinkedBlockingQueue。ArrayBlockingQueue底层通过数组来实现,而与数组相对应,LinkedBlockingQueue底层是通过链表来实现的。
LinkedBlockingQueue 是基于链表实现的有界的阻塞队列。为了防止链表过度扩展,LinkedBlockingQueue的容量是可选的,如果未指定容量,容量最大值是Integer.MAX_VALUE;
二、源码
1. 继承结构
继承结构和ArrayBlockingQueue是一样的,这里就不多说了:
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
2. 属性
接下来,我们先来看一下它的一些属性:
/** 链表容量, 如果没有指定,默认Integer.MAX_VALUE */
private final int capacity;
/** 链表中元素的个数 */
private final AtomicInteger count = new AtomicInteger();
/** 链表的头节点,头节点中不保存元素 */
transient Node<E> head;
/** 链表的尾节点,尾节点中不保存引用 */
private transient Node<E> last;
/** 出队元素锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 出队元素的Condition条件 */
private final Condition notEmpty = takeLock.newCondition();
/** 入队元素锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** 入队元素的Condition条件 */
private final Condition notFull = putLock.newCondition();
从这里可以看出,LinkedBlockingQueue包含了读重入锁,写重入锁,实现了锁的读写分离,并且不同的锁有不同的Condition条件。并且头节点和尾节点还有一些特性:
头节点不包含元素,即
head.item == null
,也就是head.item
一直为null;而尾节点不包含引用,即last.next == null
,也就是last.next
一直为null。
3. 构造方法
先来看一下前两个简单的构造方法:
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
// 容量值不能小于0
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// 初始化头节点和尾节点
last = head = new Node<E>(null);
}
默认情况下,队列的容量是Integer.MAX_VALUE。来看另一个构造方法:
public LinkedBlockingQueue(Collection<? extends E> c) {
// 初始化最大容量
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
// 循环将集合中元素添加到队列中
for (E e : c) {
if (e == null)
throw new NullPointerException();
// 集合容量不能大于队列容量
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
// 设置数组容量
count.set(n);
} finally {
putLock.unlock();
}
}
从给定集合初始化一个队列,并且队列中元素不能为null,所给定集合容量不能大于队列容量,然后将元素转化为Node节点,保存到队列中。这里调用了入队的方法enqueue
,后面再来介绍这个方法。
4. 内部类
LinkedBlockingQueue中提供了一个内部类Node,用来保存元素,这一点和ArrayBlockingQueue是不同的,因为链表除了保存常规的元素外,还需要有指针,指向下一个节点,来简单看一下源码:
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
该节点比较简单,除了有一个元素类型之外,还有一个next指针,指向链表的下一个节点,该值如果为null,表示该节点没有后继节点,也就是该节点是最后一个节点。
5. 方法
5.1 put方法
首先来看下put方法,该方法是一个阻塞的入队方法,来看下源码:
public void put(E e) throws InterruptedException {
// 元素非空判断
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
// 将元素转化为Node节点
Node<E> node = new Node<E>(e);
// 获取入队锁,及链表中元素个数
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 线程可中断
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
// 如果队列满了
while (count.get() == capacity) {
// 阻塞notFull条件对应的线程
notFull.await();
}
// 线程可用了,进行入队
enqueue(node);
// 元素个数原子性增加,返回到是增加前的数量
c = count.getAndIncrement();
// 如果增加后的数量小于队列的容量,唤醒在notFull上等待的线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 如果元素个数为0,表示队列原先为空,此时添加成功了,队列里有元素了,
// 则需要唤醒在notEmpty条件上等待的线程
if (c == 0)
signalNotEmpty();
}
来简单看下流程:
- 首先非空校验,然后将元素转化为节点,并获取锁;
- 然后判断队列是否已满,如果满了,在notFull条件上进行等待;
- 如果没满,入队操作,然后更新队列元素,若元素数量小于队列容量,唤醒在notFull上等待的线程,通知他们队列有可用空间了;
- 释放锁后,判断元素入队前,队列中的元素是否为0,也就是说队列原先是否为空;如果原来的队列为空,而现在已经添加进元素了,所以激活notEmpty条件对应的线程,通知他们队列里已经有元素了。
这中间调用了入队的方法enqueue
,这里来简单看下:
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
该方法表示将节点添加至队列的尾部,然后更新链表的尾节点,可以画个简单的链表操作图来帮助理解。可能需要注意一点:
尾节点是不包含引用的,只包含元素对象;
至于唤醒notEmpty的方法signalNotEmpty
,则比较简单,这里来看下源码:
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
5.2 offer方法
put方法也就是入队方法,如果队列已满,返回false,其他的操作和put类似,这里来简单看下源码:
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
// 队列满了,直接返回false
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 队列没满
if (count.get() < capacity) {
// 入队
enqueue(node);
c = count.getAndIncrement();
// 如果添加后队列依旧没满,唤醒notFull对应线程
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
// 如果原队列为空,现在添加后不为空了,唤醒等在notEmpty上的线程
if (c == 0)
signalNotEmpty();
return c >= 0;
}
而重载的超时方法offer(E, long, TimeUnit)
逻辑实现也差不太多,来简单看下:
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
// 获取超时时间
long nanos = unit.toNanos(timeout);
int c = -1;
// 获取入队锁
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 如果队列已满
while (count.get() == capacity) {
// 然后再判断超时时间是否大于0,
if (nanos <= 0)
return false;
// 等待对应的超时时间,并且重置超时时间
// 为什么重置呢,因为可能被notFull条件对应的线程进行唤醒操作
nanos = notFull.awaitNanos(nanos);
}
// 入队
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
5.3 take方法
take方法是一个阻塞的出队方法,其与put方法是相对应的,我们来看一下:
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 如果队列为空,阻塞,直到队列不为空
while (count.get() == 0) {
notEmpty.await();
}
// 出队
x = dequeue();
c = count.getAndDecrement();
// 如果出队后队列依旧不为空,唤醒notEmpty上等待的线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 如果原来队列已满,现在出队了一个,说明队列可用,唤醒notFull对应等待的线程
if (c == capacity)
signalNotFull();
return x;
}
来简单看下流程,流程其实是和put相对应:
- 先获取出队的锁,然后判断队列是否为空,如果为空,一直阻塞知道队列不为空;
- 队列不为空,出队,出队后如果队列依旧不为空,唤醒notEmpty上等待的线程;
- 如果原来队列已满,现在出队了一个,说明队列可用了,则唤醒notFull条件上对应等待的线程。
这里调用到了出队方法dequeue
方法,来简单看下:
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
// 原先头节点
Node<E> h = head;
// 原先头节点的下一个节点(第一个节点)
Node<E> first = h.next;
// 原先头节点的next指针指向自己
h.next = h; // help GC
// 头节点后移
head = first;
// 获取第一个节点的元素值
E x = first.item;
// 将原来的第一个节点设置为null
first.item = null;
return x;
}
该方法的目的是为了更新头节点,将头节点更新为原先头节点的下一个节点,可能需要注意的是:
头节点是不保存元素值的,只保存指向下一个节点的指针。
而至于signalNotFull
方法,就不多说了。
5.4 poll方法
出队方法poll,表示如果队列为空,直接返回null,其他操作与take方法类似,来简单看下源码:
public E poll() {
final AtomicInteger count = this.count;
// 队列为空,直接返回null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 队列不为空,也就是队列中有元素
if (count.get() > 0) {
// 出队
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
// 同理,唤醒
if (c == capacity)
signalNotFull();
return x;
}
而超时的poll方法与超时的offer方法是类似的,这里只贴下源码,不多说了:
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
5.5 peek方法
至于peek方法,就比较简单,也没必要多说了:
public E peek() {
// 队列为空
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 获取头节点的下一个节点
Node<E> first = head.next;
// 如果第一个节点为空
if (first == null)
return null;
else
// 取值
return first.item;
} finally {
takeLock.unlock();
}
}
5.6 remove方法
remove方法表示移除队列中相应的元素,来看下源码:
public boolean remove(Object o) {
if (o == null) return false;
// 获取入队锁和出队锁
fullyLock();
try {
// 循环遍历,从队头到队尾
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
// 移除元素
unlink(p, trail);
return true;
}
}
return false;
} finally {
// 释放入队锁和出队锁
fullyUnlock();
}
}
我们来简单看下流程:
- 首先会获取入队锁和出队锁,因为移除元素的时候不允许再入队或出队;
- 然后会循环遍历,从队头遍历到队尾,判断元素是否相等,相等的话进行移除操作;
- 最后释放相应的锁。
这里移除元素的操作会调用unlink
方法,该方法用于将指定节点从链表中断开,我们来简单看下这个方法:
/**
* 这里trail节点是p节点的前驱节点
*/
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
// 将p节点元素置为null
p.item = null;
// 将trail节点指向的下一个节点的引用,指向p节点的下一个节点
// 也就是将p节点断开
trail.next = p.next;
// 如果p节点是尾节点,重置尾节点
if (last == p)
last = trail;
// 如果原来队列已满,这里移除后,队列就可用了,唤醒在notFull上等待的线程
if (count.getAndDecrement() == capacity)
notFull.signal();
}
这里的操作还会判断如果移除元素之前,队列已满,那么移除之后,队列就可用了,这时候会唤醒在notFull上等待的线程。另外,fullyLock
和fullyUnlock
比较简单,就不多说了:
/**
* Locks to prevent both puts and takes.
*/
void fullyLock() {
putLock.lock();
takeLock.lock();
}
/**
* Unlocks to allow both puts and takes.
*/
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
四、总结
到这里,大部分常用的方法都源码我们都学习过了,这里我们进行简单的总结下:
- LinkedBlockingQueue是基于链表实现的阻塞队列,容量可选,不指定容量的话,默认是Integer.MAX_VALUE;
- LinkedBlockingQueue针对入队和出队分别提供了各自的可重入锁,实现了读写分离。
本文参考链接除了官方文档外,还参考自:
《Java并发编程实战》
【JUC】JDK1.8源码分析之LinkedBlockingQueue(四)