面试精选

CountDownLatch原理

2020-12-07  本文已影响0人  Lnstark

上篇讲了ReentrantLock和AQS的独占模式,这篇讲一下CountDownLatch,他是利用AQS的共享模式实现的。

基本用法

CountDownLatch主要有2个方法: await()和countDown(),先创建一个CountDownLatch并指定count

CountDownLatch cdl = new CountDownLatch(1);

假如现在要让几个线程同时执行里面的某一段代码,可以让几个线程先执行CountDownLatch的await方法,然后在外面执行countDown方法。

    public static void main(String[] args) {
        CountDownLatch cdl = new CountDownLatch(1);
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    cdl.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("thread " + Thread.currentThread().getName() + " start");
            }, i + "").start();
        }
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cdl.countDown();
    }

运行结果

thread 0 start
thread 2 start
thread 4 start
thread 1 start
thread 3 start

源码原理

类似ReentranLock, CountDownLatch 内部也有一个Sync类。
我们先看看await方法:

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // 如果已中断,就抛出中断异常
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

他先尝试获取,如果没有拿到,就执行节点入队和park操作。
我们先来看看tryAcquireShared方法在CountDownLatch里的实现:

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

如果state为0返回1,否则返回-1,就是说state减到0的时候才可以拿到锁。
再来看doAcquireSharedInterruptibly方法:

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 入队一个shared节点
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            // 自旋
            for (;;) {
                final Node p = node.predecessor();
                // 如果是head的next节点,则去尝试去拿锁
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    // 如果拿到锁就唤醒head的后继节点
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // 将前置节点置为signal,返回true的话接着执行线程park
                // 如果线程已被中断,则抛出中断异常
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            // 如果发生线程中断了,取消获取锁,这也说明了AQS是可以响应中断的
            if (failed)
                cancelAcquire(node);
        }
    }

这里的逻辑和上篇讲到的AQS独占锁的入队+竞争队列的逻辑基本类似,都是先入队再去自旋竞争锁,拿不到就挂起。下面再看看setHeadAndPropagate方法是怎么唤醒后继节点的:

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        // 设置当前节点为头结点
        setHead(node);
        
        // propagate == 1一般情况成立
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            // s.isShared()一般成立
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

再看看doReleaseShared方法:

    private void doReleaseShared() {
        // 自旋
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 唤醒head的next节点
                    // next节点被唤醒之后,他在doAcquireSharedInterruptibly方法里
                    // 继续执行setHeadAndPropagate方法唤醒他的后继节点
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

以上就是CountDownLatch的await的大致逻辑了。他和独占模式最主要的区别就是:他拿到锁之后会唤醒后继节点,后继节点被唤醒了,继续去抢锁,抢到锁再唤醒他的后继节点,直到唤醒同步队列里所有的节点;而独占模式里节点抢到锁就直接返回了,释放锁是通过unlock()方法,后继节点被唤醒,然后抢到锁也就返回了,并不会唤醒后继节点。
下面看看countDown方法:

    public void countDown() {
        sync.releaseShared(1);
    }

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                // 如果已经是0,返回false
                if (c == 0)
                    return false;
                int nextc = c-1;
                // 如果减完之后为0且cas成功则返回true
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
     }

releaseShared里首先将state-1,如果state-1之后等于0,那就释放锁,返回true,否则返回false。tryReleaseShared方法里就是通过自旋+CAS的方式设置state,设置成功且state为0就返回true。
以上就是目前我对CountDownLatch的理解了。

上一篇下一篇

猜你喜欢

热点阅读