JUC并发相关

16. 并发终结之CountDownLatch

2020-09-27  本文已影响0人  涣涣虚心0215

CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。它与ReentrantLock一样也是基于AQS来实现的,但是不一样的地方是ReentrantLock是独占锁,而CountDownLatch是共享锁。
这里我们顺带分析一下AQS对共享锁的支持。

例子

用一个简单的小栗子,main线程和Await-thread1线程会调用await(),CountDown-thread1和CountDown-thread2会进行countDown()。看到结果只有在CountDown-thread2将state值减到0之后,两个await的线程会被唤醒。

CountDownLatch latch = new CountDownLatch(2);
new Thread(()->{
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(Thread.currentThread().getName() + "====countDown");
    latch.countDown();
}, "CountDown-thread1").start();
new Thread(()->{
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(Thread.currentThread().getName() + "====await");
    try {
        latch.await();
        System.out.println(Thread.currentThread().getName() + "====invoke");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
},"Await-thread1").start();
new Thread(()->{
    try {
        Thread.sleep(20000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(Thread.currentThread().getName() + "====countDown");
    latch.countDown();
},"CountDown-thread2").start();
System.out.println(Thread.currentThread().getName() + "====await");
latch.await();
System.out.println(Thread.currentThread().getName() + "====invoke");
============
main====await
CountDown-thread1====countDown
Await-thread1====await
CountDown-thread2====countDown
main====invoke
Await-thread1====invoke

CountDownLatch源码分析

public class CountDownLatch {
    //基于AQS的实现
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        //初始化有几个latch,就给state设多少值
        //state>0表示存在共享锁,那么调用await()的线程会被阻塞,
        //需要别的线程调用countDown()方法来唤醒
        //state=0表示没有共享锁
        Sync(int count) {
            setState(count);
        }
        //拿到共享锁的个数
        int getCount() {
            return getState();
        }
        //AQS实现尝试获取共享锁
        // 如果state=0则表示无共享锁,则不需要等待;
        // 如果state!=0表示有共享锁,需要等待;
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        //AQS尝试释放共享锁
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                //如果state=0,表示锁已经全部释放,return false
                if (c == 0)
                    return false;
                int nextc = c-1;
                //CAS设置state的值为当前值-1,然后如果减完之后等于0,表示释放共享锁成功。
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
    /**
     * 当前线程会被挂起,直到state被countDown到0或者当前线程被中断
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    /**
     * 当前线程会被挂起,直到state被countDown到0或者当前线程被中断或者等待时间到了
     */
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    //countDown释放一个锁,当state减为0表示没有共享锁时,则唤醒所有调用await()的线程
    public void countDown() {
        sync.releaseShared(1);
    }
    //返回state的值
    public long getCount() {
        return sync.getCount();
    }
}

CountDownLatch源码里面主要看一下await()和countDown()方法,主要实现在AQS中。

await源码+AQS

//await()方法调用acquireSharedInterruptibly,主要是申请共享锁
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
    //首先检查当前线程是否被中断,如果中断,抛出InterruptedException
    if (Thread.interrupted())
        throw new InterruptedException();
    //调用CountDownLatch里面的tryAcquireShared,来判断共享锁状态;
    // 如果state>0表示有共享锁,则这里返回<0
    //就需要doAcquireSharedInterruptibly来一直判断state的个数,如果state>0,则挂起当前线程。
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
//申请共享锁
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //加入一个共享状态的节点到sync queue
    //如果sync queue是空的,则new Node()作为head节点,head节点的后继节点是当前节点
    //注意Node.SHARED节点会被加到nextWaiter属性里,
    //根据nextWaiter来判断当前节点Node是共享节点还是独占节点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            //拿到当前节点的前继节点
            final Node p = node.predecessor();
            if (p == head) {
                //如果前继节点是head,则再次判断state的是否等于0
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                     //如果state减到0,则将当前节点设为头结点,并传播给后继节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //如果前继节点不是head,或者state不是0,则要park当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
//这里跟独占锁有区别,acquire()方法里面acquireQueued()只是setHead(node)
//而共享锁这边还要PROPAGATE传播给后继节点
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * 这里就是为什么countDownLatch支持一个或多个线程await(),直到countDown将state减到0
     * 然后才会调用doReleaseShared来唤醒后继节点。
     * 
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        //拿到当前节点的后继节点,然后判断后继节点是共享节点,
        //则继续doReleaseShared来唤醒后继节点
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
//这个跟独占锁park当前线程代码一样
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

countDown源码+AQS

//释放共享锁
public final boolean releaseShared(int arg) {
    //调用CountDownLatch的tryReleaseShared方法,来操作state的数量
    if (tryReleaseShared(arg)) {
        //当state数量减到0就开始调用doReleaseShared来释放共享锁
        doReleaseShared();
        return true;
    }
    return false;
}
//尝试释放锁
private void doReleaseShared() {
    //注意这里跟await()方法类似都是死循环处理
    for (;;) {
        //拿到head节点
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            //如果head节点是SIGNAL,则唤醒后继节点
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases设置不成功则一直循环
                //如果head从SIGNAL更新到0成功,则唤醒后继节点
                unparkSuccessor(h);
            }
            //如果head节点的状态不是SIGNAL,则尝试设置成PROPAGATE状态,然后一值循环
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        //如果head节点被换了(就是被唤醒的线程更新了head),则继续循环。
        //否则跳出循环结束唤醒;正常情况下,唤醒一个节点都会break掉,然后被唤醒的节点设置head,
        //然后同时调用doReleaseShared()来释放下一个await的节点
        if (h == head)                   // loop if head changed
            break;
    }
}

流程总结

在new CountDownLatch的时候会提供state的值,表示共享锁的数量。

  1. await流程
    1.1 await是支持中断的,所以首先会判断当前线程是否中断。
    1.2 tryAcquireShared()来判断state共享锁的个数,如果state>0表示有共享锁,则方法返回-1,就需要调用doAcquireSharedInterruptibly(),生成当前线程所属的node节点,加入sync queue,然后挂起当前线程,等待唤醒。
    1.3 doAcquireSharedInterruptibly()就类似于独占锁的acquireQueued()方法,只是addWaiter()以及setHead()方法有区别:
    1.3.1 独占锁:addWaiter(Node.EXCLUSIVE),新建node的nextWaiter=Node.EXCLUSIVE(是Null值),且在当前节点的前继节点是header并且tryAcquire()成功,则sethead(node)将当前节点设置为head节点。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

1.3.2 共享锁:addWaiter(Node.SHARED),新建node的nextWaiter=Node.SHARED(是final static的Node对象),这样判断是不是共享锁,就可以通过nextWaiter来判断。当前节点的前继节点是header并且tryAcquireShared()成功,这里不仅仅是setHead了,而是setHeadAndPropagate()。因为当countDownLatch的等待可能是一组线程,那么唤醒的时候需要将sync queue里的等待线程全部唤醒。

final Node node = addWaiter(Node.SHARED);
//根据nextWaiter判断是不是共享锁
final boolean isShared() {
  return nextWaiter == SHARED;
}
//正常最后一个countDown释放共享锁之后,开始唤醒sync queue的等待线程,
//那么一个正常唤醒之后,会继续唤醒他后面的等待线程,而不是countDown来唤醒。
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below,这里记录了原来的head节点
    setHead(node);//这里将node节点设置为head节点
        //1. propagate>0表示有后继节点需要传播唤醒
        //2. 老的head节点为null
        //3. 老的head节点waitStatus<0,SIGNAL或者PROPAGATE
        //4. 新的head节点为null
        //5. 新的head节点的waitStatus<0,SIGNAL或者PROPAGATE
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
                //唤醒后继节点,可能会造成虚假唤醒,
                //如果是虚假唤醒,则为了保证release不会被中断,如果head的status是0,会设置成PROPAGATE
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
//唤醒的正常步骤就是拿到head节点,然后判断head节点的状态是不是SIGNAL,然后将head节点状态设为0,并unparkSuccessor,即head的后继节点
//但是这里注意的是如果head的状态是0,则会将head的状态设置为PROPAGATE,来保证release不会被中断(主要是并发release的问题)
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
  1. countDown(signal)流程(上面已经介绍的八九不离十了)
    2.1 每次调用countDown都会进行tryReleaseShared,来减少共享锁的count,直到减到0,开始唤醒sync queue的等待线程。
    2.2. 唤醒就是调用doReleaseShared(),这是一个for循环,来唤醒head节点的后继节点,如果存在虚假唤醒,唤醒的时候head不是SIGNAL状态,则会改成PROPAGATE状态,来保证唤醒的可能性。

总结一下head节点的状态:

1.在enq()方法里面初始化sync queue的时候,会new Node()来作为head,那么初始head的状态是0
2.获取锁失败,那么需要挂起当前线程shouldParkAfterFailedAcquire()这个方法里面会将前继节点的状态从0或者PROPAGATE设置为SIGNAL
3.在释放锁的时候会将head节点的状态变化根据锁类型不同而不同
3.1如果是释放独占锁,那么直接判断head的状态是不是!=0,不等于0就是SIGNAL,就唤醒后继节点。
3.2如果是释放共享锁,那么如果head的状态是SIGNAL,则将SIGNAL更新为0,成功则唤醒后继节点;如果head状态为0,则将0更新成PROPAGATE,并继续循环;
对于PROPAGATE在Semaphore里面更容易理解。

Reference: AbstractQueuedSynchronizer源码解读

上一篇 下一篇

猜你喜欢

热点阅读