17. 并发终结之CyclicBarrier
2020-09-27 本文已影响0人
涣涣虚心0215
CyclicBarrier是让一组线程达到某个屏障,被阻塞,一直到组内最后一个线程到达屏障,屏障打开,所有被阻塞的线程继续运行。
与CountDownLatch比较
CountDownLatch是一组线程等待其他一个或一组线程完成工作之后再执行,类似加强版join,await()用来等待,countDown()负责计数器的减一。
CyclicBarrier是一组线程内部进行等待,且可以循环使用。
CyclicBarrier(int parties, Runnable barrierAction)这个是在屏障开放时,barrierAction定义的线程会被执行。即用barrierAction来指定一个任务,以实现一种等待线程结束的效果;barrierAction中的任务只有在目标线程结束后才能被执行。
事实上,这完全可以通过Thread.jon()或者CountDownLatch来实现。
如果CyclicBarrier.await()的调用不在一个循环之中,并且使用的目的也不知为了模拟高并发,这就是滥用。
底层执行流程:
- 初始化CyclicBarrier中各种成员变量,包括parties(不可变),count(计数器)以及Runnable(可选)
- 当调用await方法时,底层先检查计数器是否归零,如果是的话,执行可选Runnable,并进入下一个generation。
- 进入nextGeneration,将会重置count = parties,并且创建新的generation实例。同时会调用signAll方法唤醒所有屏障前等待的线程。
- 如果计数器没有归零,那么当前调用condition.await()方法,在屏障前等待。
- 以上方法都在lock锁中,所以不会出现并发的情况。
源码分析
- 成员变量
CyclicBarrier内部使用lock和condition来使一组线程来等待以及唤醒。
final的parties表示线程数量,count表示一个generation里面在WAITING的线程个数,初始值是parties。
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
private int count;
- await
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
//拿到lock并进行上锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//拿到generation
final Generation g = generation;
//如果这个generation是被broken的,则抛出BrokenBarrierException
if (g.broken)
throw new BrokenBarrierException();
//如果当前线程是被中断了
if (Thread.interrupted()) {
//breakBarrier会设置generation.broken=true,重置count=parties,并且signalAll唤醒所有等待的线程
breakBarrier();
throw new InterruptedException();
}
//每次一个线程进入await,就减一下count
int index = --count;
//当count减为0表示所有线程都到达屏障,则调用barrierCommand
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//nextGeneration则会唤醒所有的等待线程,并且重置count=parties以及generation
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
//如果count没有到0,表示还有线程没有到达屏障,则当前线程一直await,加入condition queue,等待唤醒。
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}