Condition源码分析
我们先来看看Condtion类。
public interface Condition {
//当前线程在接收到信号或被中断之前一直处于等待状态
void await() throws InterruptedException;
//当前线程在接到信号之前一直处于等待状态
void awaitUninterruptibly();
//当前线程在接收到信号,被中断或到达指定等待时间之前一直处于等待状态
long awaitNanos(long nanosTimeout) throws InterruptedException;
//当前线程在接收到信号,被中断或到达指定等待时间之前一直处于等待状态
boolean await(long time, TimeUnit unit) throws InterruptedException;
//当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态
boolean awaitUntil(Date deadline) throws InterruptedException;
//唤醒一个等待线程
void signal();
//唤醒所有等待线程
void signalAll();
}
我们先来看个例子。
class ConditionDemo {
private static Lock mLock=new ReentrantLock();
private static Condition condition=mLock.newCondition();
public static void main(String[] args) {
Thread threadWait=new Thread(() -> {
mLock.lock();
try{
System.out.println(Thread.currentThread().getName()+"正在运行");
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName()+"停止运行");
condition.await();
}catch (Exception ex){
ex.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"获得一个Signal信号继续执行");
mLock.unlock();
},"WaitThread");
threadWait.start();
try {
Thread.sleep(1000); //保证线程threadWait先执行
} catch (InterruptedException e) {
e.printStackTrace();
}
Thread threadSignal=new Thread(() -> {
mLock.lock();
try {
System.out.println(Thread.currentThread().getName()+"正在运行");
condition.signal();//发送信息 唤醒其他线程
System.out.println(Thread.currentThread().getName()+"发送一个signal");
System.out.println(Thread.currentThread().getName()+"发送一个signal后,结束");
}catch (Exception e){
e.printStackTrace();
}
mLock.unlock();
},"SignalThread");
threadSignal.start();
}
}
运行结果:
WaitThread正在运行
WaitThread停止运行
SignalThread正在运行
SignalThread发送一个signal
SignalThread发送一个signal后,结束
WaitThread获得一个Signal信号继续执行
我们来分析下:
- 当WaitThread拿到锁之后,开始执行,当调用condition.await()方法之后,WaitThread开始睡眠并释放锁
- WaitThread开始睡眠并释放锁之后,SignalThread拿到锁,拿到锁之后开始运行,并调用condition.signal()发射一个信号来唤醒正在等待此条件condition的线程。发射信号之后 SignalThread会继续执行,执行完毕后SignalThread释放锁。
- 当SignalThread释放锁之后,WaitThread拿到锁开始继续运行直到结束
从上面的分析可以得知:Condition是一个多线程协调通信的一个工具类。使得某个或者某些线程一起等待某个条件,只有当该条件具备(signal或者SignalAll方法被调用)时,这些等待线程才会被唤醒,从而重新争夺锁。
看了上面的例子,你可能会又这样的疑问:当WaitThread拿到锁之后开始工作,然后调用condition.await()方法开始睡眠等待信号的到达。但是没有看见此线程释放锁啊,当SignalThread发出signal信号且释放锁之后也没有看见它重新获取锁啊??
下面我们来分析下:
我们都知道,ReetrantLock是独占锁,一个线程拿到锁之后如果不释放,那么另外一个线程肯定是拿不到锁的,所以在lock.lock()和lock.unlock()之间可能又一次释放锁的操作(同样也必然会有一个获取锁的操作)。我们回头看下代码,WaitThread在进入lock.lock()后唯一可能释放锁的操作就是await()了。也就是说await()操作实际上就是释放锁,然后挂起线程,一旦条件满足就被唤醒,再次获取锁。
下面我们从源码来分析下:
ReentrantLock类中的newCondtion方法的代码如下:
public Condition newCondition() {
return sync.newCondition();
}
此方法直接调用了AQS的实现类Sync中的newCondition()方法.
Sync类中的newCondition()方法的代码如下:
final ConditionObject newCondition() {
return new ConditionObject();
}
直接new了一个ConditionObject类的对象。ConditionObject类是Conditon的实现类,ConditionObject是AQS同步器中的一个内部类。
因此,在前面的例子中当调用condition.await方法时,就是调用的ConditionObject类中的await()方法。
下面我们就开始分析这个await方法的内部实现。
public final void await() throws InterruptedException {
if (Thread.interrupted())//判断当前线程时否被中断
throw new InterruptedException();
//将当前线程作为内容构造的节点node放入到条件队列中并返回此节点
Node node = addConditionWaiter();
//释放当前线程所拥有的锁,返回值为AQS的状态位
int savedState = fullyRelease(node);
int interruptMode = 0;
//检测此节点是否在同步队列上,如果不在,说明此线程还没有资格竞争锁,此线程就继续挂起睡觉。
//直到检测到此节点在同步队列上(在有线程发出signal信号的时候)
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//并检测此线程有没有被中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//此线程尝试获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//清理条件队列中不是在等待条件的节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//报告异常
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
//CONDITION 值为-2 表示当前节点在等待condition,
//也就是 在condition队列中,如果此节点的状态不是CONDITION,
//则需要将此节点在条件队列中移除
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter; //获取最后一个在等待的节点
}
//将此线程作为内容构造一个节点加入到条件队列末尾
Node node = new Node(Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
final int fullyRelease(Node node) {
try {
int savedState = getState();
if (release(savedState))//释放锁
return savedState;
throw new IllegalMonitorStateException();
} catch (Throwable t) {
node.waitStatus = Node.CANCELLED;
throw t;
}
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
/**
* Returns true if a node, always one that was initially placed on
* a condition queue, is now waiting to reacquire on sync queue.
* @param node the node
* @return true if is reacquiring
如果一个节点刚开始在条件队列上,现在在同步队列上获取锁则返回true
*/
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
/**
将waitStatus不是CONDITION的节点全部删除
**/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
await方法大概意思就是:首先将此代表该当前线程的节点加入到条件队列中去,然后释放该线程所有的锁并开始睡眠,最后不停的检测AQS队列中是否出现了此线程节点,如果收到signal信号之后就会在AQS队列中检测到,检测到之后,说明此线程又参与了竞争锁。
回到上面的例子,锁被释放后,线程WaitThread开始睡眠,这个时候线程因为线程WaitThread沉睡时调用fullyRelease方法释放锁,接着会唤醒AQS队列中的头节点,所以线程SignalThread开始竞争锁,并获取到锁,然后开始工作,线程SignalThread调用signal方法,发送signal信号。
我们来看下signal方法
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
//检测当前线程是否为拥有锁的独占线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//firstWait为condition自己维护的一个链表头节点
//取出第一个节点后开始唤醒操作
Node first = firstWaiter;
if (first != null)
doSignal(first); //开始唤醒
}
说明下,其实Condition内部维护了等待队列的头节点和尾节点,该队列的作用时存放等待signal信号的线程,该线程被封装为Node节点后存放于此。
下面为ConditonObject类中维护等待队列的头节点和尾节点的声明。
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
..............
}
这里又出现了一个条件队列,可能我们就有点晕了,了解AQS同步器的都知道,这个类中还维护着一个队列,AQS自己维护的队列时当前等待资源(这里的资源就是锁)的队列,AQS会在资源被释放后,依次唤醒队列中从前到后的所有节点,使他们对应的线程恢复执行,直到队列为空。
而Condition自己也维护了一个队列,该队列的作用时维护一个等待signal信号的队列,两个队列的作用时不同的,事实上,每个线程也仅仅会同时存在以上个队列中的一个,流程时这样:
用上面的例子的两个线程来描述
1、首先,线程WaitThread调用lock.lock()时,由于此时锁并没有被其它线程占用,因此线程WaitThread直接获得锁并不会进入AQS同步队列中进行等待。
2、在线程WaitThread执行期间,线程SignalThread调用lock.lock()时由于锁已经被线程WaitThread占用,因此,线程SignalThread进入AQS同步队列中进行等待。
3、在线程WaitThread中执行condition.await()方法后,线程WaitThread释放锁并进入条件队列Condition中等待signal信号的到来。
4、线程SignalThread,因为线程WaitThread释放锁的关系,会唤醒AQS队列中的头结点,所以线程SignalThread会获取到锁。
5、线程SignalThread调用signal方法,这个时候Condition的等待队列中只有线程WaitThread一个节点,于是它被取出来,并被加入到AQS的等待队列中。注意,这个时候,线程WaitThread并没有被唤醒。只是加入到了AQS等待队列中去了
6、待线程SignalThread执行完成之后并调用lock.unlock()释放锁之后,会唤醒此时在AQS队列中的头结点.所以线程WaitThread开始争夺锁(由于此时只有线程WaitThread在AQS队列中,因此没人与其争夺),如果获得锁继续执行。
直到线程WaitThread释放锁整个过程执行完毕。
可以看到,整个协作过程是靠结点在AQS的等待队列和Condition的等待队列中来回移动实现的,Condition作为一个条件类,很好的自己维护了一个等待信号的队列,并在适时的时候将结点加入到AQS的等待队列中来实现的唤醒操作。
然后我们继续看doSignal方法
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
//修改头节点,完成旧头节点的移出工作
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
doSignal(Node first)方法干了两件事:第一件事为修改条件队列中的头节点,第二件事为完成旧的头节点的移出工作,即从condition队列中移出到AQS同步队列中去。
节点的移出工作是调用transferForSignal(Node node)来完成的。
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
从条件队列中转移一个节点到同步队列中去
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
如果不能改变waitStatus的值 则说明此节点已经被取消了
*/
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);//将节点加入到同步队列中去
int ws = p.waitStatus;
//如果节点p的状态为cancel 或者修改waitStatus失败 则直接唤醒
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
可以看到,正常情况 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 这个判断是不会为true的,所以,不会在这个时候唤醒该线程。
只有到发送signal信号的线程调用reentrantLock.unlock()后,因为它已经被加到AQS的等待队列中,所以才可能会被唤醒。
以上就是关于Condition的相关知识。