并发

AQS之CountDownLatch和Semaphore源码分析

2019-12-20  本文已影响0人  loveFXX

CountDownLatch

示例代码

public class CountDownLatchTest {
    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(5 );
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i <5 ; i++) {
            final  int a=i;
            service.execute( new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep( 2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(a);
                    latch.countDown();
                }
            } );
        }
//      latch.await();
        latch.await(1, TimeUnit.SECONDS);
        System.out.println("1111111111111111");

    }
}

CountDownLatch

CountDownLatch#CountDownLatch
创建CountDownLatch类,把传入参数创建AQS的实现类Sync


image.png
Sync

CountDownLatch.Sync#Sync
创建Sync类,调用setState(count)方法。把状态值state改为CountDownLatch的传入参数


image.png

countDown

执行countDown方法
CountDownLatch#countDown


image.png
releaseShared

AbstractQueuedSynchronizer#releaseShared


image.png
tryReleaseShared

CountDownLatch.Sync#tryReleaseShared


image.png

释放过程,每调用一次countDown方法。获取当前的状态值,如果等于0返回false。并重新设置减一后的状态值。直到当前状态值等于0并返回true

doReleaseShared

唤醒线程执行
AbstractQueuedSynchronizer#doReleaseShared


image.png

这里是个死循环,AQS队列为空退出。只调用countDown方法。AQS队列没有初始化,一直为null

await()

CountDownLatch#await()
此方法会一直等待,直到Latch的计数器到达0的或调用interrupt方法


image.png

AbstractQueuedSynchronizer#acquireSharedInterruptibly


image.png
CountDownLatch.Sync#tryAcquireShared
image.png
AbstractQueuedSynchronizer#doAcquireSharedInterruptibly

当状态值不等于0时进入


image.png
addWaiter
AbstractQueuedSynchronizer#addWaiter
初始化队列并对当前线程入队
image.png
把当前调用await方法的初始化并添加当前线程到AQS队列,然后一直循环直到当前线程节点的前一个节点时head节点。再次调用
tryAcquireShared(arg)方法,查看当前状态值state是否是0(返回1),如果是则结束循环返回。

await(long,TimeUnit)

java.util.concurrent.CountDownLatch#await(long, java.util.concurrent.TimeUnit)


image.png

AbstractQueuedSynchronizer#tryAcquireSharedNanos


image.png
AbstractQueuedSynchronizer#doAcquireSharedNanos
image.png

调用过程与await()相似,主要多了一步如果超时直接返回无论是否执行完线程

Semaphore

示例代码

public class SemaphoreTest {
    public static void main(String[] args) throws InterruptedException {
        final Semaphore semaphore = new Semaphore( 2 );
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i <5 ; i++) {
            final  int a=i;
            service.execute( new Runnable() {
                @Override
                public void run() {
                    try {
//                      semaphore.tryAcquire( 1,TimeUnit.SECONDS );
                        semaphore.acquire();
                        TimeUnit.SECONDS.sleep( 1 );
                        System.out.println(a+System.currentTimeMillis());
                        semaphore.release();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } );
        }

    }
}
Semaphore

Semaphore#Semaphore(int)
把构造方法参数创建Sync对面,默认是非公平


image.png

Semaphore.Sync
用来设置状态值state


image.png

acquire

Semaphore#acquire()
拿到一个许可信号量


image.png

AbstractQueuedSynchronizer#acquireSharedInterruptibly
判断得到许可值是否有可用的


image.png
tryAcquireShared

Semaphore.FairSync#tryAcquireShared


image.png

获取状态值。调用一次acquire()方法,状态值减1并通过CAS设置状态值,直到状态值为0。
如示例代码线程t1、t2(获取许可信号量,state变化)


image.png
如果前两个线程没有调用release()方法,第三个线程t3执行acquire()。tryAcquireShared方法直接返回-1。将会调用doAcquireSharedInterruptibly()方法。
AbstractQueuedSynchronizer#doAcquireSharedInterruptibly
image.png

调用addWaiter()方法初始化队列,并把阻塞的线程t3添加到队列中,如果AQS头结点之后的节点没有获取信号量,则park当前线程。其它线程(t4、t5等等)调用tryAcquireShared方法查看当前队列有值,直接返回-1。并添加到队列中。释放节点,当第二个节点线程持有,可以持有许可信号量,便会执行setHeadAndPropagate方法重新设置头结点。

release

Semaphore#release()
释放许可信号量


image.png
image.png

Semaphore.Sync#tryReleaseShared


image.png
释放许可信号量,调用一次release方法状态值state加1。

总结:

CountDownLatch和Semaphore都是利用AQS的状态值state创建初始化容量大小
CountDownLatch 调用countDown方法,数量到达0,释放所有线程
Semaphore 调用acquire方法,持有许可。没有获取许可的线程加入AQS队列
CountDownLatch 状态值不可还原,Semaphore状态值可以恢复初始大小

上一篇下一篇

猜你喜欢

热点阅读