LinkedBlockingQueue分析
LinkedBlockingQueue内部结构采用链式存储,元素节点Node,封装对象E泛型,Node#next指向下一个节点。
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
存储结构图。
阻塞队列LinkedBlockingQueue链式存储结构.jpg
LinkedBlockingQueue#构造方法。
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
参数说明
capacity:容量,默认是Integer.MAX_VALUE。
count:当前队列元素数量,AtomicInteger线程同步,可根据count与capactiy比较值作为是否已满的依据。
head/last指针:初始化创建第一个Node节点作为头结点与尾节点,封装内容是null。
LinkedBlockingQueue插入节点
put/offer方法:向尾部插入元素,加锁putLock。区别是,若队列已满,put方法线程阻塞等待,当有空余位置时,被唤醒,继续插入,offer直接返回插入失败。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();//以下代码段加锁
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node); // 到这里插入,说明有空余至少一个。
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();//若还有至少一个空余,通知其他等待的不满信号。
} finally {
putLock.unlock();//解锁
}
if (c == 0)
signalNotEmpty();
}
-
插入对象不能是空。
-
封装E对象Node节点。
-
线程并发,一定要ReentrantLock加锁,put锁,确保其他线程此刻无法访问上锁的代码段。
-
判断count,count是AtomicInteger类型,并发保持同步。若达到最大容量capacity,此时线程将阻塞,让出锁,notFull不满条件等待,即等待不满信号唤醒。
-
若未达到最大容量,在队列尾部enqueue插入Node节点。
-
count自增,返回的c仍是count自增前的值,若+1还小于capacity,说明此次插入后还有空余空间,发出未满通知信号。
这里为什么会发出一个notFull未满信号呢?
线程A1执行put方法时,获取putLock锁,其他线程无法获取该锁,当A1线程发现队列已满,在notFull上等待,出让putLock锁,线程A2获取该锁,同样在notFull上等待,此刻,存在多个阻塞的插入线程A1.2.3...都在notFull#await导致阻塞,出让putLock锁,在notFull上等待。
直到take/poll方法获取数据,队列出现空位,signalNotFull执行notFull#signal方法,在notFull上有线程在等待,唤醒A1,signal只唤醒notFull上的一个线程。A1抢到putLock锁,执行元素插入,此刻其他线程仍休眠。enqueue插入成功,再次判断是否有多余容量,若有,notFull#signal继续唤醒在notFull上休眠的其他线程。若没有notFull上的等待线程,执行signal也没有影响,重点在于,它会继续唤醒其他notFull等待的线程。 -
ReentrantLock解锁。
-
若c==0,说明插入之前队列是空,插入后不空,一定要发出不空信号NotEmpty,让阻塞在NotEmpty上的读取线程唤醒。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
offer方法比较简单,判断count未达到最大容量capactiy,插入节点。同理,根据空余空间发出未满信号。c的初始值是-1,若成功插入,则一定会赋值成一个>=0的值,因此。offer的返回的值可判断是否成功插入,offer不会阻塞。
LinkedBlockingQueue取出节点
take/poll方法:从队列头部加锁takeLock获取元素。若队列为空,take方法获取线程阻塞等待,有元素时被唤醒。poll返回失败。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
分析逻辑与put一致。
生产者与消费者模型,元素在头部获取,尾部插入。多线程并发操作时,采用两种锁保证线程同步,互相独立,并行操作队列元素,数量更新AtomicInteger同步。
任重而道远