LinkedBlockingQueue
一、LinkedBlockingQueue是什么?
LinkedBlockingQueue是一个线程安全的阻塞队列,具有先进先出等特性。主要实现BlockingQueue接口,BlockingQueue接口继承自java.util.Queue接口,并在这个接口的基础上增加了take和put方法。
二、线程安全?
在LinkedBlockingQueue的所有共享的全局变量 中,final声明的capacity在构造器生成实例时就成了不可变量了。而final声明的count由于是AtomicInteger类型的,所以能 够保证其操作的原子性。剩下的final的变量都是初始化成了不可变量,并且不包含可变属性,所以都是访问安全的。那么剩下的就是Node类型的head和 last两个可变量。所以要保证LinkedBlockingQueue是线程安全的就是要保证对head和last的访问是线程安全的 ,其中LinkedBloakingQueue是使用读和写两把锁来控制并发操作的。 如下:
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
三、链表的入队和出队
LInkedBlockingQueue中的链表,包含头指针和尾指针,其中:
头指针用来管理元素出队,和 take(), poll(), peek() 三个操作关联
尾指针用来管理元素入队,和 put(), offer() 两个操作关联
入队:
put()算法,为阻塞算法,直到队列有空余时,才能为队列加入新元素
offer()算法为非阻塞算法,如果队列已满,立即返回或等待一会再返回,通过返回值ture或false,标记本次入队操作是否成功;
出队:
take()算法为阻塞算法,直到队列有非空时,才将允许调用线程取出数据
poll()算法为非阻塞算法,如果队列为空,立即返回或等待一会再返回,通过返回值ture或false,标记本次出队操作是否成功
peek()算法比较特殊,只返回队列中的第一个元素,既不出队,也不阻塞,如果没有元素,就返回null
四、take和put
//出队
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//随时保证响应中断
try {
while (count.get() == 0) {
notEmpty.await();//队列为null时,,此时线程将处于阻塞状态,直到有新的元素
}
x = dequeue();//取出元素
c = count.getAndDecrement(); // 取出元素之后,将“节点数量”-1;并返回“原始的节点数量”。
if (c > 1)
notEmpty.signal();//唤醒其他正在等待出队列的线程
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
//入队
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();//随时保证响应中断
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();//队列满时,,此时线程将处于阻塞状态,直到队列有空闲的位置才继续执行
}
enqueue(node);// //让元素进入队列的末尾
c = count.getAndIncrement();//c是旧值,为了下面的c==0的判断。意思就是,之前没有元素,现在有了,其他线程赶紧来获取。
if (c + 1 < capacity)
notFull.signal();//唤醒其他正在等待入队列的线程
} finally {
putLock.unlock();
}
if (c == 0) /*当c=0时,当前队列为空队列,出队列的线程都处于等待状态,
现在新添加了一个新的元素,即队列不再为空,因此它会唤醒正在等待获取元素的线程。
*/
signalNotEmpty();
}
其中enqueue和dequeue
enqueue()的源码如下:将元素E添加到队列的末尾,设置node为新的尾节点!
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
dequeue()的源码如下:删除队列的头节点,并将表头指向“原头节点的下一个节点”。
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
问题:
补充两个线程去生产和消费的图
Lock的条件变量? c= count.getAndIncrement();?count原子性的理解