Condition源码解读
Condition是一个接口,其提供的就两个核心方法,await和signal方法。分别对应着Object的wait和notify方法。调用Object对象的这两个方法,需要在同步代码块里面,即必须先获取到锁才能执行这两个方法。同理,Condition调用这两个方法,也必须先获取到锁。只不过Object的锁是synchronized,而Condition是Lock锁。这里先声明两个概念:
- 等待队列:Condition上用来等待这个条件的队列,它是个单向队列
- 同步队列:就是AQS等待拿锁的CLH队列,它是个双向队列。
下文涉及这两个概念,一定要注意区分。
一、解读Condition的await方法
当然是先上源码:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
1.1 第一步是addConditionWaiter方法。
这个方法是为了构造等待Condition条件的等待队列。这个队列是单向链表,有别于等待Lock锁的同步队列(CLH队列,它是双向队列)。 这里一定要区别好这两个队列,虽然队列的元素都是Node实现的,但是两者的用途完全不一样。好了,我们看下这个方法:
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
其实这个方法很简单,看下最后的等待元素node是否已经取消了。如果取消了,那么就从队列里面清除。这里有个疑问:
1. 为啥要判断最后一个node,而不是第一个,或第二个,第三个等等?难道最后一个元素,它特别香吗
2. 为啥最后一个node取消了,unlinkCancelledWaiters要遍历所有等待队列,清除所有已取消元素?
1.2 第二步是fullyRelease方法
这个方法其实就是模拟Object.wait方法,用来释放锁的。如果重入过,要把它释放干净。所以看名字fullyRelease,起的挺好的很直观。源码我们也来看一下:
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
fullyRelease是通过getState,然后全部release掉。
1.3 第三步是isOnSyncQueue方法
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
这个方法,乍一看莫名其妙。刚刚上面不是说了,node是单向队列吗,怎么用node.prev来判空。WT?单向队列的prev不是必空吗。其实这里的Node节点,已经被偷换了。 这个方法的Node,如果是第一次调用,那么确实还是Condition等待队列上node,它是单向队列,且它的waitStqatus = -2。这个方法在第一行的判断逻辑一定是true的。但是第2次,第3次调用,这个Node就不是等待队列的Node的,而是同步队列的Node,它是双向的,它有prev和next指针。具体是怎么悄悄就被转换了,第二节的signal方法会解释清楚的。
咱们继续看,循环调用isOnSyncQueue方法,是怎么回事?
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
第一次调用,一定会进入进来(上面已经解释了,不在赘述)。这里的park会阻塞自己,等待其他线程的唤醒。现在假如其他线程唤醒了该线程,它首先检查自己在阻塞期间,是否被中断过。如果没有中断过,继续执行isOnSyncQueue方法。上面也说了,第二次调用的时候,这个Node元素就变成了同步队列元素。再回头先下isOnSyncQueue方法,node.waitStatus一定不再等于-2了,且node.prev必不为null。其实这里为啥要判断node.prev == null 我也还没搞懂。然后第二个判断条件,node.next != null,大概率不为空。
总结下:这个方法主要作用就是用来判断该node节点,没有加入到同步队列。为啥要判断有没有加入到同步队列呢?因为await的时候,你先释放了锁,现在唤醒了,你不是要重新获取锁嘛?和Object的wait效果一致。学习了!
1.4 第四步是acquireQueued方法
这个方法,其实就是被唤醒后,用来获取锁。其和Lock的流程就一模一样了,可以看以前的文章,有讲过,不在赘述。有个细节acquireQueued(node, savedState)的savedState,是之前fullyRealese留下来的,这里要全部拿回。释放了我多少,我就要拿回来多少。拿锁那一套,走Lock流程。
该方法后面又来一次unlinkCancelledWaiters,咋这么喜欢清理等待队列呢?
二、解读Condition的signal方法
上源码,不上源码我怎么讲。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
里面就一个核心方法doSignal。
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
里面也是一个核心方法transferForSignal
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
这个方法的作用就是,把等待队列的Node元素,转换为同步队列的node元素。然后修改node的waitStatus 由 -2 变为0。
compareAndSetWaitStatus(node, Node.CONDITION, 0) 这个动作就要要告诉node节点,你已经等到condition啦,你可以安全的去竞争锁了。然后就把node 执行enq方法。这个enq方法是不是很熟悉,就是把node加入到CLH锁的同步队列里面。所以这里就是转换的关键,也是上文提到的Node在第一次和非第一次执行时的不同。转换结束后,看下p节点(p是node的前置节点)。如果p节点已经取消了,或者p节点修改失败,就唤醒当前node节点的线程。搞不懂为啥要这么做,还没想明白?因为同步队列里面,可能还有有其他很多很多线程等待着锁呢,这里仅仅根据node的前置节点状态去判断,就要唤醒node线程,唤醒了又能怎么样呢?醒了后,发现自己不是head的next节点,还是要继续阻塞等待,还不如不唤醒。
总结:Condition所有的操作,就是为了实现Object的wait和notify的功能,实现线程间更优雅的通信。
吃饭去了。。。