深入理解 CyclicBarrier

2022-03-21  本文已影响0人  天还下着毛毛雨
image

1. 作用

CyclicBarrier 可以 让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会释放。

注意,被阻塞的线程 ,不是同时 被释放的。但时间间隔几乎 可以说没有。

2. 使用方式

2.1 代码示例

public static void main(String[] args) {
    // 到达屏障的线程数量 : 3
    CyclicBarrier cb = new CyclicBarrier(3);
    for (int i = 0; i < 3; i++) {
        int finalI = i;
        new Thread(()->{
            try {
                // 最后一个线程 休眠5秒, 看看所有到达屏障的线程 也是休眠5秒之后再执行。
                if(finalI % 3 == 2){
                    Thread.sleep(5000);
                }
                // 阻塞在这里(也可以理解为到达屏障),直到 到达屏障的线程数量 = 3才会被释放。
                cb.await();
                System.out.println(Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }).start();
    }
    System.out.println("主线程执行完毕........");
}

最终执行结果, 肯定是最后 到达屏障的 最先执行,其次才是之前到达屏障阻塞的所有线程。其他线程被释放的顺序 也不一定是 按照 await()方法的调用顺序来定的。稍后源码具体分析一下。

image

2.2 api

2.2.1. 构造方法

一个有参构造,一个无参构造,有参构造相对于无参构造来说 多传入了一个Runnable实例, 当一个周期的屏障被 释放后, 可以执行 Runnable实例的 run方法逻辑。

public CyclicBarrier(int parties) {
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

2.2.2. await()

使线程阻塞。

还有一个带超时时间的 await(long timeout, TimeUnit unit), 阻塞超过一定时间 就会抛出TimeoutException异常。

3. 源码分析

3.1 大致原理

CyclicBarrier 是利用 ReentrantLock锁 和 ReentrantLock里的条件队列 来实现 所有线程 在屏障处 阻塞的效果的。

dowait 大体逻辑(省略 一些判断的代码)

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock(); // q1 加锁
    try {
        // 判断count(需要到达屏障 的线程数) q2
        int index = --count;
        if (index == 0) {
            // 这个就是 有参构造传进来的第二个 参数 Runable 实例
            final Runnable command = barrierCommand;
            if (command != null)
            // 由最后一个到达屏障的线程 同步调用
               command.run();
            // 释放所有 q3
            nextGeneration();
            return 0;           
        }
        for (;;) {
            try {
                // 释放,进入条件队列阻塞 q4
                trip.await();
                // 被唤醒后 执行的代码 q5
                if (g != generation)
                   return index; 
            } 
        }
    } finally {
        // 释放锁 q6
        lock.unlock();
    }
}

源码执行流程

假设 需要到达屏障 的线程数 count = 3.

当第一个线程进入dowait方法,来抢锁, 抢锁成功(q1),count - 1 =2(q2),执行q4代码 :会调用 ReentrantLock里的Condition实例的await方法 释放锁(并不是ReentrantLock.unlock方法),在 初始化条件队列里,并休眠 。

当第二个线程进入dowait方法,来抢锁, 抢锁成功(q1),count - 1 =1(q2) ,执行q4代码 :会调用 ReentrantLock里的Condition实例的await方法 释放锁(并不是ReentrantLock.unlock方法) ,会接在条件队列的队尾,并休眠。

当第count个(最后一个)线程进入dowait方法, 来抢锁, 抢锁成功 (q1),count - 1 =0(q2),当count =0时 , 执行q3代码:会把条件队列里的所有线程节点, 移动到 等待ReentrantLock的 阻塞队列中去。(此时其他阻塞的线程还没有唤醒 )。并开启下一个周期。q3 return出去后,执行finally代码块 ,q6, unlock()会唤醒阻塞队列的第一个线程, 然后执行自己的业务代码。 第一个阻塞线程在 q5醒来后 在,最终return,然后执行 finally q6 去 唤醒下一个 线程, 然后执行自己的业务代码。下一个线程 也在 q5处被唤醒,执行的也是一样的逻辑,需要在解锁的时候 唤醒自己的下一个节点。 直到所有的线程 都被唤醒。

问题

现在解释下之前的问题

1. 为什么 是到达屏障的最后一个线程 最先执行 ?

从上面 的执行逻辑 可以得知, 前面的线程 都 到 条件队列里 阻塞住了,最后一个到达屏障的 线程 才 会先发起 唤醒阻塞线程的动作。

2. 为什么 线程被释放的顺序 不是 按照 线程调用await()的时间(比较极限)?

原因就在于 dowait 在一开始 是需要抢锁的, 这把锁作为CyclicBarrier的成员变量 在 实例化 CyclicBarrier的时候被初始化。

image

默认是把非公平锁。

如下图,这是某个线程持有锁 ,还没 进入 条件队列休眠,释放锁的 区间

image

假设并发很高, 在这段代码里 就有线程b,c,d 因为 抢不到锁被阻塞,而进入 阻塞队列中等待。假设 当前持有锁的线程 a 这个时候刚刚 进条件队列释放锁,还没来得及去 唤醒 阻塞队列里的线程来抢锁, 或者唤醒了 ,还在抢的路上。

这个时候又来一个线程 e 因为是非公平锁, 走到这直接抢锁,不看阻塞队列是否有线程在排队, 万一抢到锁了, 就跑去 条件队列里 休眠了。那么 最终 进入条件的队列 顺序 是 a,e,b,c,d 被唤醒的顺序 也是 a,e,b,c,d。

所以说 线程 恢复工作的顺序 不是 按照调用 await的顺序,而是 按照抢到锁的时间,由于是把非公平锁,所以在 并发很高的 情况下, 会 有这种 情况发生。

3. 被阻塞的线程 ,为什么不是同时 被释放的?

因为 唤醒 是 上一个线程调用unlock方法 去唤醒下一个线程的,是一个接着一个来的,所以不是同时 被释放。

上一篇下一篇

猜你喜欢

热点阅读