CyclicBarrier
1、CyclicBarrier使用场景:
先来描述一下它的使用场景:有若干个线程,比如说有五个线程,需要它们都到达了某一个点之后才能开始一起执行,也就是说假如其中只有四个线程到达了这个点,还差一个线程没到达,此时这四个线程都会进入等待状态,直到第五个线程也到达了这个点之后,这五个线程才开始一起进行执行状态,是不是这个场景的描述跟CountDownLatch很类似的,下面用一个简单的示例图来感受一下它们两者的区别:
CountDownLatch使用场景图
image.pngCyclicBarrier使用场景图
image.png所有子线程都已经到达屏障之后,此时屏障就会消失,所有子线程继续执行,若存子线程尚未到达屏障,其他到达了屏障的线程都会进行等待
2、官方文档说明
它是一个同步的工具,能够允许一组线程去互相等待直到都到达了屏障,CyclicBarrier对于涉及到固定大小的线程是非常有用的,线程们必须相互等待。该屏障称之为循环屏障,是因为当等待屏障的线程被释放之后,该屏障能循环使用
3、关于CyclicBarrier的底层执行流程总结:
-
1、初始化CyclicBarrier中的各种成员变量,包括parties、count以及Runnable(可选);
-
2、当调用await()方法时,底层会先检查计数器是否已经归零,如果是的话,那么就首先执行可选的Runnable,接下来开始下一个generation;(注意:这里只是调用Runnable的run()方法,并不是调用start()方法开启另一个线程)
-
3、在下一个分代中,将会重置count值为parties,并且创建新的Generation实例;
-
4、同时会调用Condition的singalAll方法,唤醒所有在屏障前面等待的线程,让其开始继续执行;(注意:当有可选的Runnable时,是执行完run()方法中的汇总操作,其他线程才会继续执行)
-
5、如果计数器没有归零,那么当前的调用线程将会通过Condition的await方法,在屏障前进行等待;
-
6、以上所有执行流程均在lock锁的控制范围内,不会出现并发情况。
-
7、在下一个分代时,该屏障又可以继续使用,例如计数器是3,线程1,线程2和线程3冲破了当前屏障后,下一个分代的屏障可以去给线程4,线程5和线程6使用,也可以又给线程1,线程2和线程3使用(自己总结的)
4、典型事例
1、当没有可选的Runnable时
当所有线程到达屏障时,不需要进行汇总,最后一个线程到达时,屏障消除,所有线程继续执行
package com.concurrency2;
import java.util.Random;
import java.util.concurrent.CyclicBarrier;
public class MyTest1 {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
for(int i = 0;i < 3;i ++) {
new Thread(() -> {
try {
Thread.sleep((long)(Math.random() * 2000));
int randomInt = new Random().nextInt(500);
System.out.println("hello " + randomInt);
cyclicBarrier.await();
System.out.println("world " + randomInt);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
输出
hello 30
hello 471
hello 343
world 343
world 471
world 30
2、当有可选的Runnable时
当所有线程到达屏障时,需要进行汇总操作,等汇总操作进行完,屏障消除,所有线程继续执行
package com.concurrency2;
import java.util.Random;
import java.util.concurrent.CyclicBarrier;
public class MyTest1 {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
System.out.println("汇总1 ...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("汇总2 ...");
});
//for(int u = 0, u < 2;u ++)//开两次屏障使用
for(int i = 0;i < 3;i ++) {
new Thread(() -> {
try {
Thread.sleep((long)(Math.random() * 2000));
int randomInt = new Random().nextInt(500);
System.out.println("hello " + randomInt);
cyclicBarrier.await();
System.out.println("world " + randomInt);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
输出
hello 4
hello 229
hello 73
汇总1 ...
汇总2 ...
world 73
world 229
world 4
5、CyclicBarrier源代码分析
前面讲过CountDownLatch是基于AQS实现的;而CyclicBarrier是基于ReentrantLock重入锁实现的,当然ReentrantLock也是基于AQS实现的,非要说CyclicBarrier也是基于AQS实现的也不为过。
1、重要成员变量
/ /可以理解为初始化时 需要阻塞的任务个数
private final int parties;
/ /剩余需要等待的任务个数,初始值为parties,直到为0时依次唤醒所有被阻塞的任务线程。
private int count;
/ /每次对“栅栏”的主要成员变量进行变更操作,都应该加锁
private final ReentrantLock lock = new ReentrantLock();
/ /用于阻塞和唤醒任务线程
private final Condition trip = lock.newCondition();
/ /在所有线程被唤醒前,需要执行的一个Runable对应的run方法
private final Runnable barrierCommand;
/ /用于表示“栅栏”当前的状态
private Generation generation = new Generation();
2、构造方法
CyclicBarrier有两个重载的构造方法,一个是不带Runnable参数,另一个带有Runnable参数。本质上都会调用带Runnable参数的构造方法进行实例化,这里只贴出带Runnable参数的构造方法实现:
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties; / /为了实现复用,进行备份
this.count = parties; / /初始化,待阻塞的任务总数
this.barrierCommand = barrierAction; / /初始化
}
3、核心方法
await()方法有两层含义:
1、先检查前面是否已经有count个线程了,如果没有线程则会进入等待状态
2、当检测到屏障已经有count个线程了,则所有线程会冲出屏障继续执行(如果有Runnable参数的构造方法先执行汇总方法)
int index = --count;操作很明显不是原子性的,如果在多线程中不加lock肯定会出问题
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock(); //加锁
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
//有一个线程线程被中断,整个CyclicBarrier将不可用
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count; //待等待的任务数减1
if (index == 0) { // 如果待等待的任务数减至0,依次唤醒所有线程
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();//唤醒前先执行Runnable对象的run方法
ranAction = true;
nextGeneration();//重置整个CyclicBarrier,方便下次重用
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
//如果待等待的任务数大于0,进行线程阻塞,直到count为0时被唤醒
for (;;) {
try {
if (!timed)
trip.await();//阻塞当前线程
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);//延时阻塞当前线程
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)//异常唤醒
throw new BrokenBarrierException();
if (g != generation)//正常被唤醒,generation会被新建
return index;
if (timed && nanos <= 0L) {//延迟阻塞时间到后唤醒
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}