程序员小天地

Java中的CountDownLatch

2019-11-17  本文已影响0人  小草莓子桑

今天给大家介绍一个并发包中的线程工具CountDownLatch,有的人把它叫做闭锁,有的人把它叫做计数锁,我们先从他使用场景来说吧。

怎么让三个线程按顺序执行?

今天看到这么一篇文章,就从这个标题,引出主题吧。

第一时间想到了使用join
join方法

join是线程Thread类的方法,如果在T1线程中,使用T2线程调用了join方法,会阻塞T1线程执行,等T2线程执行完毕后,T1线程在执行。那么讲道理说,定义三个线程对象T1、T2、T3,在三个线程里面,分别调用别的线程对象的join方法,那么就可以实现三个线程按顺序执行。

举个join栗子

线上代码

public class Demo1 {

    public static void main(String[] args) {
        //定义T1线程
        Thread t1 = new Thread(new JoinRunner(null),"T1");
        //定义T2线程,T1线程执行完了,在执行线程T2
        Thread t2 = new Thread(new JoinRunner(t1),"T2");
        //定义T3线程,T2线程执行完了。在执行线程T3
        Thread t3 = new Thread(new JoinRunner(t2),"T3");
        t1.start();
        t2.start();
        t3.start();
    }

    static class JoinRunner implements Runnable {

        /**
         * 先要执行的线程
         */
        private Thread before;

        public JoinRunner(Thread before) {
            this.before = before;
        }

        @Override
        public void run() {
            if (before != null) {
                try {
                    //使用join方法,先排在前面的线程先执行完了,
                    // 在执行该线程
                    before.join();
                    System.out.println("目前执行的线程是:"
                            + Thread.currentThread().getId() + "====="
                            + Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                System.out.println("目前执行的线程是:"
                        + Thread.currentThread().getId() + "====="
                        + Thread.currentThread().getName());            }
        }
    }

}

看看执行结果,是不是按照T1、T2、T3顺序执行的。


代码执行结果

刚讲了join实现线程顺序执行的功能,其实用CountDownLatch也可以实现

CountDownLatch简介

CountDownLatch有的人把它叫做闭锁,有的人把它叫做计数锁,它能够使一个线程等待其他线程执行完毕以后再执行。CountDownLatch是基于AbstractQueuedSynchronizer实现的,大致原理是,定义一个计数器state。在调用构造方法创建CountDownLatch对象时,通过入参初始化state,然后调用countDown方法,state会减1,当调用次数让state变为0,在唤醒被挂起在await上的线程。我是喜欢叫做计数锁,当state大于0时,说明没有获取到锁,当state变成0时,获取到锁,我是这么理解的。

举个CountDownLatch栗子
public class Demo2 {

    public static void main(String[] args) {

        //计数锁C1 state 为1
        CountDownLatch c1 = new CountDownLatch(1);
        //计数锁C2 state 为1
        CountDownLatch c2 = new CountDownLatch(1);
        //计数锁C3 state 为1
        CountDownLatch c3 = new CountDownLatch(1);

        //T1线程中 C2 调用await挂起
        //然后让C1 调用countDown减一
        Thread t1 = new Thread(new CountDownLatchRunner(c2, c1),"T1");

        //T2线程中 C3 调用await挂起
        //然后让C2 调用countDown减一
        Thread t2 = new Thread(new CountDownLatchRunner(c3, c2),"T2");

        //T3线程中 C3 调用await挂起
        //然后让C3 调用countDown减一
        Thread t3 = new Thread(new CountDownLatchRunner(c3, c3),"T3");

        //目前这三个线程中的定义中
        // T1线程的计数锁C1的state变成0,
        // 才能执行 被 C2挂起的线程T2
        // T2线程的计数锁 C2的state变成0,
        // 才能执行线程T3
        t1.start();
        t2.start();
        t3.start();
        
        //先线程T1的计数锁state变成0,开始顺序执行
        c1.countDown();
    }

    static class CountDownLatchRunner implements Runnable {

        /**
         * 定义计数器
         */
        CountDownLatch countDownLatch1;
        CountDownLatch countDownLatch2;

        public CountDownLatchRunner(CountDownLatch c1,CountDownLatch c2) {
            countDownLatch1 = c1;
            countDownLatch2 = c2;
        }

        @Override
        public void run() {
            try {
                //先挂起排在后面的线程
                countDownLatch2.await();
                System.out.println("目前执行的线程是:"
                        + Thread.currentThread().getId() + "====="
                        + Thread.currentThread().getName());
                //让本线程计数锁 state减1
                countDownLatch1.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
}

看看执行结果,是不是按照T1、T2、T3顺序执行的。


执行结果
CountDownLatch关键方法

看到了CountDownLatch使用方法,我们来看看使用中的关键方法

构造方法
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

从构造方法上,创建了一个Sync对象,来看看Sync对象。

Sync类
/**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

Sync是继承了AbstractQueuedSynchronizer的,有时间在来讲AbstractQueuedSynchronizer。

await方式
public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

await方法的作用是,挂起线程,看到他其实是调用了Sync的acquireSharedInterruptibly方法,简单的说,该方法的主要作用是让获取不到锁的线程挂起。详细的分析,我们放到AbstractQueuedSynchronizer的文章中专门说。

countDown方式
public void countDown() {
        sync.releaseShared(1);
    }

countDown方法的作用是state减一,也是调用了Sync的releaseShared方法实现。

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

看代码大致可以看出来,tryReleaseShared是进行state减一,如果减到0了,就返回true,然后,在调用doReleaseShared方法来唤醒挂起在await的线程
tryReleaseShared还是个抽象方法

tryReleaseShared方法
来看一眼CountDownLatch中的实现
protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

看来和字面意思差不多,getState()是返回成员变量state的值,看下state

/**
     * The synchronization state.
     */
    private volatile int state;

是一个用volatile修饰的int字段。
再来看doReleaseShared方法

private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

稍有点复杂,doReleaseShared是在AbstractQueuedSynchronizer中实现的,我们放到AbstractQueuedSynchronizer的文章中专门说,大家大致明白他的作用,就是唤醒挂起的线程。
关键方法就为大家分析到这里。

CountDownLatch的使用场景

CountDownLatch是一种同步结构,和我们之前讲过的ReentrantLock很像,他们都是被用来解决线程之间的调度、交互问题的,列一些自己用过的场景和别人分享的场景给大家吧。

CountDownLatch就为大家讲到这,源码涉及到AbstractQueuedSynchronizer的,我们留到以后,专门写一个AbstractQueuedSynchronizer的文章,来统一说吧。欢迎大家来交流,指出文中一些说错的地方,让我加深认识,愿大家没有bug,谢谢!

上一篇下一篇

猜你喜欢

热点阅读