手写BlockingQueue及源码剖析(看了肯定不后悔,不看后
2019-08-17 本文已影响31人
可爱猪猪
作者:
可爱猪猪
-帅锅一枚
作者的网名很阔爱,如果喜欢本文章一定要点 喜欢 或者 打赏,拜托~
作者一直在进步,需要你们的支持和鼓励,谢谢!
人生理想:在程序猿界混出点名堂!
出来混不容易,有钱的出个赏钱,没钱的点个喜欢❥(^_-),实在没钱加个粉丝也可以^_^
今天虽然作者吹着小风扇,热的汗流浃背的,但是还是要跟大家交流一下jdk的BlockingQueue的实现及一些底层原理,也是由于最近在看《JAVA并发编程实战》这一本书的第14章“构建自定义的同步工具”,推荐阅读此书
,在JDK中,作者在其他文章中也提到有ArrayBlockingQueue和LinkedBlockingQueue等,今天主要从两个角度跟大家交流下代码和书中的俩个精髓点,好了还是废话不多说,坚持读下去,以下内容可是作者读了一个星期的书领悟出来的
,先看作者写的模拟代码:
/**
*
* @author 可爱猪猪
* @param <T>
*/
public class BoundedQueue<T> {
private int capicity;
private int count = 0;
private Object putLock = new Object();
private Object takeLock = new Object();
public BoundedQueue(int capicity) {
this.capicity = capicity;
}
public void put(T t) throws InterruptedException {
synchronized (putLock) {
if (isFull()) {
putLock.wait();
}
enqueue(t);
}
notifyTakeLock();
}
/**
* 队列中获取元素
* @throws InterruptedException
*/
public T take() throws InterruptedException {
T t;
synchronized (takeLock) {
while (isEmpty()) {
takeLock.wait();
}
t = dequeue();
}
notifyPutLock();
return t;
}
/**
* 唤醒putLock
*/
private void notifyPutLock() {
synchronized (putLock) {
putLock.notifyAll();
}
}
/**
* 唤醒takeLock
*/
private void notifyTakeLock() {
synchronized (takeLock) {
takeLock.notifyAll();
}
}
/**
* 入列
*/
private void enqueue(T t) {
// 省略入列逻辑
// ...
// 计算队列的大小
count++;
}
/**
* 出列
*/
public T dequeue() {
// 省略出列逻辑
// ...
// 计算队列的大小
count--;
return null;
}
/**
* 判断队列是否已满
* @return
*/
public boolean isFull() {
return count == capicity;
}
/**
* 判断队列是否为空
* @return
*/
public boolean isEmpty() {
return count == 0;
}
public static void main(String[] args) throws InterruptedException {
// 初始化capicity=2的有界队列
final BoundedQueue<String> q = new BoundedQueue<String>(2);
// 把队列撑满
q.put(null);
q.put(null);
// 已满,两个线程,让put方法等待
new Thread(() -> {
try {
q.put(null);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
q.put(null);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 唤醒put线程
new Thread(() -> {
try {
q.take();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}).start();
}
}
如果有BUG,一定评论告诉我哦。
作者总是思路很清晰的按套路出牌,好了,开始我的表演了:
这段代码是什么意思,main中的逻辑想证明什么
- 代码含义
一个有界队列、支持put(队列已满不允许put)、支持take(队列已为空不允许take)
- 想证明什么
书中给出条件等待的标准形式,如下,为什么要进行while循环呢?改成if有什么问题吗?作者带着疑问写了一开始的代码:
// 必须通过一个锁来保护条件谓词
synchronized(lock){
while(!conditionPredicat()){
lock.wait();
}
}
JDK实现也按照这个标准形式实现,如下:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 开始加锁
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
// 循环等待
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
// finally中释放锁
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
- 场景模拟
进入解答“为什么是while不是if?”的环节?
先看下作者在main中的准备的测试用例,逻辑如下图:
图上作者已经标明while和if的执行逻辑,
仔细琢磨6和7两步,作者的例子很具有代表性
。
总结,作者开篇提到收获到两点
- 为什么条件等待的标准是while不是if。上文已很好的解释了。
- 我觉得还有一点共性,锁的范围并没有覆盖到notify。也就是说为什么没有这样写呢?
synchronized(waitLock){
waitLock.wait();
// notifyTakeLock() ; 等于以下代码
synchronized(takeLock){
takeLock.notifyAll();
}
}
而是要这样写
synchronized(waitLock){
waitLock.wait();
// notifyTakeLock() ; 等于以下代码
}
synchronized(takeLock){
takeLock.notifyAll();
}
看过《JAVA并发编程实战》中其实有一章节“死锁”专门解释过,嵌套锁如果不保持锁的顺序一致性容易造成死锁,写锁的时候一定要注意减小锁的粒度和减少锁的调用及嵌套,避免死锁。
- 其实还有一点作者也想在这里说一下,作者的一个思考,大家可以去百度
也就是sychronized本来就是一个代码块锁定,为什么会有两个线程进入了synchronized代码块,而在wait处阻塞呢?留给大家一个疑问吧。