Java并发编程

JUC并发容器之CountDownLatch源码分析

2020-05-14  本文已影响0人  Java技术天地

原文出处:https://www.zzwzdx.cn

CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。它是通过一个计数器来实现的,当我们在new 一个CountDownLatch对象的时候需要带入该计数器值,该值就表示了线程的数量。每当一个线程完成自己的任务后,计数器的值就会减1。当计数器的值变为0时,就表示所有的线程均已经完成了任务,然后就可以恢复等待的线程继续执行了。

我们先用一个简单的实例来了解下CountDownLatch的使用,实例如下:

public class CountDownLatchDemo {
    private static int LATCH_SIZE = 5;
    private static CountDownLatch doneSignal;
    public static void main(String[] args) {
        try {
            doneSignal = new CountDownLatch(LATCH_SIZE);
            // 新建5个任务
            for(int i=0; i<LATCH_SIZE; i++)
                new Task().start();
     
            System.out.println("main await begin.");
            // "主线程"等待线程池中5个任务的完成
            doneSignal.await();
     
            System.out.println("main await finished.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    static class Task extends Thread{
        public void run() {
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " sleep 1000ms.");
                // 将CountDownLatch的数值减1
                doneSignal.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

说明:创建等待线程数为5,当主线程Main运行到doneSignal.wait()时会阻塞当前线程,直到另外5个线程执行完成之后主线程才会继续执行。

构造函数

构造函数设置锁标识state的值,CountDownLatch countDownLatch = new CountDownLatch(5) 实现的操作是设置锁标识state的值为5,其构建函数的源码如下:

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

sync为CountDownLatch的一个内部类,其定义如下:

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;
    Sync(int count) {
        setState(count);
    }
    //获取同步状态
    int getCount() {
        return getState();
    }
    //获取同步状态
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
    //释放同步状态
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

通过这个内部类Sync我们可以清楚地看到CountDownLatch是采用共享锁来实现的。

await()

CountDownLatch中await的逻辑是如果state的值不等于0,表示还有其他线程没有执行完(其他线程执行完之后会将state减一操作),此时主线程处于阻塞状态,其定义如下:

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

这里acquireSharedInterruptibly会进行state状态判断

public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0) //tryAcquireShared函数用来判断state的值是否等于0
        doAcquireSharedInterruptibly(arg);
}

tryAcquireShared中的操作是判断锁标识位state是否等于0,如果不等于0,则调用doAcquireSharedInterruptibly函数,阻塞线程。内部类Sync重写了tryAcquireShared(int args)方法,其源码如下:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;  //判断锁标识位state是否等于0,在构造函数时会给state赋值
}

doAcquireSharedInterruptibly(arg)操作的判断是将当前线程放到FIFO队列中,并将线程阻塞。

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //将线程添加到FIFO队列中
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //parkAndCheckInterrupt完成线程的阻塞操作
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

countDown()

CountDownLatch中countDown()操作是将锁标识位state进行减一操作,如果state此时减一之后为0时则唤起被阻塞线程。

public void countDown() {
    sync.releaseShared(1); //将state值进行减一操作
}
//releaseShared中完成的操作是将锁标识位state进行减一操作,如果state此时减一之后为0时则唤起被阻塞线程
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//将锁标识位state进行减arg操作
        doReleaseShared();//唤起阻塞线程操作
        return true;
    }
    return false;
}

在tryReleaseShared中会完成state的减值操作。

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        //获取state值
        int c = getState();
        if (c == 0)
            return false;
        //进行减一操作
        int nextc = c-1;
        //cas操作完成state值的修改
        if (compareAndSetState(c, nextc))
            //如果nextc等于0则返回
            return nextc == 0;
    }
}

doReleaseShared完成阻塞线程的唤起操作

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                //完成阻塞线程的唤起操作
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

总结

CountDownLatch内部通过共享锁实现。在创建CountDownLatch实例时,需要传递一个int型的参数:count,该参数为计数器的初始值,也可以理解为该共享锁可以获取的总次数。当某个线程调用await()方法,程序首先判断count的值是否为0,如果不会0的话则会一直等待直到为0为止。当其他线程调用countDown()方法时,则执行释放共享锁状态,使count值 – 1。当在创建CountDownLatch时初始化的count参数,必须要有count线程调用countDown方法才会使计数器count等于0,锁才会释放,前面等待的线程才会继续运行。注意CountDownLatch不能回滚重置。


关注下面公众号,回复 1024 领取最新大厂面试资料

image
上一篇下一篇

猜你喜欢

热点阅读