多线程并发编程12-LinkedBlockingQueue源码剖
前面的文章介绍了使用CAS算法实现的非阻塞有界队列ConcurrentLinkedQueue(详情点这里),今天介绍另一个并发队列LinkedBlockingQueue。
LinkedBlockingQueue是一个阻塞有界的队列,使用单链表实现,和ConcurrentLinkedQueue一样也有两个Node,分别存放首、尾节点,并且还有一个初始值为0的原子变量count,用来记录队列元素个数。还要两个ReentrantLock对象,分别控制元素的入队和出队的原子性。另外,还有两个条件变量,条件变量内部都有一个条件队列用来存放进队和出队阻塞的线程,其实就是生产者-消费者模型。这些主要的成员变量如下:
transient Node<E> head;
private transient Node<E> last;
private final AtomicInteger count = new AtomicInteger();
private final ReentrantLock takeLock = new ReentrantLock();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final Condition notFull = putLock.newCondition();
下面通过源码来看一看具体的LinkedBlockingQueue队列的读写操作。
offer(E e)
向队列尾部插入一个元素,如果队列未达到指定容量则插入成功返回true,如果达到指定容量则丢弃当前元素然后返回false。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
//(1)获取当前元素个数,判断是否达到指定容量,是则返回false。
if (count.get() == capacity)
return false;
int c = -1;
//(2)创建Node变量,并尝试获取putLock 锁
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//(3)当前队列元素小于容量大小则将Node插入到队列尾部,并将记录元素个数的原子变量count+1
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
//(4)插入元素之后,队列大小仍小于容量大小,则调用notFull.signal()通知阻塞的生产者可以继续插入元素。
if (c + 1 < capacity)
notFull.signal();
}
} finally {
//(5)释放锁
putLock.unlock();
}
//(6)如果c等于0,则说明在插入这个元素之前队列中没有元素,队列从无元素状态切换到有元素状态,通知消费者可以进行消费获取队列中元素。
if (c == 0)
signalNotEmpty();
return c >= 0;
}
offer方法通过使用putLock锁保证了在队尾新增元素操作的原子性,进队时只操作队列尾节点,并在消费者-生产者模式中进行通知。在调用条件变量的方法时,要注意先获取对应的锁。
put(E e)
向队列队尾插入一个元素,如果队列中有空闲则插入后直接返回,如果队列已满则阻塞当前线程,直到队列中有空闲插入成功后返回。线程阻塞时如果被其他线程设置了中断标志,则该线程会抛出InterruptedException异常。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
//(1)创建Node变量,并尝试获取putLock锁 ,获取锁是用的lockInterruptibly()方法会对中断进行响应并抛出InterruptedException异常。
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//(2)如果当前队列已满则会调用notFull.await()方法阻塞当前线程,停止生产,即停止向队列中插入元素。这里使用while判断队列是否已满,是为了防止虚假唤醒。
while (count.get() == capacity) {
notFull.await();
}
//(3)队列未满则在队列尾部插入node,并将记录元素的原子变量count+1。
enqueue(node);
c = count.getAndIncrement();
//(4)插入元素之后,队列大小仍小于容量大小,则调用 notFull.signal()通知阻塞的生产者可以继续插入元素。
if (c + 1 < capacity)
notFull.signal();
} finally {
//(5)释放锁
putLock.unlock();
}
//(6)如果c等于0,则说明在插入这个元素之前队列中没有元素,队列从无元素状态切换到有元素状态,通知消费者可以进行消费获取队列中元素。
if (c == 0)
signalNotEmpty();
}
put方法相对于offer方法,会在队列已满的条件下阻塞当前线程,其余操作和offer方法基本一致。
poll()
从队列头部获取一个元素,并从队列里移除该元素,如果队列为空则返回null。
public E poll() {
final AtomicInteger count = this.count;
//(1)如果队列为空则直接返回null。
if (count.get() == 0)
return null;
E x = null;
int c = -1;
//(2)获取takeLock 锁。
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//(3)如果当前队列不为空,则将头节点元素弹出,并将记录队列元素个数的原子变量count-1。
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
//(4)如果弹出元素之后队列中还有元素,则调用notEmpty.signal()方法,通知阻塞消费者可以进行消费数据,即可以从队列中获取原始。
if (c > 1)
notEmpty.signal();
}
} finally {
//(5)释放锁
takeLock.unlock();
}
//(6)如果c等于容量大小,则说明队列状态由已满状态转换为有空闲状态,通知阻塞的生产者可以进行生产数据,即向队列中插入元素。
if (c == capacity)
signalNotFull();
return x;
}
poll方法获取元素只操作队列的头节点。
peek()
获取队列头部的元素但不从队列中移除该元素,队列为空的时候返回null。
public E peek() {
//(1)队列为空则返回null。
if (count.get() == 0)
return null;
//(2)尝试获取takeLock锁,获取锁成功后返回头结点元素。
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
//(3)这里有个细节,判断头节点是否为空。
if (first == null)
return null;
else
return first.item;
} finally {
//(4)释放锁。
takeLock.unlock();
}
}
take()
获取队列中头节点元素并从队列里移除该节点,如果队列为空则会阻塞当前线程直到队列中有元素。如果当前线程在阻塞时被其他线程设置了中断标志,则会抛出InterruptedException异常。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
//(1)尝试获取takeLock 锁,使用lockInterruptibly方法进行获取,会对中断进行响应并抛出InterruptedException异常。
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//(2)如果队列为空则阻塞当前线程,使用while来进行判断队列是否为空防止虚假唤醒。
while (count.get() == 0) {
notEmpty.await();
}
//(3)队列不为空则将头节点移出队列,并将记录元素个数的原子变量count-1。
x = dequeue();
c = count.getAndDecrement();
//(4)如果弹出元素之后队列中还有元素,则调用 notEmpty.signal()方法,通知阻塞消费者可以进行消费数据,即可以从队列中获取原始。
if (c > 1)
notEmpty.signal();
} finally {
//(5)释放锁。
takeLock.unlock();
}
//(6)如果c等于容量大小,则说明队列状态由已满状态转换为有空闲状态,通知阻塞的生产者可以进行生产数据,即向队列中插入元素。
if (c == capacity)
signalNotFull();
return x;
}
take方法想比与poll方法不同之处就是前者在队列为空的时候会阻塞当前线程,其余逻辑一致。
remove(Object o)
从队列中移除指定的元素,该方法先从队列的头节点遍历找到指定的元素,找到指定的元素则将其从队列中删除,否则返回false。这里在遍历的时候有可能有其他的线程往队列里插入元素,所以需要获取同时获取putLock和takeLock两把锁。
public boolean remove(Object o) {
if (o == null) return false;
//(1)获取putLock锁和takeLock锁。
fullyLock();
try {
//(2)遍历查找指定的元素 ,找到则删除并返回true
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
//(3)从队列中删除指定的元素,如果在删除前队列已满,则需要在删除后唤醒生产者可以进生产消息。
unlink(p, trail);
return true;
}
}
return false;
} finally {
//(4)是否两把锁,要和获取两把锁的顺序想法。
fullyUnlock();
}
}
由于remove方法在删除指定元素前加了两把锁,所以在遍历和删除过程都是线程安全的。在获取多个资源锁的顺序要和释放的顺序相反。
size()
返回队列中的元素个数。由于入队和出队都有加锁操作,所有记录元素的原子变量count的操作都是线程安全的,所以在size方法获取count变量的时候就不需要加锁了。
public int size() {
return count.get();
}
今天的分享就到这,有看不明白的地方一定是我写的不够清楚,所有欢迎提任何问题以及改善方法。