Java8 LinkedBlockingQueue 源码解析
LinkedBlockingQueue 链表阻塞队列
链表阻塞队列,顾名思义,也就是一个基于队列的阻塞式的链表实现,里面的代码写的很漂亮,生产者消费者模式在这个类中用的酣畅淋漓,其作者是大名鼎鼎的 Doug Lea,掌握这个类是比较重要的。里面很多实现基于锁,可以好好学习一下。
前世今生
继承自AbstractQueue 实现了 BlockingQueue,那我们就来看一下有哪些接口,这些是基础,要牢记

主要就是take(拿)和put(放),其余的offer,poll,add 有更高级的用法,稍后再说
使用场景
我查了一些资料,感觉和 MQ 有点联系,就是说我们可以使用这个东西进行解耦,或者负载均衡,比如说,有很多任务需要提交,我们可以把任务提交给 Queue,消费者负责处理消息,这个可以根据消费者的能力决定任务的执行效率,不会一下字任务过来而导致崩溃,讲道理,可以适合多生产者,多消费者模式,如果有这个,我们可以很好的进行解耦,负载均衡
实现原理大白话
属性
绝对的重头戏,上来就一堆干活,可以去补补 ReentrantLock 和 Condition 的知识,看一下,拿和取都用了一把锁,takeLock 对应的是 notEmpty Condition,putLock 对应的是 notFull Condition,
大致的意思就是,当你拿的时候,如果发现队列为空,则需要等待,当你放的时候,如果发现队列满了,也需要等待,这里的等待靠的就是notEmpty 和 notFull 的 await 方法,当你往队列添加元素成功后,则notEmpty.signal() 提醒消费者可以拿东西了,反之,当你取完一个东西之后,则 notFull.singal()提醒生产者可以放东西了。
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
还有一些其他的参数,容量以及对头队尾,默认容量为 Interger.MAX__VALUE,还有原子类的AtomicInterger 用于计数,保证原子性
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
函数
构造函数,head 和 tail 设置为空的节点对象,稍后在分析作用
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
put 函数,// 首先获取 putLock 的锁,保证putLock的线程安全性
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
// 构造一个新的节点
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 首先获取 putLock 的锁,保证putLock的线程安全性
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
// 如果队列已满,则等待消费者调用 notFull.signal()
while (count.get() == capacity) {
notFull.await();
}
// 入队
enqueue(node);
// 原子加1
c = count.getAndIncrement();
// 如果此时队列没有满,调用 signal 更新 count
if (c + 1 < capacity)
notFull.signal();
} finally {
// 释放锁
putLock.unlock();
}
// 如果 c 为 0;这里的 c 为之前的值
// 也就是说,现在已经加1了,可以提示生产者
if (c == 0)
signalNotEmpty();
}
入队,很简单,写的很巧妙,学习一下
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
出队,手动GC
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
获取 takeLock,通知消费者可以消费了
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
顺便看一下,通知生产者可以放入队列了
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
拿数据,首先还是要获取拿的锁,如果count == 0 ,那么则说明没有数据,需要阻塞等待,想象一下,这时候又想放入数据,那么将阻塞在takeLock里面,知道生产者调用了signal,执行出队。
注意一个问题,由于多线程的不定性,每次c > 1时,都需要再次signal,由于,多个消费者到会阻塞到await,而不是外面的 lockInterruptibly,所以每次都需要 signal,更新 count 的状态
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 阻塞,请注意,多个消费者都会阻塞在这里
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
// 注意,获取到更新前的值
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 如果之前队列满了,那么现在不会满了,提示生产者可以生产了
if (c == capacity)
signalNotFull();
return x;
}
这个方法如果没有数据直接返回空
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
同样的,这个方法利用了condition 的延迟性,可等待一定时间,如果超过这个时间还没有结果则返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
放入数据,同样,如果等待一定时间还不能放入,那么将抛弃
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
为了让大家更加明白 ReentrantLock,我这里给出一个例子供大家学习
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author Create by xuantang
* @date on 8/22/18
*/
public class ReentrantLockDemo {
private static ReentrantLock mLock = new ReentrantLock();
private static Condition mCondition = mLock.newCondition();
public static void main(String[] args) {
new WaitThread("waiter one").start();
new WaitThread("waiter two").start();
new WaitThread("waiter three").start();
new NotifyThread("notify one").start();
}
static class WaitThread extends Thread {
WaitThread(String name) {
super(name);
}
@Override
public void run() {
try {
mLock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.println(this.getName() + " Waiting......");
mCondition.await();
System.out.println(this.getName() + " Finished.....");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
mLock.unlock();
}
}
}
static class NotifyThread extends Thread {
NotifyThread(String name) {
super(name);
}
@Override
public void run() {
try {
mLock.lockInterruptibly();
mCondition.signal();
System.out.println(this.getName() + " Notify.....");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
mLock.unlock();
}
}
}
}
输入结果,只能唤醒一个,当然你可以使用 signalAll() 唤醒所有的
waiter one Waiting......
waiter two Waiting......
waiter three Waiting......
notify one Notify.....
waiter one Finished.....
小结
我们看到了可重入锁,Condition的高级用法利用,以及生产者消费者模型,通过了解源码,我们更加深入的学习到了这个模型的用法以及实现。