AQS之CountDownLatch和Semaphore源码分析
CountDownLatch
示例代码
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(5 );
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i <5 ; i++) {
final int a=i;
service.execute( new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep( 2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(a);
latch.countDown();
}
} );
}
// latch.await();
latch.await(1, TimeUnit.SECONDS);
System.out.println("1111111111111111");
}
}
CountDownLatch
CountDownLatch#CountDownLatch
创建CountDownLatch类,把传入参数创建AQS的实现类Sync
image.png
Sync
CountDownLatch.Sync#Sync
创建Sync类,调用setState(count)方法。把状态值state改为CountDownLatch的传入参数
image.png
countDown
执行countDown方法
CountDownLatch#countDown
image.png
releaseShared
AbstractQueuedSynchronizer#releaseShared
image.png
tryReleaseShared
CountDownLatch.Sync#tryReleaseShared
image.png
释放过程,每调用一次countDown方法。获取当前的状态值,如果等于0返回false。并重新设置减一后的状态值。直到当前状态值等于0并返回true
doReleaseShared
唤醒线程执行
AbstractQueuedSynchronizer#doReleaseShared
image.png
这里是个死循环,AQS队列为空退出。只调用countDown方法。AQS队列没有初始化,一直为null
await()
CountDownLatch#await()
此方法会一直等待,直到Latch的计数器到达0的或调用interrupt方法
image.png
AbstractQueuedSynchronizer#acquireSharedInterruptibly
image.png
CountDownLatch.Sync#tryAcquireShared
image.png
AbstractQueuedSynchronizer#doAcquireSharedInterruptibly
当状态值不等于0时进入
image.png
addWaiter
AbstractQueuedSynchronizer#addWaiter
初始化队列并对当前线程入队
image.png
把当前调用await方法的初始化并添加当前线程到AQS队列,然后一直循环直到当前线程节点的前一个节点时head节点。再次调用
tryAcquireShared(arg)方法,查看当前状态值state是否是0(返回1),如果是则结束循环返回。
await(long,TimeUnit)
java.util.concurrent.CountDownLatch#await(long, java.util.concurrent.TimeUnit)
image.png
AbstractQueuedSynchronizer#tryAcquireSharedNanos
image.png
AbstractQueuedSynchronizer#doAcquireSharedNanos
image.png
调用过程与await()相似,主要多了一步如果超时直接返回无论是否执行完线程
Semaphore
示例代码
public class SemaphoreTest {
public static void main(String[] args) throws InterruptedException {
final Semaphore semaphore = new Semaphore( 2 );
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i <5 ; i++) {
final int a=i;
service.execute( new Runnable() {
@Override
public void run() {
try {
// semaphore.tryAcquire( 1,TimeUnit.SECONDS );
semaphore.acquire();
TimeUnit.SECONDS.sleep( 1 );
System.out.println(a+System.currentTimeMillis());
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} );
}
}
}
Semaphore
Semaphore#Semaphore(int)
把构造方法参数创建Sync对面,默认是非公平
image.png
Semaphore.Sync
用来设置状态值state
image.png
acquire
Semaphore#acquire()
拿到一个许可信号量
image.png
AbstractQueuedSynchronizer#acquireSharedInterruptibly
判断得到许可值是否有可用的
image.png
tryAcquireShared
Semaphore.FairSync#tryAcquireShared
image.png
获取状态值。调用一次acquire()方法,状态值减1并通过CAS设置状态值,直到状态值为0。
如示例代码线程t1、t2(获取许可信号量,state变化)
image.png
如果前两个线程没有调用release()方法,第三个线程t3执行acquire()。tryAcquireShared方法直接返回-1。将会调用doAcquireSharedInterruptibly()方法。
AbstractQueuedSynchronizer#doAcquireSharedInterruptibly
image.png
调用addWaiter()方法初始化队列,并把阻塞的线程t3添加到队列中,如果AQS头结点之后的节点没有获取信号量,则park当前线程。其它线程(t4、t5等等)调用tryAcquireShared方法查看当前队列有值,直接返回-1。并添加到队列中。释放节点,当第二个节点线程持有,可以持有许可信号量,便会执行setHeadAndPropagate方法重新设置头结点。
release
Semaphore#release()
释放许可信号量
image.png
image.png
Semaphore.Sync#tryReleaseShared
image.png
释放许可信号量,调用一次release方法状态值state加1。
总结:
CountDownLatch和Semaphore都是利用AQS的状态值state创建初始化容量大小
CountDownLatch 调用countDown方法,数量到达0,释放所有线程
Semaphore 调用acquire方法,持有许可。没有获取许可的线程加入AQS队列
CountDownLatch 状态值不可还原,Semaphore状态值可以恢复初始大小