LinkedBlockingDeque
LinkedBlockingDeque: 由双向链表组成的有界阻塞队列,队列容量大小可选,默认大小为Integer.MAX_VALUE。队头部和队尾都可以写入和移除元素,因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半锁的竞争 。
LinkedBlockingDeque是双向链表实现的阻塞队列。该阻塞队列同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作(插入/删除);
在不能够插入元素时,它将阻塞住试图插入元素的线程;在不能够抽取元素时,它将阻塞住试图抽取的线程。;
LinkedBlockingDeque还是可选容量的,防止过度膨胀,默认等于Integer.MAX_VALUE。;
LinkedBlockingDuque没有进行读写锁的分离,因此同一时间只能有一个线程对其操作,因此在高并发应用中,它的性能要远远低于LinkedBlockingQueue。
Deque特性: 队头和队尾都可以插入和移除元素,支持FIFO和FILO。
相比于其他阻塞队列,LinkedBlockingDeque多了addFirst()、addLast()、peekFirst()、peekLast()等方法,以XXXFirst结尾的方法,表示插入、获取获移除双端队列的队头元素。以xxxLast结尾的方法,表示插入、获取获移除双端队列的队尾元素。
场景: 常用于工作窃取模式
假设:有多个消费者,每个消费者有自己的一个消息队列,生产者不断的生产数据扔到队列中,消费者消费数据有快有慢。为了提升效率,速度快的消费者可以从其它消费者队列的队尾出队元素放到自己的消息队列中,由于是从其它队列的队尾出队,这样可以减少并发冲突(其它消费者从队首出队元素),又能提升整个系统的吞吐量。这其实是一种“工作窃取算法”的思路。
image.png
三.继承关系
1.继承图
image.pngBlockingDeque相对于BlockingQueue,最大的特点就是增加了在队首入队/队尾出队的阻塞方法。下面是两个接口的比较:
public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {
//------------------- 双端阻塞队列方法start -------------------
//-----双端入队方法
//队首入队,元素为null,抛出NullPointerException,队列已满,抛出IllegalStateException("Deque full")
void addFirst(E e);
//队尾入队,元素为null,抛出NullPointerException,队列已满,抛出IllegalStateException("Deque full")
void addLast(E e);
//非阻塞式队首入队,成功返回true,队列已满返回false,元素为null,抛出NullPointerException
boolean offerFirst(E e);
//非阻塞式队尾入队,成功返回true,队列已满返回false,元素为null,抛出NullPointerException
boolean offerLast(E e);
//可响应中断阻塞式队首入队,如果队列已满,进入阻塞状态,等待消费者调用出队操作后被唤醒
void putFirst(E e) throws InterruptedException;
//可响应中断阻塞式队尾入队,如果队列已满,进入阻塞状态,等待消费者调用出队操作后被唤醒
void putLast(E e) throws InterruptedException;
//限时阻塞队首入队,成功返回true,队列已满返回,超时未写入返回false,元素为null,抛出NullPointerException
boolean offerFirst(E e, long timeout, TimeUnit unit) throws InterruptedException;
//限时阻塞队尾入队,成功返回true,队列已满返回,超时未写入返回false,元素为null,抛出NullPointerException
boolean offerLast(E e, long timeout, TimeUnit unit) throws InterruptedException;
//-----双端出队方法
//可响应中断阻塞式队首出队,如果队列为空,进入阻塞状态,等待生产者调用入队操作后被唤醒
E takeFirst() throws InterruptedException;
//可响应中断阻塞式队尾出队,如果队列已满,进入阻塞状态,等待生产者调用入队操作后被唤醒
E takeLast() throws InterruptedException;
//非阻塞式队首出队,成功返回元素,失败返回null
E pollFirst();
//非阻塞式队尾出队,成功返回元素,失败返回null
E pollLast();
//可响应中断限时阻塞式队首出队,成功返回元素,失败返回null
E pollFirst(long timeout, TimeUnit unit)throws InterruptedException;
//可响应中断限时阻塞式队尾出队,成功返回元素,失败返回null
E pollLast(long timeout, TimeUnit unit)throws InterruptedException;
//从队首至队尾遍历(从前往后),移除通过equals判断相等的第一个元素,并返回true
boolean removeFirstOccurrence(Object o);
//从队尾至队首遍历(从后往前),移除通过equals判断相等的第一个元素,并返回true
boolean removeLastOccurrence(Object o);
//------------------- 双端阻塞队列方法end -------------------
//------------------- 阻塞队列方法start -------------------
E poll(long timeout, TimeUnit unit)throws InterruptedException;
//-----写入start-----
//写入元素至队尾,成功返回true, 如果队列已满,抛出IllegalStateException("Queue full")
//如只往指定长度的队列中写入值,推荐使用offer()方法。
boolean add(E e);
//写入元素至队尾,成功返回true, 如果队列已满,返回false, e的值不能为空,否则抛出NullPointerException。
boolean offer(E e);
//写入元素至队尾, 如果队列已满, 则阻塞调用线程直到队列有空闲空间.
void put(E e) throws InterruptedException;
//写入元素至队尾, 如果队列已满, 则限时阻塞调用线程,直到队列有空闲空间或超时.
boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException;
//----写入end----
//-------读取start-----
//从队首读取并移除元素,如果队列为空, 则阻塞调用线程直到队列中有元素写入.
E take() throws InterruptedException;
//从队首读取并移除元素,如果队列为空, 则限时阻塞调用线程,直到队列中有元素写入或超时.
E poll(long timeout, TimeUnit unit)throws InterruptedException;
//-----读取end--------
//读取但不移除元素,如果元素为空,抛出NoSuchElementException
E element();
//读取但不移除元素
E peek();
//------------------- 阻塞队列方法end -------------------
//从队列中通过equals移除相等的元素,若队列为空,抛出NoSuchElementException异常
boolean remove(Object o);
//从队列中通过equals判断是否存在指定元素
public boolean contains(Object o);
//返回deque容器中的元素个数
public int size();
//返回队列迭代器
Iterator<E> iterator();
//从队头写入元素,如果队列已满,抛出异常IllegalStateException("Deque full")
void push(E e)
}
四.主要属性
/**
* 头节点
* Invariant: (first == null && last == null) ||
* (first.prev == null && first.item != null)
*/
transient Node<E> first;
/**
* 尾节点
* Invariant: (first == null && last == null) ||
* (last.next == null && last.item != null)
*/
transient Node<E> last;
//元素个数
private transient int count;
//容量大小,默认是 Integer.MAX_VALUE
private final int capacity;
/** 唯一全局可重入独占“”:掌管所有读写操作的锁 */
final ReentrantLock lock = new ReentrantLock();
/** 出队条件队列:队列为空时,用于阻塞读线程,唤醒写线程 */
private final Condition notEmpty = lock.newCondition();
/** 入队条件队列:队列已满时,用于阻塞写线程,唤醒读线程 */
private final Condition notFull = lock.newCondition();
可以看first指向队首节点,last指向队尾节点。利用ReentrantLock来保证线程安全,所有对队列的修改操作都需要先获取全局锁
五.双向链表节点-Node
/** 双向链表节点内部类 */
static final class Node<E> {
/**
* 节点值, null表示该节点已被移除
*/
E item;
/**
* One of:r
* 1.真实的前驱节点
* 2.执行,意思是前任是尾巴
* 3.null,表示没有前驱节点
*/
Node<E> prev;
/**
* - 真正的后继节点
* - 这个节点,意思是后继节点是头节点
* - null,表示没有继后继节点
*/
Node<E> next;
//写入元素存储值item
Node(E x) {
item = x;
}
}
六.构造方法
//默认构造器,使用 Integer.MAX_VALUE 作为容量
public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);
}
//可指定容量的构造器
public LinkedBlockingDeque(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
}
//使用 Integer.MAX_VALUE 作为容量,同时将指定集合添加到队列中的构造器
public LinkedBlockingDeque(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock lock = this.lock;
lock.lock(); //为了保证数据一致性,需要获取锁
try {
//遍历添加到队列
for (E e : c) {
// 队列不能包含null元素
if (e == null)
throw new NullPointerException();
//入队时队列已满则抛出异常,否则继续执行入队操作,链接原队尾节点与入队节点并唤醒一个读线程
if (!linkLast(new Node<E>(e)))
throw new IllegalStateException("Deque full");
}
} finally {
lock.unlock();
}
}
//将节点链接到队尾,如果队列已满则返回false。
private boolean linkLast(Node<E> node) {
//队列已满,直接返回false
if (count >= capacity)
return false;
//获取原队尾节点
Node<E> l = last;
//链接入队节点到原队尾元素上,即入队节点的prev指向原队尾节点
node.prev = l;
//更新入队节点为队尾
last = node;
//队头为空,即第一次写入,初始化当前节点为队首节点
if (first == null)
first = node;
//非第一次写入,链接入队节点到原队尾节点上,即原队尾节点的next指向入队节点
else
l.next = node;
//更新元素计数
++count;
//唤醒一个等待出队线程
notEmpty.signal();
return true;//入队成功
}
七.入队
1.Dueue入队方法
队首入队核心方法-linkFirst(Node node)
- 将节点链接到
队首
,队列已满的情况则返回false。插入成功,则唤醒一个正在等待的出队线程
private boolean linkFirst(Node<E> node) {
//队列已满,直接返回false
if (count >= capacity)
return false;
//获取原队首节点
Node<E> f = first;
//链接原队首节点到现队首入队节点next上,即队首入队节点的next指向原队首节点
node.next = f;
//更新入队节点为新队首
first = node;
//如果队列为空(last才会为null),那么不用重连prev指针,只需要更新last为当前队首入队节点
if (last == null)
last = node;
//如果队列非空,链接队首入队节点到原队首节点的prev上,即原队首节点的prev指向队首入队节点
else
f.prev = node;
//更新元素计数
++count;
//唤醒一个等待出队线程
notEmpty.signal();
return true;//入队成功
}
初始:
image.png
队首插入节点node:
image.png
队尾入队核心方法- boolean linkLast(Node node)
将节点链接到队尾,队列已满的情况则返回false。插入成功,则唤醒一个正在等待的出队线程
private boolean linkLast(Node<E> node) {
//队列已满,直接返回false
if (count >= capacity)
return false;
//获取原队尾节点
Node<E> l = last;
//链接队尾入队节点到原队尾节点的prev上,即入队节点的prev指向原队尾节点
node.prev = l;
//更新队尾入队节点为新队尾
last = node;
//如果队列为空(first 才会为null),那么不用重连next指针,只需要更新first为当前队尾入队节点
if (first == null)
first = node;
//如果队列非空,链接队尾入队节点到原队尾节点的next上,即原队尾节点的next指向队尾入队节点
else
l.next = node;
//更新元素计数
++count;
//唤醒一个等待出队线程
notEmpty.signal();
return true;//入队成功
}
初始:
image.png
队尾插入节点node:
image.png
可响应中断阻塞式队首入队-void putFirst(E e)
- 向
队首阻塞式插入元素
,如果队列未满则插入,如果队列已满,则阻塞当前线程直到队列为空闲,或者元素被其他线程取出
。- 如果线程在阻塞时被其他线程设置了中断标志,则抛出
InterruptedException异常
并返回。
- 如果线程在阻塞时被其他线程设置了中断标志,则抛出
public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
//如果队列已满,进入阻塞状态(不断自旋,直到入队成功)
//否则链接入队节点与原队首节点,然后唤醒一个正在等待的出队线程
while (!linkFirst(node))
notFull.await();//入队线程阻塞
} finally {
lock.unlock();//释放锁
}
}
可响应中断阻塞式队尾入队-void putLast(E e)
- 向
队尾阻塞式插入元素
,如果队列未满则插入,如果队列已满,则阻塞当前线程直到队列为空闲,或者元素被其他线程取出
。- 如果线程在阻塞时被其他线程设置了中断标志,则抛出
InterruptedException异常
并返回。
- 如果线程在阻塞时被其他线程设置了中断标志,则抛出
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
//如果队列已满,进入阻塞状态(不断自旋,直到入队成功)
//否则链接入队节点与原队尾节点,然后唤醒一个正在等待的出队线程
while (!linkLast(node))
notFull.await();//入队线程阻塞
} finally {
lock.unlock();//释放锁
}
}
非阻塞式队首入队-boolean offerFirst(E e)
- 向
队首插入元素
,如果队列未满则插入返回true,如果队列已满,则返回false
public boolean offerFirst(E e) {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
//如果队列已满,退出返回false
//否则链接入队节点与原队首节点,然后唤醒一个正在等待的出队线程,返回true
return linkFirst(node);
} finally {
lock.unlock();//释放锁
}
}
非阻塞式队尾入队-boolean offerLast(E e)
- 向
队尾插入元素
,如果队列未满则插入返回true,如果队列已满,则返回false
public boolean offerLast(E e) {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
//如果队列已满,退出返回false
//否则链接入队节点与原队尾节点,然后唤醒一个正在等待的出队线程,返回true
return linkLast(node);
} finally {
lock.unlock();//释放锁
}
}
可响应中断限时阻塞队首入队-boolean offerFirst(E e, long timeout, TimeUnit unit)
- 向
队首插入元素
,如果队列未满则插入返回true,如果队列已满,则阻塞指定时间
,如果超时未写入就返回false。- 如果线程在阻塞时被其他线程设置了中断标志,则抛出
InterruptedException异常
并返回。
- 如果线程在阻塞时被其他线程设置了中断标志,则抛出
public boolean offerFirst(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
//封装成入队节点
Node<E> node = new Node<E>(e);
//获取纳秒数
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//加可中断锁
try {
//如果队列已满,阻塞指定纳秒,超时未写入,退出返回false
//否则链接入队节点与原队首节点,然后唤醒一个正在等待的出队线程,返回true
while (!linkFirst(node)) {
//写入失败,且时间到了,返回false
if (nanos <= 0)
return false;
//获取阻塞剩余时间
nanos = notFull.awaitNanos(nanos);
}
return true;
} finally {
lock.unlock();//释放锁
}
}
可响应中断限时阻塞队尾入队-boolean offerLast(E e, long timeout, TimeUnit unit)
- 向
队尾插入元素
,如果队列未满则插入返回true,如果队列已满,则阻塞指定时间
,如果超时未写入就返回false。- 如果线程在阻塞时被其他线程设置了中断标志,则抛出
InterruptedException异常
并返回。
- 如果线程在阻塞时被其他线程设置了中断标志,则抛出
public boolean offerLast(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//加可中断锁
try {
//如果队列已满,阻塞指定纳秒,超时未写入,退出返回false
//否则链接入队节点与原队尾节点,然后唤醒一个正在等待的出队线程,返回true
while (!linkLast(node)) {
return false;
//获取阻塞剩余时间
nanos = notFull.awaitNanos(nanos);
}
return true;
} finally {
lock.unlock();//释放锁
}
}
非阻塞队首入队(抛异常)-void addFirst(E e)
- 向
队首插入元素
,如果队列未满则插入返回true,如果队列已满,,则抛出IllegalStateException("Deque full")
异常
public void addFirst(E e) {
//非阻塞式队首入队,队列已满,则抛出异常
if (!offerFirst(e))
throw new IllegalStateException("Deque full");
}
非阻塞队尾入队(抛异常)-void addLast(E e)
- 向
队尾插入元素
,如果队列未满则插入返回true,如果队列已满,,则抛出IllegalStateException("Deque full")
异常
public void addLast(E e) {
//非阻塞式队尾入队,队列已满,则抛出异常
if (!offerLast(e))
throw new IllegalStateException("Deque full");
}
2.Queue入队方法
非阻塞式队尾入队-boolean add(E e)
- 底层调用
addLast(e)
,即 非阻塞式队尾入队, 如果队列未满则插入返回true,如果队列已满,,则抛出IllegalStateException("Deque full")
异常
public boolean add(E e) {
addLast(e);
return true;
}
可响应中断阻塞式队尾入队-void put(E e)
- 底层调用
putLast(e)
,即 可响应中断阻塞式队尾入队,队尾阻塞式插入元素
,如果队列未满则插入,如果队列已满,则阻塞当前线程直到队列为空闲,或者元素被其他线程取出
。
public void put(E e) throws InterruptedException {
putLast(e);
}
非阻塞式队尾入队-boolean offer(E e)
- 底层调用
offerLast(e)
,即 非阻塞式队尾入队, 向队尾插入元素
,如果队列未满则插入返回true,如果队列已满,则返回false
public boolean offer(E e) {
return offerLast(e);
}
可响应中断限时阻塞队尾入队-boolean offer(E,long,TimeUnit)
- 底层调用
offerLast(e, timeout, unit)
,即 可响应中断限时阻塞队尾入队, 向队尾插入元素
,如果队列未满则插入返回true,如果队列已满,则阻塞指定时间
,如果超时未写入就返回false。
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
return offerLast(e, timeout, unit);
}
八.出队
1.Dueue出队方法
队首出队核心方法-E unlinkFirst()
- 断开
队首
出队节点与其后继节点的链接,队首为空的情况则返回null。否则唤醒一个正在等待的入队线程,并返回出队节点元素值
private E unlinkFirst() {
//获取原队首节点(即出队节点)
Node<E> f = first;
//队列为空,直接返回null退出
if (f == null)
return null;
//获取原队首节点的后继节点(即出队节点的后继节点,原队首节点出队成功后,该节点就是队首)
Node<E> n = f.next;
//保存原队首节点要返回的节点值(即出队节点的节点值)
E item = f.item;
//-----先逻辑删除,再next指向自身-------
//将原队首节点的节点值item置为null, null表示该节点已被移除(即出队节点的节点值置空)
f.item = null;
//断开出队节点与其后继节点的链接,原队首节点的后继节点指向自己, 即出队节点后继节点next指向自身,以区别于队尾节点(next为null)
f.next = f; // help GC
//更新现在队首为原队首节点的后继节点
first = n;
//队首出队节点的后继节点为空,直接返回null,说明如果队列之前只有一个节点,更新last为null
if (n == null)
last = null;
//队首出队节点的后继节点不为空,说明如果队列之前不只有一个节点,断开队首出队节点与其后继节点next的链接
else
n.prev = null;
//更新元素计数
--count;
//唤醒一个等待的入队线程
notFull.signal();
//返回出队节点的元素值
return item;
}
- 从
队尾出队first节点
,一定会将next指针指向自身
,以区别于非出队时first指向具体节点
(即next指针不会指向自身,可能为真实节点或null)。
初始:
image.png
删除队首节点:
image.png
队尾出队核心方法- E unlinkLast()
断开队尾出队节点与其前驱节点的链接,队列为空的情况则返回null。否则唤醒一个正在等待的入队线程,并返回出队节点元素值
private E unlinkLast() {
//获取原队尾节点(即出队节点)
Node<E> l = last;
//队列为空,直接返回null退出
if (l == null)
return null;
//获取原队尾节点的前驱节点(即出队节点的前驱节点,原队尾节点出队成功后,该节点就是队尾)
Node<E> p = l.prev;
//保存原队尾节点要返回的节点值(即出队节点的节点值)
E item = l.item;
//-----先逻辑删除,再prev 指向自身-------
//将原队尾节点的节点值item置为null, null表示该节点已被移除(即出队节点的节点值置空)
l.item = null;
//断开出队节点与其前驱节点的链接,原尾节点的前驱节点指向自己, 即出队节点前驱节点next指向自身,以区别于队首节点(prev为null)
l.prev = l; // help GC
//更新队尾为原队尾节点的前驱节点prev
last = p;
//队尾出队节点的前驱节点为空,直接返回null,,说明如果队列之前只有一个节点,更新first为null
if (p == null)
first = null;
//队尾出队节点的前驱节点不为空,说明如果队列之前不只有一个节点,断开队尾出队节点与其前驱节点next的链接
else
p.next = null;
//更新元素计数
--count;
//唤醒一个等待的入队线程
notFull.signal();
//返回出队节点的元素值
return item;
}
从队尾出队last节点,一定会将prev指针指向自身,以区别于非出队时last指向具体节点(即prev指针不会指向自身,可能为真实节点或null)。
初始:
image.png
删除队尾节点:
image.png
出队核心方法-void unlink(Node x)
- 该方法通过判断入参Node的prev、next来决定是队首出队、队尾出队、还是队中出队
void unlink(Node<E> x) {
//获取出队节点的前驱节点
Node<E> p = x.prev;
//获取出队节点的后继节点
Node<E> n = x.next;
//当前节点的前驱节点为空,说明为队首节点出队
if (p == null) {
unlinkFirst();
//当前节点前驱节点不为空,但是后继节点为空,说明为队尾节点出队
} else if (n == null) {
unlinkLast();
}
//当前节点的前驱、后继节点均不为空,说明为中间节点出队,需要断开当前节点与前驱后继节点的链接,同时当前节点的前驱、后继节点链上
// 这里x的prev和next指针都没有改变,因为他们可能在被iterator使用
else {
//x的前驱节点断开与x的链接(前节点的下一个节点就是x): 出队节点的前驱节点的next 指向 原出队节点的后继节点n
p.next = n;
//x的后继节点断开与x的链接(后节点的前一个节点就是x): 出队节点的后继节点的prev指向 原出队节点的前驱节点n
n.prev = p;
//将出队节点值item置为null, null表示该节点已被移除(即出队节点的节点值置空)
x.item = null;
//更新元素计数
--count;
//唤醒一个等待的入队线程
notFull.signal();
}
}
可响应中断阻塞式队首出队-E takeFirst()
- 从
队首获取并移除元素
,如果队列为空则阻塞当前线程直到队列不为空,退出阻塞时返回获取的元素。- 如果线程在阻塞时被其他线程设置了中断标志,则抛出
InterruptedException异常
并返回。
- 如果线程在阻塞时被其他线程设置了中断标志,则抛出
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
E x;
//如果队首节点为空,直接返回null,然后进入阻塞状态(不断自旋,直到出队成功)
//否则断开队首出队节点与其后继节点的链接,然后唤醒一个正在等待的入队线程,并返回出队节点元素值
while ( (x = unlinkFirst()) == null)
notEmpty.await();//出队线程阻塞
return x;//返回队首出队元素
} finally {
lock.unlock();//释放锁
}
}
可响应中断阻塞式队尾出队-E takeLast()
- 从
队尾获取并移除元素
,如果队列为空则阻塞当前线程直到队列不为空,退出阻塞时返回获取的元素。- 如果线程在阻塞时被其他线程设置了中断标志,则抛出
InterruptedException异常
并返回。
- 如果线程在阻塞时被其他线程设置了中断标志,则抛出
public E takeLast() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
E x;
//如果队尾节点为空,直接返回null,然后进入阻塞状态(不断自旋,直到出队成功)
//否则从断开与队尾出队节点与其前驱节点的链接,然后唤醒一个正在等待的入队线程,并返回出队节点元素值
while ( (x = unlinkLast()) == null)
notEmpty.await();//出队线程阻塞
return x;//返回队首出队元素
} finally {
lock.unlock();//释放锁
}
}
非阻塞式队首出队-E pollFirst()
- 从
队首获取并移除元素
,如果队列为空返回null,,否则返回队首元素值
public E pollFirst() {
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
//队尾为空,返回null,否则返回队首元素值
return unlinkFirst();
} finally {
lock.unlock();//释放锁
}
}
非阻塞式队尾出队-E pollLast()
- 从
队尾获取并移除元素
,如果队列为空返回null,,否则返回队尾元素值
public E pollLast() {
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
//队尾为空,返回null,否则返回队尾元素值
return unlinkLast();
} finally {
lock.unlock();//释放锁
}
}
可响应中断限时阻塞队首出队- E pollFirst(long timeout, TimeUnit unit)
- 如果队列不为空则从
队首获取并移除元素
,如果队列为空,则阻塞指定时间
,如果超时获取就返回null- 如果线程在阻塞时被其他线程设置了中断标志,则抛出
InterruptedException异常
并返回。
- 如果线程在阻塞时被其他线程设置了中断标志,则抛出
public E pollFirst(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//加可中断锁
try {
E x;
//是否队首出队失败
while ( (x = unlinkFirst()) == null) {
//如果队首出队失败,且时间到了,直接返回null
if (nanos <= 0)
return null;
//阻塞当前线程指定纳秒数,并更新剩余时间
nanos = notEmpty.awaitNanos(nanos);
}
return x;//返回出队元素
} finally {
lock.unlock();//释放锁
}
}
可响应中断限时阻塞队尾出队- E pollLast(long timeout, TimeUnit unit)
- 如果队列不为空则从
队尾获取并移除元素
,如果队列为空,则阻塞指定时间
,如果超时获取就返回null- 如果线程在阻塞时被其他线程设置了中断标志,则抛出
InterruptedException异常
并返回。
- 如果线程在阻塞时被其他线程设置了中断标志,则抛出
public E pollLast(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//加可中断锁
try {
E x;
//是否队尾出队失败
while ( (x = unlinkLast()) == null) {
//如果队尾出队失败,且时间到了,直接返回null
if (nanos <= 0)
return null;
//阻塞当前线程指定纳秒数,并更新剩余时间
nanos = notEmpty.awaitNanos(nanos);
}
return x;//返回出队元素
} finally {
lock.unlock();//释放锁
}
}
不移除元素队首出队-E peekFirst()
-
不移除元素
队首出队,队列为空返回null,否则返回元素值
public E peekFirst() {
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
//队首为空,返回null,否则返回元素值
return (first == null) ? null : first.item;
} finally {
lock.unlock();//释放锁
}
}
不移除元素队尾出队-E peekLast()
-
不移除元素
队尾出队,队列为空返回null,否则返回元素值
public E peekLast() {
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
//队尾为空,返回null,否则返回元素值
return (last == null) ? null : last.item;
} finally {
lock.unlock();//释放锁
}
}
非阻塞队首出队(抛异常)-E removeFirst()
- 队列不为空, 获取并移除队首元素,如果队列为空,
抛出NoSuchElementException异常
public E removeFirst() {
//非阻塞式队尾出队,如果队列为空,返回null,抛出NoSuchElementException异常
E x = pollFirst();
if (x == null) throw new NoSuchElementException();
return x;
}
非阻塞队尾出队(抛异常)–E removeLast()
- 队列不为空, 获取并移除队尾元素,如果队列为空,
抛出NoSuchElementException异常
public E removeLast() {
//非阻塞式队尾出队,如果队列为空,返回null,抛出NoSuchElementException异常
E x = pollLast();
if (x == null) throw new NoSuchElementException();
return x;
}
从队首向后移除指定元素-boolean removeFirstOccurrence(Object o)
- 从队首至队尾遍历(
从前往后
),移除通过equals判断相等的第一个元素,并返回true
public boolean removeFirstOccurrence(Object o) {
if (o == null) return false;
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
//从队首至队尾遍历(从前往后)
for (Node<E> p = first; p != null; p = p.next) {
//equals判断相等
if (o.equals(p.item)) {
//移除元素,并返回true
unlink(p);
return true;
}
}
return false;
} finally {
lock.unlock();//释放锁
}
}
从队尾向前移除指定元素-boolean removeLastOccurrence(Object o)
- 从队尾至队首遍历(
从后往前
),移除通过equals判断相等的第一个元素,并返回true
public boolean removeLastOccurrence(Object o) {
if (o == null) return false;
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
//从队尾至队首遍历(从后往前)
for (Node<E> p = last; p != null; p = p.prev) {
//equals判断相等
if (o.equals(p.item)) {
//移除元素,并返回true
unlink(p);
return true;
}
}
return false;
} finally {
lock.unlock();//释放锁
}
}
2.Queue出队方法
可响应中断阻塞式队首出队-E take()
- 底层调用takeFirst(),即 可响应中断阻塞式队首出队, 从
队首获取并移除元素
,如果队列为空则阻塞当前线程直到队列不为空,退出阻塞时返回获取的元素。
public E take() throws InterruptedException {
return takeFirst();
}
非阻塞式出队-E poll()
- 底层调用
pollFirst(e)
,即 非阻塞式队首出队, 从队首获取并移除元素
,如果队列为空返回null,,否则返回队首元素值
public E poll() {
return pollFirst();
}
阻塞式超时出队-E poll(timeout, unit)
- 底层调用
pollFirst(long timeout, TimeUnit unit)
,即 可响应中断限时阻塞队首出队,如果队列不为空则从队首获取并移除元素
,如果队列为空,则阻塞指定时间
,如果超时获取就返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
return pollFirst(timeout, unit);
}
阻塞式出队-E peek()
- 底层调用
peekFirst()
,即 不移除元素队首出队,队列为空返回null,否则返回元素值
public E peek() {
return peekFirst();
}
移除元素- E remove()
- 底层调用
removeFirst()
,即 不移除元素队首出队, - 队列不为空, 获取并移除队首元素,如果队列为空,抛出NoSuchElementException异常
public E remove() {
return removeFirst();
}
九.迭代器
此迭代器是弱一致性的。因为即使节点被删除,迭代器也会照样返回被删除节点的item
。
- 弱一致性是因为
并发操作
。当迭代器遍历到某个位置后,你调用hasNext返回true
说明下一个节点存在。但之后有别人删除掉了你的这个节点,然后你再调用next()理论上来说我应该返回这个节点的item给你,但删除操作会使得节点的item为null,所以迭代器中必须使用E currentElement
提前保存。
private abstract class AbstractItr implements Iterator<E> {
Node<E> next;//提前保存下一次next()的节点
E nextItem;//提前保存下一次next()的节点值
private Node<E> lastRet;//用来实现remove
abstract Node<E> firstNode();//抽象方法,获得遍历起点
abstract Node<E> nextNode(Node<E> n);//抽象方法,根据迭代方式方向获得下一个节点
AbstractItr() {
final ReentrantLock lock = LinkedBlockingDeque.this.lock;
lock.lock();
try {
//初始化就准备好next的两个成员
next = firstNode();
nextItem = (next == null) ? null : next.item;
} finally {
lock.unlock();
}
}
private Node<E> succ(Node<E> n) {
for (;;) {
//获取遍历的下一个节点
Node<E> s = nextNode(n);
//遍历到尽头了,返回null
if (s == null)
return null;
//没有遍历到尽头,并且是一个有效节点,则返回该节点
else if (s.item != null)
return s;
//如果下一个节点已经被unlink,则跳转到first
else if (s == n)
return firstNode();
//执行到这里,n有后继,但n的item为null,所以后移n,继续找
else
n = s;
}
}
void advance() {
final ReentrantLock lock = LinkedBlockingDeque.this.lock;
lock.lock();
try {
next = succ(next);//得到后继
nextItem = (next == null) ? null : next.item;
} finally {
lock.unlock();
}
}
public boolean hasNext() {
return next != null;//是否存在下一个节点
}
public E next() {
if (next == null)
throw new NoSuchElementException();
lastRet = next;
E x = nextItem;
advance();//准备下一个next()的数据,这个函数才开始加锁
return x;
}
public void remove() {
Node<E> n = lastRet;
if (n == null)
throw new IllegalStateException();
lastRet = null;
final ReentrantLock lock = LinkedBlockingDeque.this.lock;
lock.lock();
try {
if (n.item != null)//如果它还没有被删除
unlink(n);
} finally {
lock.unlock();
}
}
}
正向迭代器和反向迭代器
只需要实现2个抽象方法。
/** 正向迭代 */
private class Itr extends AbstractItr {
Node<E> firstNode() { return first; }
Node<E> nextNode(Node<E> n) { return n.next; }
}
/** 反向迭代 */
private class DescendingItr extends AbstractItr {
Node<E> firstNode() { return last; }
Node<E> nextNode(Node<E> n) { return n.prev; }
}
十总结
LinkedBlockingDeque是一个无界阻塞双端队列
,相比普通队列,主要是多了【队尾出队元素】/【队首入队元素】的功能。
-
LinkedBlockingQueue:
- FIFO;
-
读写锁分离
,分为2个ReentrantLock,2个对应的Condition来分别控制 出队和入队的并发安全与阻塞;
-
LinkedBlockingDeque:
- FIFO & FILO;
-
全局一把ReentrantLock
,2个Condition
来控制并发和阻塞。因为两端都可以入队出队,所以用一个全局锁
才能保证线程安全
;