CountDownLatch源码分析

2019-03-12  本文已影响0人  RealityVibe

所需知识储备:队列、AQS、自旋锁、线程池等。

如果对AbstractQueeuedSynchronizer(AQS)有所了解的话,CountDownLatch的源码学习并不复杂。如果对AQS不了解的话,CAS和AQS或许可以帮助您的理解。

CountDownLatch的示例

public class CountDownDemo {
    
    private static class CustomThread implements Runnable {
        @Override
        public void run() {
            System.out.println("do sth.");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        int size = 10;
        final CountDownLatch startLatch = new CountDownLatch(1);
        final CountDownLatch stopLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 线程等待开始的Latch
                        startLatch.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    try {
                        new CustomThread().run();
                    } finally {
                        // 调用结束的Latch的countDown
                        stopLatch.countDown();
                    }
                }
            });
        }
        long start = System.currentTimeMillis();
        System.out.println("------ time frame ------");
        // 线程
        startLatch.countDown();
        stopLatch.await();
        // 等待所有线程执行完任务
        System.out.println("All thread is finished");
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
    }
}

CountDownLatch通过计数器的方式来工作,当调用countDown()函数,减完所设定的size时就会释放锁。

计数器工作机制

CountDownLatch源码解析

下图为该类的类结构,Sync类继承AQS,负责锁资源的同步。

类结构和Sync类

Sync类

由上图可见,Sync的函数比较简单,主要是调用父类的一些方法进行state的同步操作。

await():void函数

分析await():void的调用链可以核心代码是调用AQS中的doAcquireSharedInterruptibly(int arg),代码和注释如下:

    /**
     * 以**共享/可中断模式**请求锁资源.
     * @param 请求的资源数
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        /**
         * 1. 定义一个Shared模式的节点并加入等待队列
         * addWaiter中初始化节点Node node = new Node(Thread.currentThread(), mode);
         */
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        
        // 2. 尝试自旋获取锁
        try {
            for (;;) {
                final Node p = node.predecessor();
                // 2.1 在队列首位,尝试获取锁
                if (p == head) {
                    /**
                     * 返回值r:-1:失败 
                     * 0:成功,但是队列后续节点无法unpark
                     * >0:成功,继续通知后续节点
                     */
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // 2.2 判断获取失败后是否应该挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
       }
    }

await(long timeout, TimeUnit unit)

对比上一个await函数,在获取锁时增加了对限定时间的判断,如果已经超时则会返回获取失败。

await函数对比

参考

什么时候使用CountDownLatch

上一篇 下一篇

猜你喜欢

热点阅读