死磕java concurrent包系列(三)基于Reentra
基于Codition分析AQS的条件队列
首发于:https://juejin.im/post/5c053e546fb9a049fc034924
前言
上一篇我们讲了AQS中的同步队列队列,现在我们研究一下条件队列。
在java中最常见的加锁方式就是synchorinzed和Reentrantlock,我们都说Reentrantlock比synchorinzed更加灵活,其实就灵活在Reentrantlock中的条件队列的用法上。
Condition接口
它是在java1.5中引入的一个接口,主要是为了替代object类中的wait、notify方法,以一种更灵活的方式解决线程之间的通信问题:
public interface Condition {
/**
* 使当前线程进入等待状态直到被通知(signal)或中断
* 当其他线程调用singal()或singalAll()方法时,该线程将被唤醒
* 当其他线程调用interrupt()方法中断当前线程
* await()相当于synchronized等待唤醒机制中的wait()方法
*/
void await() throws InterruptedException;
//当前线程进入等待状态,直到被唤醒,该方法不响应中断要求
void awaitUninterruptibly();
//调用该方法,当前线程进入等待状态,直到被唤醒或被中断或超时
//其中nanosTimeout指的等待超时时间,单位纳秒
long awaitNanos(long nanosTimeout) throws InterruptedException;
//同awaitNanos,但可以指明时间单位
boolean await(long time, TimeUnit unit) throws InterruptedException;
//调用该方法当前线程进入等待状态,直到被唤醒、中断或到达某个时
//间期限(deadline),如果没到指定时间就被唤醒,返回true,其他情况返回false
boolean awaitUntil(Date deadline) throws InterruptedException;
//唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须
//获取与Condition相关联的锁,功能与notify()相同
void signal();
//唤醒所有等待在Condition上的线程,该线程从等待方法返回前必须
//获取与Condition相关联的锁,功能与notifyAll()相同
void signalAll();
}
最重要的是await方法使线程进入等待状态,再通过signal方法唤醒。接下来我们结合实际例子分析。
Condition可以解决什么问题
假设有一个生产者-消费者的场景:
1、生产者有两个线程产生烤鸡;消费者有两个线程消费烤鸡
2、四个线程一起执行,但同时只能有一个生产者线程生成烤鸡,一个消费者线程消费烤鸡。
3、只有产生了烤鸡,才能通知消费线程去消费,否则只能等着;
4、只有消费了烤鸡,才能通知生产者线程去生产,否则只能等着
于是乎,我们使用ReentrantLock控制并发,并使用它生成两组Condition对象,productCondition和consumeCondition:前者控制生产者线程,后者控制消费者线程。当isHaveChicken为true时,代表烤鸡生成完毕,生产线程必须进入等待状态同时唤醒消费线程进行消费,消费线程消费完毕后将flag设置为false,代表烤鸡消费完成,进入等待状态,同时唤醒生产线程生产烤鸡。。。。。。
package com.springsingleton.demo.Chicken;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ChikenStore {
ReentrantLock reentrantLock = new ReentrantLock();
Condition productCondition = reentrantLock.newCondition();
Condition consumeCondition = reentrantLock.newCondition();
private int count = 0;
private volatile boolean isHaveChicken = false;
//生产
public void ProductChicken() {
reentrantLock.lock();
while (isHaveChicken) {
try {
System.out.println("有烤鸡了" + Thread.currentThread().getName() + "不生产了");
productCondition.await();
} catch (Exception e) {
System.out.println("error" + e.getMessage());
}
}
count++;
System.out.println(Thread.currentThread().getName() + "产生了第" + count + "个烤鸡,赶紧开始卖");
isHaveChicken = true;
consumeCondition.signal();
reentrantLock.unlock();
}
public void SellChicken() {
reentrantLock.lock();
while (!isHaveChicken) {
try {
System.out.println("没有烤鸡了" + Thread.currentThread().getName() + "不卖了");
consumeCondition.await();
} catch (Exception e) {
System.out.println("error" + e.getMessage());
}
}
count--;
isHaveChicken = false;
System.out.println(Thread.currentThread().getName() + "卖掉了第" + count + 1 + "个烤鸡,赶紧开始生产");
productCondition.signal();
reentrantLock.unlock();
}
public static void main(String[] args) {
ChikenStore chikenStore = new ChikenStore();
new Thread(() -> {
Thread.currentThread().setName("生产者1号");
while (true) {
chikenStore.ProductChicken();
}
}).start();
new Thread(() -> {
Thread.currentThread().setName("生产者2号");
for (; ; ) {
chikenStore.ProductChicken();
}
}).start();
new Thread(() -> {
Thread.currentThread().setName("消费者1号");
while (true) {
chikenStore.SellChicken();
}
}).start();
new Thread(() -> {
Thread.currentThread().setName("消费者2号");
while (true) {
chikenStore.SellChicken();
}
}).start();
}
}
输出:
生产者1号产生了第1个烤鸡,赶紧开始卖
有烤鸡了生产者1号不生产了
有烤鸡了生产者2号不生产了
消费者1号卖掉了第01个烤鸡,赶紧开始生产
没有烤鸡了消费者1号不卖了
生产者1号产生了第1个烤鸡,赶紧开始卖
有烤鸡了生产者1号不生产了
消费者1号卖掉了第01个烤鸡,赶紧开始生产
没有烤鸡了消费者1号不卖了
没有烤鸡了消费者2号不卖了
生产者2号产生了第1个烤鸡,赶紧开始卖
有烤鸡了生产者2号不生产了
消费者1号卖掉了第01个烤鸡,赶紧开始生产
没有烤鸡了消费者1号不卖了
生产者1号产生了第1个烤鸡,赶紧开始卖
有烤鸡了生产者1号不生产了
消费者2号卖掉了第01个烤鸡,赶紧开始生产
没有烤鸡了消费者2号不卖了
如果用synchorinzed的话:
package com.springsingleton.demo.Chicken;
public class ChickenStoreSync {
private int count = 0;
private volatile boolean isHaveChicken = false;
public synchronized void ProductChicken() {
while (isHaveChicken) {
try {
System.out.println("有烤鸡了" + Thread.currentThread().getName() + "不生产了");
this.wait();
} catch (Exception e) {
System.out.println("error" + e.getMessage());
}
}
count++;
System.out.println(Thread.currentThread().getName() + "产生了第" + count + "个烤鸡,赶紧开始卖");
isHaveChicken = true;
notifyAll();
}
public synchronized void SellChicken() {
while (!isHaveChicken) {
try {
System.out.println("没有烤鸡了" + Thread.currentThread().getName() + "不卖了");
this.wait();
} catch (Exception e) {
System.out.println("error" + e.getMessage());
}
}
count--;
isHaveChicken = false;
System.out.println(Thread.currentThread().getName() + "卖掉了第" + count + 1 + "个烤鸡,赶紧开始生产");
notifyAll();
}
public static void main(String[] args) {
ChickenStoreSync chikenStore = new ChickenStoreSync();
new Thread(() -> {
Thread.currentThread().setName("生产者1号");
while (true) {
chikenStore.ProductChicken();
}
}).start();
new Thread(() -> {
Thread.currentThread().setName("生产者2号");
for (; ; ) {
chikenStore.ProductChicken();
}
}).start();
new Thread(() -> {
Thread.currentThread().setName("消费者1号");
while (true) {
chikenStore.SellChicken();
}
}).start();
new Thread(() -> {
Thread.currentThread().setName("消费者2号");
while (true) {
chikenStore.SellChicken();
}
}).start();
}
}
如上代码,在调用notify()或者 notifyAll()方法时,由于synchronized等待队列中同时存在生产者线程和消费者线程,所以我们并不能保证被唤醒的到底是消费者线程还是生产者线程,而Codition则可以避免这种情况。
AQS中Condition的实现原理
Condition的具体实现类是AQS的内部类ConditionObject,前面我们分析过AQS中存在两种队列,一种是同步队列,一种是条件队列,而条件队列就相对于Condition而言的。注意在使用Condition前必须获得锁,同时在Condition的条件队列上的结点与前面同步队列的结点是同一个类即Node,其结点的waitStatus的值为CONDITION。在实现类ConditionObject中有两个结点分别是firstWaiter和lastWaiter,firstWaiter代表等待队列第一个等待结点,lastWaiter代表等待队列最后一个等待结点
public class ConditionObject implements Condition, java.io.Serializable {
//等待队列第一个等待结点
private transient Node firstWaiter;
//等待队列最后一个等待结点
private transient Node lastWaiter;
//省略.......
}
每个Condition都对应着一个条件队列,也就是说如果一个锁上创建了多个Condition对象,那么也就存在多个等待队列。等待队列是一个FIFO的队列,在队列中每一个节点都包含了一个线程的引用,而该线程就是Condition对象上等待的线程。当一个线程调用了await()相关的方法,那么该线程将会释放锁,并构建一个Node节点封装当前线程的相关信息加入到条件队列中进行等待,直到被唤醒、中断、超时才从队列中移出。Condition中的等待队列模型如下
image正如图所示,Node节点的数据结构,在等待队列中使用的变量与同步队列是不同的,Condtion中等待队列的结点只有直接指向的后继结点并没有指明前驱结点,而且使用的变量是nextWaiter而不是next,这点我们在前面分析结点Node的数据结构时讲过。firstWaiter指向条件队列的头结点,lastWaiter指向条件队列的尾结点,等待队列中结点的状态只有两种即CANCELLED和CONDITION,前者表示线程已结束需要从等待队列中移除,后者表示条件结点等待被唤醒。
再次强调每个Codition对象对于一个等待队列,也就是说AQS中只能存在一个同步队列,但可拥有多个等待队列。下面从代码层面看看被调用await()方法(其他await()实现原理类似)的线程是如何加入等待队列的,而又是如何从等待队列中被唤醒的。
public final void await() throws InterruptedException {
//判断线程是否被中断
if (Thread.interrupted())
throw new InterruptedException();
//创建新结点加入等待队列并返回
Node node = addConditionWaiter();
//释放当前线程锁即释放同步状态
int savedState = fullyRelease(node);
int interruptMode = 0;
//判断结点是否同步队列(SyncQueue)中,即是否被唤醒
while (!isOnSyncQueue(node)) {
//挂起线程
LockSupport.park(this);
//判断是否被中断唤醒,如果是退出循环。
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//被唤醒后执行自旋操作争取获得锁,同时判断线程是否被中断
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// clean up if cancelled
if (node.nextWaiter != null)
//清理等待队列中不为CONDITION状态的结点
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
再看看addConditionWaiter方法,添加到等待队列:
private Node addConditionWaiter() {
Node t = lastWaiter;
// 判断是否为结束状态的结点并移除
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//创建新结点状态为CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//加入等待队列
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
await()方法主要做了3件事:
一是调用addConditionWaiter()方法将当前线程封装成node结点加入等待队列。
二是调用fullyRelease(node)方法释放同步状态并唤醒后继结点的线程。
三是调用isOnSyncQueue(node)方法判断结点是否在同步队列中,注意是个while循环,如果同步队列中没有该结点就直接挂起该线程,需要明白的是如果线程被唤醒后就调用acquireQueued(node, savedState)执行自旋操作争取锁,即当前线程结点从等待队列转移到同步队列并开始努力获取锁。
接下来看看Singnal
public final void signal() {
//判断是否持有独占锁,如果不是抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
//唤醒等待队列第一个结点的线程
if (first != null)
doSignal(first);
}
这里signal()
方法做了两件事:
一是判断当前线程是否持有独占锁,没有就抛出异常,从这点也可以看出只有独占模式(例如ReentrantLock)先采用等待队列,而共享模式(例如Semaphor)下是没有等待队列的,也就没法使用Condition。
二是唤醒等待队列的第一个结点,即执行doSignal(first)
private void doSignal(Node first) {
do {
//移除条件等待队列中的第一个结点,
//如果后继结点为null,那么说没有其他结点将尾结点也设置为null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
//如果被通知节点没有进入到同步队列并且条件等待队列还有不为空的节点,则继续循环通知后续结点
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
//transferForSignal方法
final boolean transferForSignal(Node node) {
//尝试设置唤醒结点的waitStatus为0,即初始化状态
//如果设置失败(compareAndSetWaitStatus返回false),说明当期结点node的waitStatus已不为
//CONDITION状态,那么只能是结束状态了,因此返回false
//返回doSignal()方法中继续唤醒其他结点的线程,注意这里并
//不涉及并发问题,所以CAS操作失败只可能是预期值不为CONDITION,
//而不是多线程设置导致预期值变化,毕竟操作该方法的线程是持有锁的。
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)){
return false;
}
//加入同步队列并返回前驱结点p
Node p = enq(node);
int ws = p.waitStatus;
//判断前驱结点是否为结束结点(CANCELLED=1)或者在设置
//前驱节点状态为Node.SIGNAL状态失败时,唤醒被通知节点代表的线程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)){
//唤醒node结点的线程
LockSupport.unpark(node.thread);
return true;
}
}
doSignal(first)方法中做了2件事:
一是从条件队列移除被唤醒的节点,然后重新维护条件条件队列的firstWaiter和lastWaiter的指向。
二是将从条件队列移除的结点加入同步队列(在transferForSignal()方法中完成的),如果进入到同步队列失败并且条件队列还有不为空的节点,则继续循环唤醒后续其他结点的线程。
总结:
signal()被调用后,先判断当前线程是否持有独占锁,如果有,那么唤醒当前Condition对象中条件队列的第一个结点的线程,并从条件队列中移除该结点,移动到同步队列中,如果加入同步队列失败(此时只有可能线程被取消),那么继续循环唤醒条件队列中的其他结点的线程,如果成功加入同步队列,那么如果其前驱结点是否已结束或者设置前驱节点状态为Node.SIGNAL状态失败,则通过LockSupport.unpark()唤醒被通知节点代表的线程,到此signal()任务完成,注意被唤醒后的线程,将从前面的await()方法中的while循环中退出,因为此时该线程的结点已在同步队列中,那么while (!isOnSyncQueue(node))将不在符合循环条件,进而调用AQS的acquireQueued()方法加入获取同步状态的竞争中,这就是等待唤醒机制的整个流程实现原理,流程如下图(注意无论是同步队列还是条件队列使用的Node数据结构都是同一个,不过是使用的内部变量不同罢了)
image