J.U.C 阻塞队列(三) - LinkedBlockingQu
1 概述
LinkedBlockingQueue 是一个用单项链表实现的的线程安全的有界的阻塞队列。队列按照先进先出(FIFO)的原则对元素进行排序。
通过以下关键词分析我们来更深入理解ArrayBlockingQueue
1.1 如何理解“队列”
队列这个概念非常好理解。你可以把它想象成排队买票,先来的先买,后来的人只能站末尾,不允许插队。先进者先出,这就是典型的“队列”。
相对于栈只支持两个基本操作:入栈 push()和出栈 pop(),对于也只支持两个操作入队 enqueue(),放一个数据到队列尾部;出队 dequeue(),从队列头部取一个元素,因此队列跟栈一样,也是一种操作受限的线性表数据结构
image1.2 如何理解“线程安全的”
在多线程情况下,会有多个线程同时操作队列,这个时候就会存在线程安全问题,线程安全的队列我们叫作并发队列.
那如何实现一个线程安全的队列呢?
方式一
最简单直接的实现方式是直接在 enqueue()、dequeue() 方法上加锁,但是锁粒度大并发度会比较低,同一时刻仅允许一个存或者取操作。
方式二
利用 CAS 原子操作,可以实现非常高效的并发队列。
1.3 阻塞队列
阻塞队列其实就是在队列基础上增加了阻塞操作。简单来说,就是在队列为空的时候,从队头取数据会被阻塞。因为此时还没有数据可取,直到队列中有了数据才能返回;如果队列已经满了,那么插入数据的操作就会被阻塞,直到队列中有空闲位置后再插入数据,然后再返回。
1.4 有界队列
有界队列表示队列中存储数据是有限,如果队列满后在次向队列中添加数据会失败或阻塞。
2 实现一个"队列"
我们知道了,队列跟栈一样,也是一种抽象的逻辑存储结构。它具有先进先出的特性,支持在队尾插入元素,在队头删除元素。如果要想实现一个队列可以用数组来实现,也可以用链表来实现,用数组实现的队列叫作顺序队列,用链表实现的队列叫作链式队列。
2.1 链式队列
基于链表的实现,我们同样需要两个指针:head 指针和 tail 指针。它们分别指向链表的第一个结点和最后一个结点。
入队
tail->next= new_node, tail = tail->next;
出队
head = head->next
3 LinkedBlockingQueue源码解析
3.1 类结构
image3.2 实现原理
1 LinkedBlockingQueue时基于链表实现的队列
2 相对于ArrayBlockingQueue,链表实现的队列出队入队完全可以多线程执行,并不相互依赖。因而按时锁的原则来说锁的粒度越小越好,因而LinkedBlockingQueue使用了2把锁,分别针对出队入队各自分配一把锁(takeLock,putLock)来保证线程安全。
3 LinkedBlockingQueue内部存在着二个可重入的锁(takeLock,putLock),同时分别生成二个等待队列Condition(notFull,notEmpty) -- 生产者和消费者模式
-
当入队时首先获取入队的锁,并查看队列是否已经满了,如果满了则阻塞当前线程。如果没有满则入队成功,同时查看入队前的队列是否为空队列,如果是则释放出队等待队列中线程
-
当出队时首先获取出队的锁,并查看队列是否已经为空,如果为空则阻塞当前线程。如果不位空则出队成功,同时查看出队前的队列是否为满队列,如果是则释放入队等待队列中线程
3.3 核心属性
static class Node<E> {
//队列节点中保存元素
E item;
//当前节点的下一个节点
Node<E> next;
Node(E x) { item = x; }
}
/** 队列容量 */
private final int capacity;
/** 队列保存的元素个数 */
private final AtomicInteger count = new AtomicInteger();
/**
* 头部节点
*/
transient Node<E> head;
/**
* 尾部节点
*/
private transient Node<E> last;
/** 出队锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 出队等待队列 */
private final Condition notEmpty = takeLock.newCondition();
/** 入队锁c */
private final ReentrantLock putLock = new ReentrantLock();
/** 入队等待队列 */
private final Condition notFull = putLock.newCondition();
3.4 构造函数
//创建一个带有默认的(Integer.MAX_VALUE)容量队列
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
//创建一个带有给定的(capacity)容量队列
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
//创建一个带有默认的(Integer.MAX_VALUE)容量,它最初包含给定 collection 的元素,并以 collection 迭代器的遍历顺序添加元素。
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
//将指定容器中元素依此添加到队列中
final ReentrantLock putLock = this.putLock;
//获取putLock独占锁,成功返回,失败则阻塞
putLock.lock();
try {
int n = 0;
for (E e : c) {
//检查添加数据不能为null,NullPointerException
if (e == null)
throw new NullPointerException();
//添加collection超出,IllegalStateException
if (n == capacity)
throw new IllegalStateException("Queue full");
//添加到队列尾部
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
3.5 入队操作
将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则抛出 IllegalStateException。
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则返回 false。
public boolean offer(E e) {
//添加元素为null,抛出异常
if (e == null) throw new NullPointerException();
// 如果“队列已满”,则返回false,表示插入失败。
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
//获取putLock独占锁,成功返回,失败则阻塞
putLock.lock();
try {
if (count.get() < capacity) {
//将当前添加节点添加队列尾部
enqueue(node);
// 将“当前节点数量”+1,并返回“原始的数量”
c = count.getAndIncrement();
// 如果在插入元素之后,队列仍然未满,则唤醒notFull等待队列上等待线程。
if (c + 1 < capacity)
notFull.signal();
}
} finally {
// 释放“插入锁putLock”
putLock.unlock();
}
// 如果在插入节点前,队列为空;则插入节点后,唤醒notEmpty等待队列上等待线程
if (c == 0)
signalNotEmpty();
return c >= 0;
}
//将当前添加节点添加队列尾部
private void enqueue(Node<E> node) {
last = last.next = node;
}
//释放notEmpty等待队列中线程
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
//获取takeLock独占锁
takeLock.lock();
try {
//释放notEmpty等待队列中线程
notEmpty.signal();
} finally {
//释放takeLock独占锁
takeLock.unlock();
}
}
将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间,在成功时返回 true。
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
//添加元素为null,抛出异常
if (e == null) throw new NullPointerException();
//获取等待的时间
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//获取putLock独占锁,成功返回,失败则阻塞
putLock.lockInterruptibly();
try {
//如果队列已满,等待时间大于0。将当前线程添加notFull等待队列,并限时阻塞当前线程
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
//将当前添加节点添加队列尾部
enqueue(new Node<E>(e));
//将“当前节点数量”+1,并返回“原始的数量”
c = count.getAndIncrement();
// 如果在插入元素之后,队列仍然未满,则唤醒notFull等待队列上的等待线程。
if (c + 1 < capacity)
notFull.signal();
} finally {
// 释放“插入锁putLock”
putLock.unlock();
}
// 如果在插入节点前,队列为空;则插入节点后,唤醒notEmpty等待队列上的等待线程
if (c == 0)
signalNotEmpty();
return true;
}
put,该操作成功返回 true,失败则进入阻塞。
public void put(E e) throws InterruptedException {
//添加元素为null,抛出异常
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//获取putLock独占锁,成功返回,失败则阻塞,响应中断
putLock.lockInterruptibly();
try {
//如果队列已满。将当前线程添加notFull等待队列,并阻塞当前线程
while (count.get() == capacity) {
notFull.await();
}
//将当前添加节点添加队列尾部
enqueue(node);
//将“当前节点数量”+1,并返回“原始的数量”
c = count.getAndIncrement();
// 如果在插入元素之后,队列仍然未满,则唤醒notFull等待队列上等待线程。
if (c + 1 < capacity)
notFull.signal();
} finally {
// 释放“插入锁putLock”
putLock.unlock();
}
// 如果在插入节点前,队列为空;则插入节点后,唤醒notEmpty等待队列上等待线程
if (c == 0)
signalNotEmpty();
}
3.6 出队操作
- 获取并移除此队列的头,如果此队列为空,则返回 null。
public E poll() {
final AtomicInteger count = this.count;
//如果队列元素为空,返回null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
//获取putLock独占锁,失败则阻塞
takeLock.lock();
try {
//队列中存在元素
if (count.get() > 0) {
//获取队列首部元素
x = dequeue();
// 取出元素之后,将“节点数量”-1;并返回“原始的节点数量”。
c = count.getAndDecrement();
// 取出元素之后,队列仍不为空,则唤醒notFull等待队列上的等待线程
if (c > 1)
notEmpty.signal();
}
} finally {
//释放takeLock独占锁
takeLock.unlock();
}
//如果出队操作前队列时满的,则释放notFull等待队列中头部节点线程阻塞状态
if (c == capacity)
signalNotFull();
return x;
}
- 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
//获取等待的时间
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//如果队列为空,将当前线程添加到notEmpty等待队列,并限时阻塞当前线程
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
//获取队列首部元素
x = dequeue();
// 取出元素之后,将“节点数量”-1;并返回“原始的节点数量”。
c = count.getAndDecrement();
// 取出元素之后,队列仍不为空,则唤醒notFull等待队列上的等待线程
if (c > 1)
notEmpty.signal();
} finally {
//释放takeLock独占锁
takeLock.unlock();
}
//如果出队操作前队列时满的,则释放notFull等待队列中头部节点线程阻塞状态
if (c == capacity)
signalNotFull();
return x;
}
- 获取并移除此队列的头部,在元素变得可用之前一直等待
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//如果队列为空,将当前线程添加notEmpty等待队列,并阻塞当前线程
while (count.get() == 0) {
notEmpty.await();
}
//获取队列首部元素
x = dequeue();
// 取出元素之后,将“节点数量”-1;并返回“原始的节点数量”。
c = count.getAndDecrement();
// 取出元素之后,队列仍不为空,则唤醒notFull等待队列上的等待线程
if (c > 1)
notEmpty.signal();
} finally {
//释放takeLock独占锁
takeLock.unlock();
}
//如果出队操作前队列时满的,则释放notFull等待队列中头部节点线程阻塞状态
if (c == capacity)
signalNotFull();
return x;
}
3.7 指定元素出队
- 查找我们需要遍历链表找到需要删除节点和前置节点
-
通过链表操作将删除节点出队
image
public boolean remove(Object o) {
if (o == null) return false;
//同时获取putLock,takeLock独占锁
fullyLock();
try {
//从头部节点开始向后遍历查找删除节点以及其前置前置节点
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
//将节点从链表中删除
//p表示将要删除的节点
//trail表示要删除的节点前置系欸但
unlink(p, trail);
return true;
}
}
return false;
} finally {
//同时释放putLock,takeLock独占锁
fullyUnlock();
}
}
//将节点从链表中删除
//p表示将要删除的节点
//trail表示要删除的节点前置节点
void unlink(Node<E> p, Node<E> trail) {
//将节点p从链表中删除
p.item = null;
trail.next = p.next;
//如果节点p是链表尾部,last指向trail
if (last == p)
last = trail;
//如果出队操作前队列时满的,则释放notFull等待队列中头部节点线程阻塞状态
if (count.getAndDecrement() == capacity)
notFull.signal();
}
3.8 查询操作
获取队列中元素数量
public int size() {
return count.get();
}
查看队列中剩余的位置
public int remainingCapacity() {
return capacity - count.get();
}
4 LinkedBlockingQueue使用
4.1 函数列表
// 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue。
LinkedBlockingQueue()
// 创建一个容量是 Integer.MAX_VALUE 的 LinkedBlockingQueue,最初包含给定 collection 的元素,元素按该 collection 迭代器的遍历顺序添加。
LinkedBlockingQueue(Collection<? extends E> c)
// 创建一个具有给定(固定)容量的 LinkedBlockingQueue。
LinkedBlockingQueue(int capacity)
// 从队列彻底移除所有元素。
void clear()
// 移除此队列中所有可用的元素,并将它们添加到给定 collection 中。
int drainTo(Collection<? super E> c)
// 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。
int drainTo(Collection<? super E> c, int maxElements)
// 返回在队列中的元素上按适当顺序进行迭代的迭代器。
Iterator<E> iterator()
// 将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),在成功时返回 true,如果此队列已满,则返回 false。
boolean offer(E e)
// 将指定元素插入到此队列的尾部,如有必要,则等待指定的时间以使空间变得可用。
boolean offer(E e, long timeout, TimeUnit unit)
// 获取但不移除此队列的头;如果此队列为空,则返回 null。
E peek()
// 获取并移除此队列的头,如果此队列为空,则返回 null。
E poll()
// 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。
E poll(long timeout, TimeUnit unit)
// 将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。
void put(E e)
// 返回理想情况下(没有内存和资源约束)此队列可接受并且不会被阻塞的附加元素数量。
int remainingCapacity()
// 从此队列移除指定元素的单个实例(如果存在)。
boolean remove(Object o)
// 返回队列中的元素个数。
int size()
// 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
E take()
// 返回按适当顺序包含此队列中所有元素的数组。
Object[] toArray()
// 返回按适当顺序包含此队列中所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。
<T> T[] toArray(T[] a)
// 返回此 collection 的字符串表示形式。
String toString()
示例
import java.util.*;
import java.util.concurrent.*;
/*
* LinkedBlockingQueue是“线程安全”的队列,而LinkedList是非线程安全的。
*
* 下面是“多个线程同时操作并且遍历queue”的示例
* (01) 当queue是LinkedBlockingQueue对象时,程序能正常运行。
* (02) 当queue是LinkedList对象时,程序会产生ConcurrentModificationException异常。
*
* @author skywang
*/
public class LinkedBlockingQueueDemo1 {
// TODO: queue是LinkedList对象时,程序会出错。
//private static Queue<String> queue = new LinkedList<String>();
private static Queue<String> queue = new LinkedBlockingQueue<String>();
public static void main(String[] args) {
// 同时启动两个线程对queue进行操作!
new MyThread("ta").start();
new MyThread("tb").start();
}
private static void printAll() {
String value;
Iterator iter = queue.iterator();
while(iter.hasNext()) {
value = (String)iter.next();
System.out.print(value+", ");
}
System.out.println();
}
private static class MyThread extends Thread {
MyThread(String name) {
super(name);
}
@Override
public void run() {
int i = 0;
while (i++ < 6) {
// “线程名” + "-" + "序号"
String val = Thread.currentThread().getName()+i;
queue.add(val);
// 通过“Iterator”遍历queue。
printAll();
}
}
}
}