juc并发组件(三)CountDownLatch 和Cyclic
2019-01-05 本文已影响0人
sadamu0912
为什么要把这两个放一起来呢?因为这两个概念有点像。都是屏障
的概念,区别在于,CountDownLatch 只能用一次,CyclicBarrier是可以用多次。
什么叫屏障
, 就是条件没有满足的时候,各个线程都不允许通过。只有条件满足了,才可以通过。这是一个计数为4 的屏障。

但是,他们两个的实现机制不一样,CountDownLatch 是采用AQS中的共享锁,调用的是本身内聚的同步器子类的tryAcquireShared方法,在上一节中我们提到,
tryAcquireShared
这个方法的返回值,如果小于0,代表没有获取到许可,无法进入临界区代码,countdownLatch 调用await 方法去获取同步状态
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
如果不是0,就代表获取不到同步状态。
然后countdownLatch 调用countDown()方法的时候,只是把state这个volatile变量-1 。实现比较简单。
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
CyclicBarrier源码分析
首先上UML图:

可以发现,主要基于ReentrantLock和Condition实现。主要方法就是await()方法,和reset()方法
首先我们看await()方法
await()方法如何实现屏障
的语义。阻塞parties个线程,当最后到达时,屏障打开。
- 首先线程获取独占锁,
- count减去1,代表等待的线程数加1
- 判断count减去1之后的值是否等于0,如果==0,调用nextGeneration方法去trip.signalAll()唤醒
所有线程,执行后续方法 - 如果!=0 ,那么阻塞当前线程,插入条件队列
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 获取“独占锁(lock)”
lock.lock();
try {
// 保存“当前这一代屏障是否损坏”
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
// 如果index=0,则意味着“有parties个线程到达barrier”。
if (index == 0) { // tripped
boolean ranAction = false;
try {
//这里可以看出,满足条件时,先执行的barrierCommand,再唤醒其他线程去执行代码
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 调用trip.signalAll();唤醒线程,执行各自await方法后面的代码
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
//这里可以看出,调用cyclicBarrier.await()方法的时候,实际上是
//被ConditionObject的await()方法阻塞,而最终调用LockSupport.park(this);
//Disables the current thread for thread scheduling,释放锁
//被插入独占锁的trip条件等待队列中
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
让我们看一下,屏障被破坏
的语义
/**
* 屏障破了,所有await后面的代码都不执行了
*
*/
public class CyclicBarrierTest2 {
private static int count ;
public static void main(String[] args) throws Exception{
CyclicBarrier barrier1 = new CyclicBarrier(3);
ExecutorService executorService = Executors.newCachedThreadPool();
//添加一个用await()等待的线程
executorService.submit(() -> {
try {
//等待,除非:1.屏障打开;2.本线程被interrupt;3.其他等待线程被interrupted;4.其他等待线程timeout;5.其他线程调用reset()
count++;
barrier1.await();
System.out.println(Thread.currentThread().getName()+"====enter into critical code");
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " is interrupted.");
//e.printStackTrace();
} catch (BrokenBarrierException e) {
System.out.println(Thread.currentThread().getName() + " is been broken.");
//e.printStackTrace();
}
});
Thread.sleep(10);
System.out.println("刚开始,屏障是否破损:" + barrier1.isBroken());
//添加一个等待线程-并超时
executorService.submit(() -> {
try {
//等待1s,除非:1.屏障打开(返回true);2.本线程被interrupt;3.本线程timeout;4.其他等待线程被interrupted;5.其他等待线程timeout;6.其他线程调用reset()
count++;
barrier1.await(1, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName()+"====wait code after timeout");
for(long i=0;i<50000L;i++){
System.out.println(i);
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " is interrupted.");
//e.printStackTrace();
} catch (BrokenBarrierException e) {
System.out.println( barrier1.isBroken());
System.out.println(Thread.currentThread().getName() + " is been reset().");
//e.printStackTrace();
} catch (TimeoutException e) {
System.out.println("TimeoutException"+ barrier1.isBroken());
System.out.println(Thread.currentThread().getName() + " is timeout.");
//e.printStackTrace();
}
});
Thread.sleep(100);
System.out.println("当前等待线程数量:" + barrier1.getNumberWaiting());
Thread.sleep(1000);
System.out.println("当前等待线程数量:" + barrier1.getNumberWaiting());
System.out.println("当等待的线程timeout时,当前屏障是否破损:" + barrier1.isBroken());
System.out.println("等待的线程中,如果有一个出现问题,则此线程会抛出相应的异常;其他线程都会抛出BrokenBarrierException异常。");
System.out.println(count+"====count");
System.out.println();
Thread.sleep(1000);
//通过reset()重置屏障回初始状态,也包括是否破损
barrier1.reset();
System.out.println("reset()之后,当前屏障是否破损:" + barrier1.isBroken());
System.out.println("reset()之后,当前等待线程数量:" + barrier1.getNumberWaiting());
System.out.println(count+"====count");
executorService.shutdown();
}
}
输出结果:
刚开始,屏障是否破损:false
当前等待线程数量:2
pool-1-thread-1 is been broken.
TimeoutExceptiontrue
pool-1-thread-2 is timeout.
当前等待线程数量:0
当等待的线程timeout时,当前屏障是否破损:true
等待的线程中,如果有一个出现问题,则此线程会抛出相应的异常;其他线程都会抛出BrokenBarrierException异常。
2====count
reset()之后,当前屏障是否破损:false
reset()之后,当前等待线程数量:0
2====count
当await超时了之后,执行for循环中的
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
抛出TimeoutException
broken = true ,count= parties ,trip.signalAll(); 和reset还是有区别的,reset时broken = false
而且屏障破了之后,await后面的代码就都不执行了
最后我们看reset方法
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
breakBarrier唤醒所有线程,把broken设为true,nextGeneration唤醒所有线程,把broken=false。