CountDownLatch与CyclicBarrier
2020-05-13 本文已影响0人
HannahLi_9f1c
CountDownLatch
简介
CountDownLatch使用AQS同步框架实现了多线程计时器。主线程等待所有线程完成任务之后,主线程再进行下一步动作。比如说吃鸡游戏要等待队友准备好后可以开始玩游戏,马拉松比赛等所有人跑完之后开始排名,都可以用CountDownLatch实现
使用方式
- 用构造器初始化计时器初始值
- countDown()函数将当前计数返回,表示当前线程已完成
- await()用在主线程中等待所有线程返回。然后进行下一步动作
CountDownLatch实现马拉松比赛
- 代码
public class CountDownLatchDemo {
public static Map<Long, Long> map = new TreeMap<Long, Long>();
public static final int RUNNER = 10;
private static class Runner implements Runnable{
CountDownLatch countDownLatch;
public Runner(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
long id = Thread.currentThread().getId();
System.out.println("运动员"+id+"开始");
long start = System.currentTimeMillis();
int count = 0;
for(int i = 0; i < 100000; i++) {
count++;
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println("运动员"+id+"用时"+(end-start));
countDownLatch.countDown();
map.put(id, start-end);
}
}
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(RUNNER);
for(int i = 0; i < RUNNER; i++) {
new Thread(new Runner(countDownLatch)).start();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("比赛结束,开始排名");
int count = 0;
for(long id:map.keySet()) {
count++;
System.out.println("第"+count+"名:运动员"+id+"用时"+map.get(id));
}
}
}
-
运行结果
image.png
实现原理
CountDownLatch在内部构造一个静态sync类,sync类实现了AQS接口,AQS是由共享资源和同步双向队列组成,使用了模板方式。在使用AQS时实现独占锁时需要重写tryAcquire和tryRealse方法,在使用AQS实现共享锁时需要重写tryReleaseShared和tryAcquireShared。
- sync锁重写的tryAcquireShared,state值会在new CountDownLatch时初始化,然后通过countDown()方法将state值减一。tryAcquireShared()方法表示state值为0时返回1,否则返回-1.
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
- sync重写的tryReleaseShared方法,就是不断循环通过CAS将把state值减一,state值为0时返回true。
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
- countDown方法做了什么?
- 通过sync调用父类的releaseShared方法
public void countDown() {
sync.releaseShared(1);
}
- tryReleaseShared是子类重写的,将state值减一,减为0时会调用doRealaseShare方法唤醒同步队列中等待的线程。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
- doRealeaseShared要保证可以并发释放锁,所以使用了CAS改变状态。从头结点中依次寻找等待中的结点并唤醒。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
- await()方法内部原理
- 调用AQS中acquireSharedInterruptibly方法,表示可以响应中断。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
- tryAcquireShared判断state是否为0,为0返回1,方法返回。否则执行doAcquireSharedInterruptibly方法。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
- 通过addWaiter将共享结点入队,先CAS直接插入尾部,失败的话自旋CAS保证能够入队成功。然后获取前继结点,如果前继为head并且state值为0,将当前结点设为头结点,然后await方法可以返回。shouldParkAfterFailedAcquire是用来将前继结点标记为SIGNAL。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
实现旅游发签证
- 代码
public class CyclicBarrierDemo {
public static void main(String [] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3,new TourGuideTask());
Executor executor = Executors.newFixedThreadPool(3);
//登哥最大牌,到的最晚
executor.execute(new TravelArrive(cyclicBarrier,222,5));
executor.execute(new TravelArrive(cyclicBarrier,333,3));
executor.execute(new TravelArrive(cyclicBarrier,444,1));
}
}
class TourGuideTask implements Runnable{
@Override
public void run() {
System.out.println("分发护照");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class TravelArrive implements Runnable{
private CyclicBarrier cyclicBarrier;
private int time;
private int no;
public TravelArrive(CyclicBarrier cyclicBarrier, int no, int time) {
this.cyclicBarrier = cyclicBarrier;
this.time = time;
this.no = no;
}
@Override
public void run() {
try {
Thread.sleep(time*100);
System.out.println("游客"+no+"已到达");
cyclicBarrier.await();
System.out.println("游客"+no+"开始旅行");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
CyclicBarrier
简介
CyclicBarrier也是基于AQS实现的,但是可以复用计数器,如同栅栏一般。与CountDownLatch不同的是,CyclicBarrier循环使用计数器。在旅游时,导游会先等待游客到达之后统一发送护照和签证,这个场景就可以用CyclicBarrier实现
-
运行结果
image.png