并发与多线程源码专题工作生活

从源码看JDK8并发工具类CountDownLatch的实现原理

2019-07-01  本文已影响0人  先生zeng

CountDownLatch,是几个重要的并发编程工具类之一,字面意思就是门锁的意思,内部会维护一个计数器的常量,这个常量代表执行的线程数。

在多线程协作完成业务功能时,有时候需要等待其他多个线程完成任务之后,主线程才能继续往下执行业务功能,在这种的业务场景下,通常可以使用Thread类的join方法,让主线程等待被join的线程执行完之后,主线程才能继续往下执行。当然,使用线程间消息通信机制也可以完成。其实,java并发工具类中为我们提供了类似“倒计时”(CountDownLatch)这样的工具类,可以十分方便的完成所说的这种业务场景。

CountDownLatch允许一个或多个线程等待其他线程完成操作,调用await()方法的线程回去判断count的值来判断是否会被挂起,它会等待直到count值为0才会继续执行。控制台输出count=0最后输出,这个时候就看cpu切换到哪个线程上执行了,在初始化的时候我们会设置好count的值,当每调用一次countDown()方法,会使count的值减一也就是将AQS维护同步状态的state值减一。

在我们阅读源码之前,如果你看过AQS源码(https://www.jianshu.com/p/e0066f9349cd)与跟可重入锁(https://www.jianshu.com/p/5d57573b09f5)相关的内容,你会更加对CountDownLatch本身是如何实现的以及他的本质有一个更透彻的理解。

说说我的理解之前看看他的类

可以看到他同样运用了一个继承了AQS同步器的静态内部类来重写父类AQS里面的一些方法然后再调用该父类里面的获取锁的方法来实现具体的功能。

来看看构造方法中:

//设置初始化count的值,并传递给Sync类
public CountDownLatch(int count) {
       if (count < 0) throw new IllegalArgumentException("count < 0");
       this.sync = new Sync(count);
   }
Sync类的源码如下:CountDownLatch的实现依赖于AQS

先介绍下两个方法 countDown()每执行一次该方法,也就是将由AQS维护的同步状态值state值减1,其一般是执行任务的线程调用。
调用countDown()释放同步状态,每次调用同步状态值-1。

 public void countDown() {
        sync.releaseShared(1);
    }

//父类AQS中
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {   //如果释放同步状态线程成功,如果返回false,则表示,获取失败同步状态。
  //返回flase,以CountDownLatch的实现角度来讲,此时还要等待N(N>0)个线程,因为state还没减到等于0,如果返回true,表示此时已经执行N次了,此时state已经减到0了,这时候会执行doReleaseShared(),表示释放其他处于等待的节点。
            doReleaseShared();   //唤醒后续处于等待的节点,看下面具体的解释。
            return true;
        }
        return false;
    }

//在CountDownLatch的静态内部工具类Sync继承了AQS重写的tryReleaseShared
protected boolean tryReleaseShared(int releases) {
            // 自旋
            for (;;) {
                int c = getState();  //获取AQS维护的state值
                if (c == 0)   //如果为0,表示没有一个线程在运行返回false
                    return false;
                int nextc = c-1;     //如果不等于0,这里肯定会>0的,所以减去1
                if (compareAndSetState(c, nextc))  //CAS去直接修改内存地址的偏移量去修改值,保证线程安全。
                 return nextc == 0;       //重点来了。这里的意思是如果共享式获取同步状态后,state还不是为0,则获取失败。返回false
            }
        }

下面这个类,在我的这篇文章也解析过了。


await(),当执行该方法是,内部会检查那个计数常量的值,如果不等于0,就会进入等待(waiting)状态,直到执行了countDown使内部的值减到0的时候,就会恢复线程,同时执行,我们来看看实现:

public void await() throws InterruptedException {
        //共享式获取
        sync.acquireSharedInterruptibly(1);
    }

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())  //响应中断
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)   
      //tryAcquireShared(arg) 返回1,此时state=0不阻塞,返回的是-1,执行doAcquireSharedInterruptibly(arg);
            doAcquireSharedInterruptibly(arg);
    }

protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;   
        }

这里需要解释下doAcquireSharedInterruptibly的主要作用:1、将当前线程构造成共享模式的节点,通过自旋的方式尝试获取同步状态2、如果获取同步状态成功,则唤醒后续处于共享模式的节点;如果没有获取到同步状态,则对调用shouldParkAfterFailedAcquire(Node, Node)和parkAndCheckInterrupt()方法挂起当前线程,这样可以避免该线程无限循环而获取不到共享锁,从而造成资源浪费。这里需要注意的是:当有多个线程调用await()方法时,这些线程都会通过addWaiter(Node.SHARED)方法被构造成节点加入到等待队列中。当最后一个调用countDown()方法的线程执行了countDown()后(这里有点拗口),会唤醒处于等待队列中距离头节点最近的一个节点,也就是说该线程被唤醒之后会继续自旋尝试获取同步状态,此时执行到tryAcquireShared(int)方法时,发现r大于0(因为state已经被置为0了),该线程就会调用setHeadAndPropagate(Node, int)方法将唤醒传递下去,并且退出当前循环,开始执行awat()方法之后的代码。


