BlockingQueue的存与取【史上最清晰】
2019-08-08 本文已影响9人
可爱猪猪
作者:
可爱猪猪
-帅锅一枚
作者的网名很阔爱,如果喜欢本文章一定要点 喜欢 或者 打赏,拜托~
作者一直在进步,需要你们的支持和鼓励,谢谢!
人生理想:在程序猿界混出点名堂!
1.是什么
什么是BlockingQueue
BlockingQueue为阻塞队列,比如线程池
的构建就使用了BlockingQueue
比如LinkedBlockingQueue和ArrayBlockingQueue等,还有如下:
Notice:以下讨论以LinkedBlockingQueue
为例:
2.储备
BlockingQueue的界限
界限为这个队列的最大容量,如果超过界限,会有对应的处理机制。
- 默认构造,也就是无界
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
- 指定界限
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
3.看结果
BlockingQueue的存与取,直接看存取方法区别
存
方法名称 | 是否有返回 | 超出capacity处理 |
---|---|---|
offer | 是 | 返回false |
put | 否 | 阻塞 |
add | 是 | 抛出异常 |
取
方法名称 | 读取后是否移除队列 | 队列为空处理 |
---|---|---|
poll | 移除 | 返回null |
take | 移除 | 阻塞 |
peek | 不移除 | 返回null |
4.知其所以然
BlockingQueue的存与取,对边界处理机制
存
:offer()、 put()、 add()
- offer()
队列尾部追加元素,如果队列已满,则返回false,否则返回true
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
// 超出界限,直接返回失败
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
// 追加到队列
enqueue(node);
//队列总数+1
c = count.getAndIncrement();
if (c + 1 < capacity)
// 如果增加了反而没满,通知put方法释放等待
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
- put()
添加元素到队列尾部,如果队列已满,则阻塞等待。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
// 如果队列已满,阻塞等待,直到通知解锁,否则就添加成功
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
- add()
该方法为AbstractQueue所有,调用offer,添加元素到队列尾部,如果超出队列,则抛出异常。
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
取
:poll() 、take() 、peek(),peek英文为窥视
- poll()
从队列头部获取元素并移除,如果队列为空,返回null。
public E poll() {
final AtomicInteger count = this.count;
//如果队列为空,返回null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
//并从队列移除
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
- take()
从队列头部获取元素并移除,如果队列为空,则阻塞。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 如果队列为空,则阻塞等待
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
- peek()
从队列头部获取元素,如果队列为空,则返回null。
public E peek() {
// 如果队列为空则返回空
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 并未从队列移除,所以说只是偷窥,并未做移除操作
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}