J.U.C之阻塞队列(二)
2019-10-25 本文已影响0人
今年五年级
LinkedBlockingQueue
与ArrayBlockingQueue区别
linkedBlockingQueue与ArrayBlockingQueue主要区别为
- ArrayBlockingQueue是数组结构的队列,是有界队列
- LinkedBlockingQueue是链表结构的队列,可以是有界队列也可以是无界队列(capacity=Integer.MAX_VALUE)
主要属性
//队列容量
private final int capacity;
//队列元素个数
private final AtomicInteger count = new AtomicInteger();
//对头
transient Node<E> head;
//队尾
private transient Node<E> last;
//take,poll,peek等读操作的方法需要获取读锁
private final ReentrantLock takeLock = new ReentrantLock();
//如果读操作的时候队列为空,则等待不空条件
private final Condition notEmpty = takeLock.newCondition();
//put,offer等写操作的方法需要获取写锁
private final ReentrantLock putLock = new ReentrantLock();
//如果写操作的时候队列为满,则等待不满条件
private final Condition notFull = putLock.newCondition();
锁和条件的搭配关系:如要读取,则不仅要获取读锁,而且还要满足队列不为空的条件
未命名文件 (11).png
从上图可以看出来,读操作和写操作单独看都是安全的,并发问题在于一个写操作和一个读操作同时进行
源码分析
构造方法
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
初始化的时候会构造一个空节点,第一个元素入队的时候,队列中会有两个元素,读取元素时,也总是会获取头节点后面的一个节点,count的计数值不包括这个头节点
put(E e)
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
//将待插入的元素构造为一个Node节点
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
//获取队列中元素个数
final AtomicInteger count = this.count;
//用写锁加一个可以被中断抛出异常的锁
putLock.lockInterruptibly();
try {
//当队列中容量满则等待不满的条件
while (count.get() == capacity) {
//释放锁+park+被唤醒(signal or interrupt)
notFull.await();
}
//此时被通知不满开始入队
enqueue(node);
//先返回队列中元素的数量给c再给队列元素数量+1
c = count.getAndIncrement();
//如果这个元素入队后还有位置可以插入
if (c + 1 < capacity)
//唤醒等待在不满的条件的线程
notFull.signal();
} finally {
//入队后,释放掉写锁
putLock.unlock();
}
//c==0,则代表队列这个元素入队列之前是空的(不包括head节点)
if (c == 0)
//这里做一次唤醒操作让其他线程读取新加入的元素
signalNotEmpty();
}
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
signalNotEmpty()
private void signalNotEmpty() {
//获取读锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//唤醒线程来读取
notEmpty.signal();
} finally {
//释放读锁
takeLock.unlock();
}
}
take()
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
//获取到读锁才能进行出队操作
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
//出队
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
//出队以后队列还不为空,唤醒等待在不为空队列的线程
notEmpty.signal();
} finally {
//出队释放掉读锁
takeLock.unlock();
}
//读取发生之前满了
if (c == capacity)
//刚刚进行了一次出队操作,已经有位置了,唤醒写线程往里面写入
signalNotFull();
return x;
}
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
//设置这个节点Wie新的头节点
head = first;
E x = first.item;
first.item = null;
return x;
}
signalNotFull()
类比上面的signalNotEmpty
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
PriorityBlockingQueue
带优先级(排序)的blockingQueue实现