10. LinkedBlockingQueue
LinkedBlockingQueue
类实现了BlockingQueue
接口。阅读BlockingQueue
文本以获取有关的更多信息。
LinkedBlockingQueue
在内部将元素存储在链接结构(链接节点)中。如果需要,该链接结构可以具有一个上限。如果未指定上限,则使用Integer.MAX_VALUE
作为上限。
LinkedBlockingQueue
内部将元素以FIFO(先入先出)次序存储。队列的头部是已在队列中的时间最长的元素,队列的尾部是已在队列中的时间最短的元素。
以下是如何实例化和使用LinkedBlockingQueue
:
BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
BlockingQueue<String> bounded = new LinkedBlockingQueue<String>(1024);
bounded.put("Value");
String value = bounded.take();
源码
LinkedBlockingQueue
内部使用了一个单向链表,同时它提供了两个锁,一个用于获取并删除元素,一个用于增加元素。count
字段使用原子变量,避免修改它时需要同时获取两个锁。
static class Node<E> {
E item;
/**
* 下面中的一个:
* - 真实的后继节点
* - 这个节点本身,此时原后继节点现在是head.next,即第一个元素
* - null, 意味没有后继节点,此节点是队列最后一个节点
*/
Node<E> next;
Node(E x) { item = x; }
}
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
增加操作
注意进行增加操作时,只对putLock
加锁,如果还对takeLock
也进行加锁,那么就会影响性能。同时,为了弥补此方法带来的后果,count
使用原子变量,进行CAS更新,防止数据不一致。
为了提升性能,在增加元素成功后,如果队列还没有满,那么便唤醒其他因队列满而被阻塞的插入线程。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 注意即使count没有被锁保护,它依然可以被用作等待条件
// 判定。因为此时count只会被减少(putLock已加锁),如果容量
// 改变,会被唤醒。count在其他地方的使用也与此相似。
// 队列已满,阻塞自己
while (count.get() == capacity) {
notFull.await();
}
// 插入队列中
enqueue(node);
// CAS更新count值
c = count.getAndIncrement();
// 如果队列没满,唤醒其他等待插入的线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 如果队列原来是空队列,唤醒等待提取元素的线程
if (c == 0)
signalNotEmpty();
}
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
// 先加锁,才能调用对应Condtion的signal()方法
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
// 队列已满,返回false
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 等待-超时机制
while (count.get() == capacity) {
if (nanos <= 0L)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
删除操作
删除操作与增加操作一样。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 当队列为空,阻塞自己
while (count.get() == 0) {
notEmpty.await();
}
// 将头节点出队
x = dequeue();
c = count.getAndDecrement();
// 如果队列还有元素,唤醒其他等待提取元素的线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 如果原本队列是满的,唤醒增加线程,因为现在元素已经被取出,队列不满
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
// 头节点为空,其中不存储元素
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
// 删除一个指定元素
public boolean remove(Object o) {
if (o == null) return false;
// 将两个锁全部加锁
fullyLock();
try {
for (Node<E> pred = head, p = pred.next;
p != null;
pred = p, p = p.next) {
if (o.equals(p.item)) {
// 从队列中移除此节点
unlink(p, pred);
return true;
}
}
return false;
} finally {
// 释放全部两个锁
fullyUnlock();
}
}
void unlink(Node<E> p, Node<E> pred) {
// assert putLock.isHeldByCurrentThread();
// assert takeLock.isHeldByCurrentThread();
// p.next没有被设置为null,为了保证迭代器遍历到p时继续工作,
// 保证弱一致性
p.item = null;
pred.next = p.next;
if (last == p)
last = pred;
if (count.getAndDecrement() == capacity)
notFull.signal();
}
访问操作
public E peek() {
// 队列为空,返回null
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 返回第一个元素
return (count.get() > 0) ? head.next.item : null;
} finally {
takeLock.unlock();
}
}
其他操作
public void clear() {
fullyLock();
try {
for (Node<E> p, h = head; (p = h.next) != null; h = p) {
// 使得next指向自己
h.next = h;
// 解除对元素实体的引用
p.item = null;
}
head = last;
// assert head.item == null && head.next == null;
// 如果原来队列是满的,唤醒等待的插入线程
if (count.getAndSet(0) == capacity)
notFull.signal();
} finally {
fullyUnlock();
}
}
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}
public int drainTo(Collection<? super E> c, int maxElements) {
Objects.requireNonNull(c);
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
boolean signalNotFull = false;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 获取当前队列中的元素数量
int n = Math.min(maxElements, count.get());
// count.get provides visibility to first n Nodes
Node<E> h = head;
int i = 0;
try {
// 将n个元素加入到指定集合中
while (i < n) {
Node<E> p = h.next;
c.add(p.item);
p.item = null;
h.next = h;
h = p;
++i;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
// assert h.item == null;
head = h;
signalNotFull = (count.getAndAdd(-i) == capacity);
}
}
} finally {
takeLock.unlock();
if (signalNotFull)
signalNotFull();
}
}
迭代器
LinkedBlockingQueue
的迭代器与DelayQueue
的不同,DelayQueue
的迭代器与原组件没有任何的一致性,而LinkedBlockingQueue
的迭代器与内部的链表保持了弱一致性。
注意它的next()
方法,它会跳过内容为null的节点,回忆前面删除操作中的remove(Object)
方法,他没有修改节点的next字段,如果修改了,迭代器就会无法正常工作,而为了保证一致性,迭代器也需要跳过这个空节点。
而它的forEachRemaining(Consumer<? super E> action)
方法是分批次进行处理的,每批64个元素,如果数量小于64,那就使用此数量。
private class Itr implements Iterator<E> {
private Node<E> next; // 持有nextItem的节点
private E nextItem; // 下一个进行处理的元素
private Node<E> lastRet; // 上一个返回的元素,即当前正在使用的
private Node<E> ancestor; // Helps unlink lastRet on remove()
Itr() {
fullyLock();
try {
// 保存第一个元素
if ((next = head.next) != null)
nextItem = next.item;
} finally {
fullyUnlock();
}
}
public boolean hasNext() {
return next != null;
}
public E next() {
Node<E> p;
if ((p = next) == null)
throw new NoSuchElementException();
lastRet = p;
E x = nextItem;
fullyLock();
try {
E e = null;
// 注意此处,遇到空节点会跳过去访问下一个节点
for (p = p.next; p != null && (e = p.item) == null; )
p = succ(p);
next = p;
nextItem = e;
} finally {
fullyUnlock();
}
return x;
}
Node<E> succ(Node<E> p) {
// 正常出队的元素next字段会指向自己
if (p == (p = p.next))
p = head.next;
return p;
}
public void forEachRemaining(Consumer<? super E> action) {
// A variant of forEachFrom
Objects.requireNonNull(action);
Node<E> p;
if ((p = next) == null) return;
lastRet = p;
next = null;
final int batchSize = 64;
Object[] es = null;
int n, len = 1;
do {
fullyLock();
try {
if (es == null) {
p = p.next;
// 获取真正存在的元素的数量,如果多于64,分批进行,一批为64个
for (Node<E> q = p; q != null; q = succ(q))
if (q.item != null && ++len == batchSize)
break;
es = new Object[len];
es[0] = nextItem;
nextItem = null;
n = 1;
} else
n = 0;
// n为1的使用只因为p=p.next,经过此步后p已经不是首元素,
// 而是第二个元素。而后面批次的插入直接从0开始即可
// 将元素放入数组中
for (; p != null && n < len; p = succ(p))
if ((es[n] = p.item) != null) {
lastRet = p;
n++;
}
} finally {
fullyUnlock();
}
// 分别调用accept方法
for (int i = 0; i < n; i++) {
@SuppressWarnings("unchecked") E e = (E) es[i];
action.accept(e);
}
} while (n > 0 && p != null);
}
public void remove() {
// 获取当前元素
Node<E> p = lastRet;
if (p == null)
throw new IllegalStateException();
lastRet = null;
fullyLock();
try {
if (p.item != null) {
if (ancestor == null)
ancestor = head;
// 获取p的前驱结点
ancestor = findPred(p, ancestor);
// 从链表中删除结点p
unlink(p, ancestor);
}
} finally {
fullyUnlock();
}
}
}
测试:
import org.junit.Test;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
public class LinkedBlockingQueueTest {
private static LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
@Test
public void test() {
queue.offer("1");
queue.offer("2");
queue.offer("3");
queue.offer("4");
Iterator<String> itr = queue.iterator();
queue.remove("3");
itr.forEachRemaining(System.out::println);
}
}
输出如下:
1
2
4
核心要点
- 内部使用一个单向链表,以FIFO顺序存储
- 可以在链表两头同时进行操作,所以使用两个锁分别保护
- 插入线程在执行完操作后如果队列未满会唤醒其他等待插入的线程,同时队列非空还会唤醒等待获取元素的线程;提取线程同理。
- 迭代器与单向链表保持弱一致性,调用
remove(T)
方法删除一个元素后,不会解除其对下一个结点的next引用,否则迭代器将无法工作。 - 迭代器的
forEachRemaining(Consumer<? super E> action)
以64个元素为一批进行操作