16. 并发终结之CountDownLatch
CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。它与ReentrantLock一样也是基于AQS来实现的,但是不一样的地方是ReentrantLock是独占锁,而CountDownLatch是共享锁。
这里我们顺带分析一下AQS对共享锁的支持。
例子
用一个简单的小栗子,main线程和Await-thread1线程会调用await(),CountDown-thread1和CountDown-thread2会进行countDown()。看到结果只有在CountDown-thread2将state值减到0之后,两个await的线程会被唤醒。
CountDownLatch latch = new CountDownLatch(2);
new Thread(()->{
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "====countDown");
latch.countDown();
}, "CountDown-thread1").start();
new Thread(()->{
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "====await");
try {
latch.await();
System.out.println(Thread.currentThread().getName() + "====invoke");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"Await-thread1").start();
new Thread(()->{
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "====countDown");
latch.countDown();
},"CountDown-thread2").start();
System.out.println(Thread.currentThread().getName() + "====await");
latch.await();
System.out.println(Thread.currentThread().getName() + "====invoke");
============
main====await
CountDown-thread1====countDown
Await-thread1====await
CountDown-thread2====countDown
main====invoke
Await-thread1====invoke
CountDownLatch源码分析
public class CountDownLatch {
//基于AQS的实现
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
//初始化有几个latch,就给state设多少值
//state>0表示存在共享锁,那么调用await()的线程会被阻塞,
//需要别的线程调用countDown()方法来唤醒
//state=0表示没有共享锁
Sync(int count) {
setState(count);
}
//拿到共享锁的个数
int getCount() {
return getState();
}
//AQS实现尝试获取共享锁
// 如果state=0则表示无共享锁,则不需要等待;
// 如果state!=0表示有共享锁,需要等待;
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//AQS尝试释放共享锁
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
//如果state=0,表示锁已经全部释放,return false
if (c == 0)
return false;
int nextc = c-1;
//CAS设置state的值为当前值-1,然后如果减完之后等于0,表示释放共享锁成功。
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
/**
* 当前线程会被挂起,直到state被countDown到0或者当前线程被中断
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* 当前线程会被挂起,直到state被countDown到0或者当前线程被中断或者等待时间到了
*/
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
//countDown释放一个锁,当state减为0表示没有共享锁时,则唤醒所有调用await()的线程
public void countDown() {
sync.releaseShared(1);
}
//返回state的值
public long getCount() {
return sync.getCount();
}
}
CountDownLatch源码里面主要看一下await()和countDown()方法,主要实现在AQS中。
await源码+AQS
//await()方法调用acquireSharedInterruptibly,主要是申请共享锁
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//首先检查当前线程是否被中断,如果中断,抛出InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
//调用CountDownLatch里面的tryAcquireShared,来判断共享锁状态;
// 如果state>0表示有共享锁,则这里返回<0
//就需要doAcquireSharedInterruptibly来一直判断state的个数,如果state>0,则挂起当前线程。
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
//申请共享锁
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//加入一个共享状态的节点到sync queue
//如果sync queue是空的,则new Node()作为head节点,head节点的后继节点是当前节点
//注意Node.SHARED节点会被加到nextWaiter属性里,
//根据nextWaiter来判断当前节点Node是共享节点还是独占节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//拿到当前节点的前继节点
final Node p = node.predecessor();
if (p == head) {
//如果前继节点是head,则再次判断state的是否等于0
int r = tryAcquireShared(arg);
if (r >= 0) {
//如果state减到0,则将当前节点设为头结点,并传播给后继节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//如果前继节点不是head,或者state不是0,则要park当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//这里跟独占锁有区别,acquire()方法里面acquireQueued()只是setHead(node)
//而共享锁这边还要PROPAGATE传播给后继节点
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* 这里就是为什么countDownLatch支持一个或多个线程await(),直到countDown将state减到0
* 然后才会调用doReleaseShared来唤醒后继节点。
*
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
//拿到当前节点的后继节点,然后判断后继节点是共享节点,
//则继续doReleaseShared来唤醒后继节点
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
//这个跟独占锁park当前线程代码一样
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
countDown源码+AQS
//释放共享锁
public final boolean releaseShared(int arg) {
//调用CountDownLatch的tryReleaseShared方法,来操作state的数量
if (tryReleaseShared(arg)) {
//当state数量减到0就开始调用doReleaseShared来释放共享锁
doReleaseShared();
return true;
}
return false;
}
//尝试释放锁
private void doReleaseShared() {
//注意这里跟await()方法类似都是死循环处理
for (;;) {
//拿到head节点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//如果head节点是SIGNAL,则唤醒后继节点
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases设置不成功则一直循环
//如果head从SIGNAL更新到0成功,则唤醒后继节点
unparkSuccessor(h);
}
//如果head节点的状态不是SIGNAL,则尝试设置成PROPAGATE状态,然后一值循环
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//如果head节点被换了(就是被唤醒的线程更新了head),则继续循环。
//否则跳出循环结束唤醒;正常情况下,唤醒一个节点都会break掉,然后被唤醒的节点设置head,
//然后同时调用doReleaseShared()来释放下一个await的节点
if (h == head) // loop if head changed
break;
}
}
流程总结
在new CountDownLatch的时候会提供state的值,表示共享锁的数量。
- await流程
1.1 await是支持中断的,所以首先会判断当前线程是否中断。
1.2 tryAcquireShared()来判断state共享锁的个数,如果state>0表示有共享锁,则方法返回-1,就需要调用doAcquireSharedInterruptibly(),生成当前线程所属的node节点,加入sync queue,然后挂起当前线程,等待唤醒。
1.3 doAcquireSharedInterruptibly()就类似于独占锁的acquireQueued()方法,只是addWaiter()以及setHead()方法有区别:
1.3.1 独占锁:addWaiter(Node.EXCLUSIVE),新建node的nextWaiter=Node.EXCLUSIVE(是Null值),且在当前节点的前继节点是header并且tryAcquire()成功,则sethead(node)将当前节点设置为head节点。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
1.3.2 共享锁:addWaiter(Node.SHARED),新建node的nextWaiter=Node.SHARED(是final static的Node对象),这样判断是不是共享锁,就可以通过nextWaiter来判断。当前节点的前继节点是header并且tryAcquireShared()成功,这里不仅仅是setHead了,而是setHeadAndPropagate()。因为当countDownLatch的等待可能是一组线程,那么唤醒的时候需要将sync queue里的等待线程全部唤醒。
final Node node = addWaiter(Node.SHARED);
//根据nextWaiter判断是不是共享锁
final boolean isShared() {
return nextWaiter == SHARED;
}
//正常最后一个countDown释放共享锁之后,开始唤醒sync queue的等待线程,
//那么一个正常唤醒之后,会继续唤醒他后面的等待线程,而不是countDown来唤醒。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below,这里记录了原来的head节点
setHead(node);//这里将node节点设置为head节点
//1. propagate>0表示有后继节点需要传播唤醒
//2. 老的head节点为null
//3. 老的head节点waitStatus<0,SIGNAL或者PROPAGATE
//4. 新的head节点为null
//5. 新的head节点的waitStatus<0,SIGNAL或者PROPAGATE
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//唤醒后继节点,可能会造成虚假唤醒,
//如果是虚假唤醒,则为了保证release不会被中断,如果head的status是0,会设置成PROPAGATE
if (s == null || s.isShared())
doReleaseShared();
}
}
//唤醒的正常步骤就是拿到head节点,然后判断head节点的状态是不是SIGNAL,然后将head节点状态设为0,并unparkSuccessor,即head的后继节点
//但是这里注意的是如果head的状态是0,则会将head的状态设置为PROPAGATE,来保证release不会被中断(主要是并发release的问题)
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
- countDown(signal)流程(上面已经介绍的八九不离十了)
2.1 每次调用countDown都会进行tryReleaseShared,来减少共享锁的count,直到减到0,开始唤醒sync queue的等待线程。
2.2. 唤醒就是调用doReleaseShared(),这是一个for循环,来唤醒head节点的后继节点,如果存在虚假唤醒,唤醒的时候head不是SIGNAL状态,则会改成PROPAGATE状态,来保证唤醒的可能性。
总结一下head节点的状态:
1.在enq()方法里面初始化sync queue的时候,会new Node()来作为head,那么初始head的状态是0。
2.获取锁失败,那么需要挂起当前线程shouldParkAfterFailedAcquire()这个方法里面会将前继节点的状态从0或者PROPAGATE设置为SIGNAL。
3.在释放锁的时候会将head节点的状态变化根据锁类型不同而不同
3.1如果是释放独占锁,那么直接判断head的状态是不是!=0,不等于0就是SIGNAL,就唤醒后继节点。
3.2如果是释放共享锁,那么如果head的状态是SIGNAL,则将SIGNAL更新为0,成功则唤醒后继节点;如果head状态为0,则将0更新成PROPAGATE,并继续循环;
对于PROPAGATE在Semaphore里面更容易理解。
Reference: AbstractQueuedSynchronizer源码解读