然后说说CountDownLatch的两种用法:

1.可以设置new CountDownLatch(1); 如果需要控制多个线程同时开始执行的时候,可以每个线程刚开始执行run的时候,先执行await,
进入等待状态。当最后所有线程都准备好了,就调用countDown,减一,这时所有线程就会主动同时开始执行。
2.假设可以设置new CountDownLatch(10),这时有10个线程,我们需要做的是等10个线程,依次执行countDown(),等到所有线程都执行好了,这时候再执行await。所有线程都准备就绪了。

await(long timeout,TimeUtil unit)
作用使线程在指定的最大时间内,处于await状态,超过这个时间就会自动唤醒了。
getCount()
能够获取当前计数的值。

下面举一个实现的例子:

默认10个运动员进行跑步比赛的全过程:

public class MyThread extends Thread{

    /**等待运动员到来*/
    private CountDownLatch comingTag;
    /**等待裁判说开始*/
    private CountDownLatch waitTag;
    /** 等待起跑*/
    private CountDownLatch waitRunTag;
    /**起跑*/
    private CountDownLatch beginTag;
    /** 所有运动员道终点*/
    private CountDownLatch endTag;

    public MyThread(CountDownLatch comingTag, CountDownLatch waitTag, CountDownLatch waitRunTag, CountDownLatch beginTag, CountDownLatch endTag) {
        super();
        this.comingTag = comingTag;
        this.waitTag = waitTag;
        this.waitRunTag = waitRunTag;
        this.beginTag = beginTag;
        this.endTag = endTag;
    }

    @Override
    public void run() {
        try {
            System.out.println("运动员正陆续入场");
            Thread.sleep((int)Math.random()*10000);
            System.out.println(Thread.currentThread().getName()+"到起跑点了");
            comingTag.countDown();
            System.out.println("等待裁判说准备");
            waitTag.await();
            System.out.println("准备。。。。。开始");
            waitRunTag.countDown();
            beginTag.await();
            System.out.println(Thread.currentThread().getName()+"开始跑,并且跑步过程不确定");
            Thread.sleep((int)Math.random()*10000);
            endTag.countDown();
            System.out.println(Thread.currentThread().getName()+"到达终点");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

测试类:

public class Run {

    public static void main(String[] args) {


        CountDownLatch comingTag = new CountDownLatch(10);
        CountDownLatch waitTag=new CountDownLatch(1);
        CountDownLatch waitRunTag = new CountDownLatch(10);
        CountDownLatch beginTag=new CountDownLatch(1);
        CountDownLatch endTag = new CountDownLatch(10);

        MyThread[] threads=new MyThread[10];

        for(int i=0;i<threads.length;i++){
            threads[i]=new MyThread(comingTag,waitTag,waitRunTag,beginTag,endTag);
            threads[i].setName("运动员"+(i+1));
            threads[i].start();
        }

        try {
            System.out.println("裁判正在等待选手的到来。。。。");
            comingTag.await();
            System.out.println("所有的运动员都到齐了,准备开始,各就位。。。。预备");
            Thread.sleep(5000);
            waitTag.countDown();
            System.out.println("各就各位。。。。");
            waitRunTag.await();
            Thread.sleep(2000);
            System.out.println("命令枪,开!!!");
            beginTag.countDown();
            endTag.await();
            System.out.println("所有运动员都到得终点了。。。");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
image.png
image.png

一般来说,都会把CountDownLatch与CyclicBarrier进行比较?

CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;CountDownLatch强调一个线程等多个线程完成某件事情。CyclicBarrier是多个线程互等,等大家都完成,再携手共进。
调用CountDownLatch的countDown方法后,当前线程并不会阻塞,会继续往下执行;而调用CyclicBarrier的await方法,会阻塞当前线程,直到CyclicBarrier指定的线程全部都到达了指定点的时候,才能继续往下执行;
CountDownLatch方法比较少,操作比较简单,而CyclicBarrier提供的方法更多,比如能够通过getNumberWaiting(),isBroken()这些方法获取当前多个线程的状态,并且CyclicBarrier的构造方法可以传入barrierAction,指定当所有线程都到达时执行的业务功能;
CountDownLatch是不能复用的,而CyclicBarrier是可以复用的。就是说,当CountDownLatch执行countDown时如果此时countDown执行的state的值减到0了,这时候再调用,不能循环执行了,而CyclicBarrier是可以的,可以看一下这篇文章:
https://www.jianshu.com/p/ff6c2ef5e8c2

整理不易,喜欢可以关注我

上一篇下一篇

猜你喜欢

热点阅读