JUC学习笔记之CountDownLatch源码初步理解

2019-02-24  本文已影响0人  Moine0828

注:文中代码的解释基本上都以注释的形式和代码写在一起

CountDownLatch是并发环境中常用的计数组件,也是基于AQS实现的。主要的方法有两个,countDown和await,实现了AQS模板方法的tryReleaseShared方法来完成countDown计数减的过程,实现了AQS模板方法的tryAcquireShared方法来实现await阻塞等待功能。

countDown方法

countDown方法源码如下,直接调用了内部类sync的releaseShared方法来实现,这里的Sync和ReentrantLock的内部类Sync一样,是继承了AQS的内部类,releaseShared方法正是AQS提供的共享模式的模板方法。

public void countDown() {
   //直接调用了AQS的releaseShared 
   sync.releaseShared(1); 
}

AQS的releaseShared方法源码如下

public final boolean releaseShared(int arg) {
        //tryReleaseShared方法AQS留给子类自己实现
        //尝试将status减去arg,如果返回为true,执行doRelease方法,否则返回
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

releaseShared方法中调用了CountDownLatch中实现的tryReleaseShared方法,源码如下

protected boolean tryReleaseShared(int releases) {
            //通过源码我们发现传入的参数releases并没有什么用,每次计数固定减一
            // 无限循环直到值减一成功或者status变成0
            for (;;) {
                //获取status的值,CountDownlatch中status的值代表要等待的总计数
                int c = getState();
                //如果已经是0了说明已经不能再减计数了,返回false
                if (c == 0)
                    return false;
                int nextc = c-1;
                //CAS的方式将status减一
                if (compareAndSetState(c, nextc))
                    如果当前减完之后,status是0,也就意味着计数结束了,返回true
                    return nextc == 0;
            }
        }

tryReleaseShared方法返回为true,也就是计数结束时,会接着执行doReleaseShared方法。doReleaseShared方法在CountDownLatch中没有重写,直接调用的是AQS的doReleaseShared方法,源码如下,其中unparkSuccessor源码解析见另一篇博客《AQS源码解析》

private void doReleaseShared() {
    //无限循环
    for (;;) {
        //获取头节点
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            //如果头节点设置的是要唤醒下一个节点的等待状态
            if (ws == Node.SIGNAL) {
                //将节点的waitStatus设置成0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                   continue;            // loop to recheck cases
                   //如果设置成功,唤醒后面的等待节点
                   unparkSuccessor(h);
            }
            //ws==0说明第一步设置成功或者原先就是0
            //将其状态设置为PROPAGATE
            //失败(PROPAGATE状态表示同步状态将会无条件传播,意思就是节点可运行)
            else if (ws == 0 &&
                 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            //如果h还是头节点,就结束循环
            if (h == head)                   // loop if head changed
                break;
     }
 }

await方法

await方法直接调用了内部类Sync的acquireSharedInterruptibly方法,阻塞线程直到count为0,当前线程才能拿到锁(或者抛出异常也有可能结束阻塞)。源码如下:

public void await() throws InterruptedException {
        //直接调用了内部类Sync的方法(其实是AQS的方法)
        sync.acquireSharedInterruptibly(1);
    }

acquireSharedInterruptibly方法,如果线程被中断过就抛出异常结束阻塞,不然就判断计数的值,为0就返回,等于当前阻塞的线程获得了锁,如果计数不为0,进入doAcquireSharedInterruptibly方法进行排队等待。源码如下:

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        //如果线程被中断过,抛出异常,线程不再等待 
        if (Thread.interrupted())
            throw new InterruptedException();
        //tryAcquireShared尝试获取共享状态
        //tryAcquireShared源码见下方,实际就是获取计数
        //计数不为0则执行doAcquireSharedInterruptibly方法
        //计数为0则返回值大于0,方法直接返回,线程阻塞结束。等于线程获得了锁
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

doAcquireSharedInterruptibly方法只有在线程阻塞时会被调用,也就是计数不为0时被调用。方法将当前线程构造为一个共享模式的等待节点加入等待队列中,然后开启无限自循环,直到计数等于0获取到锁,或者抛出异常为止。源码如下:

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //新建一个共享模式节点加入等待队列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                //获取新建节点的前一个节点
                final Node p = node.predecessor();
                //如果前一个就是头节点
                if (p == head) {
                    //说明当前线程是第一个等待的,尝试获取锁
                    //也就是获取计数
                    int r = tryAcquireShared(arg);
                    //r不小于0说明计数已经是0了,等于当前线程已经获得了锁
                    if (r >= 0) {
                        //将当前线程的节点设置成头节点
                        setHeadAndPropagate(node, r);
                        //原先的头节点p从队列中解除,便于垃圾回收
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //shouldParkAfterFailedAcquire检查如果获取锁失败当前节点是否需要挂起
                //只有当前一个节点的waitStatus是SIGNAL也就是说前一个节点
                //获得锁以后会把自己唤醒,当前节点才能放心挂起
                //parkAndCheckInterrupt判断节点是否被中断过
                //这里意思是如果当前节点是在被挂起状态而且被中断过就抛出异常
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            //如果failed为true说明线程被中断,没有获得锁
            //所以取消获取锁的动作
            if (failed)
                cancelAcquire(node);
        }
    }

总结来说就是await方法会让当前线程阻塞,进入方法后先判断一次计数是否为0,如果是0则直接返回,线程获得锁,阻塞结束。如果不为0则进入doAcquireSharedInterruptibly方法,将当前线程构造成了一个等待节点,开启无限循环,线程被阻塞。无限循环直到当前线程的节点排队排到了头节点的后面,就可以尝试获得锁了,如果成功了就可以返回,阻塞结束。在排队的过程中如果线程被中断那么就抛出异常。简而言之阻塞是由无限的for循环造成的,所以结束循环就是线程结束阻塞的关键了。

提问:await方法支持多个线程一起等待吗
回答:支持的。从源码角度看,await方法并没有做任何的同步控制,多个线程等待和一个线程等待,结果没有什么不同,所有等待的线程都会阻塞到status为0,阻塞过程中这些线程就乖乖的在队列里等待。

上一篇下一篇

猜你喜欢

热点阅读