线程与安全

CyclicBarrier

2021-01-14  本文已影响0人  得力小泡泡

1、CyclicBarrier使用场景:

先来描述一下它的使用场景:有若干个线程,比如说有五个线程,需要它们都到达了某一个点之后才能开始一起执行,也就是说假如其中只有四个线程到达了这个点,还差一个线程没到达,此时这四个线程都会进入等待状态,直到第五个线程也到达了这个点之后,这五个线程才开始一起进行执行状态,是不是这个场景的描述跟CountDownLatch很类似的,下面用一个简单的示例图来感受一下它们两者的区别:

CountDownLatch使用场景图
image.png
CyclicBarrier使用场景图
image.png

所有子线程都已经到达屏障之后,此时屏障就会消失,所有子线程继续执行,若存子线程尚未到达屏障,其他到达了屏障的线程都会进行等待

2、官方文档说明

它是一个同步的工具,能够允许一组线程去互相等待直到都到达了屏障,CyclicBarrier对于涉及到固定大小的线程是非常有用的,线程们必须相互等待。该屏障称之为循环屏障,是因为当等待屏障的线程被释放之后,该屏障能循环使用

3、关于CyclicBarrier的底层执行流程总结:

4、典型事例

1、当没有可选的Runnable时

当所有线程到达屏障时,不需要进行汇总,最后一个线程到达时,屏障消除,所有线程继续执行

image.png
package com.concurrency2;

import java.util.Random;
import java.util.concurrent.CyclicBarrier;

public class MyTest1 {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        for(int i = 0;i < 3;i ++) {
            new Thread(() -> {
                try {
                    Thread.sleep((long)(Math.random() * 2000));

                    int randomInt = new Random().nextInt(500);
                    System.out.println("hello " + randomInt);

                    cyclicBarrier.await();

                    System.out.println("world " + randomInt);

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

输出

hello 30
hello 471
hello 343
world 343
world 471
world 30
2、当有可选的Runnable时

当所有线程到达屏障时,需要进行汇总操作,等汇总操作进行完,屏障消除,所有线程继续执行

image.png
package com.concurrency2;

import java.util.Random;
import java.util.concurrent.CyclicBarrier;

public class MyTest1 {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
            System.out.println("汇总1 ...");

            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("汇总2 ...");
        });
       //for(int u = 0, u < 2;u ++)//开两次屏障使用
        for(int i = 0;i < 3;i ++) {
            new Thread(() -> {
                try {
                    Thread.sleep((long)(Math.random() * 2000));

                    int randomInt = new Random().nextInt(500);
                    System.out.println("hello " + randomInt);

                    cyclicBarrier.await();

                    System.out.println("world " + randomInt);

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

输出

hello 4
hello 229
hello 73
汇总1 ...
汇总2 ...
world 73
world 229
world 4

5、CyclicBarrier源代码分析

前面讲过CountDownLatch是基于AQS实现的;而CyclicBarrier是基于ReentrantLock重入锁实现的,当然ReentrantLock也是基于AQS实现的,非要说CyclicBarrier也是基于AQS实现的也不为过。

1、重要成员变量
    / /可以理解为初始化时 需要阻塞的任务个数
    private final int parties;
    / /剩余需要等待的任务个数,初始值为parties,直到为0时依次唤醒所有被阻塞的任务线程。
    private int count;
 
    / /每次对“栅栏”的主要成员变量进行变更操作,都应该加锁
    private final ReentrantLock lock = new ReentrantLock();
    / /用于阻塞和唤醒任务线程
   private final Condition trip = lock.newCondition();
 
    / /在所有线程被唤醒前,需要执行的一个Runable对应的run方法
    private final Runnable barrierCommand;
    / /用于表示“栅栏”当前的状态
    private Generation generation = new Generation();
2、构造方法

CyclicBarrier有两个重载的构造方法,一个是不带Runnable参数,另一个带有Runnable参数。本质上都会调用带Runnable参数的构造方法进行实例化,这里只贴出带Runnable参数的构造方法实现:

public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;    / /为了实现复用,进行备份
        this.count = parties;   / /初始化,待阻塞的任务总数
        this.barrierCommand = barrierAction;   / /初始化
    }
3、核心方法
await()方法有两层含义:

1、先检查前面是否已经有count个线程了,如果没有线程则会进入等待状态
2、当检测到屏障已经有count个线程了,则所有线程会冲出屏障继续执行(如果有Runnable参数的构造方法先执行汇总方法)

int index = --count;操作很明显不是原子性的,如果在多线程中不加lock肯定会出问题

private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
 
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock(); //加锁
        try {
            final Generation g = generation;
 
            if (g.broken)
                throw new BrokenBarrierException();
            //有一个线程线程被中断,整个CyclicBarrier将不可用
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
 
            int index = --count; //待等待的任务数减1
            if (index == 0) {  // 如果待等待的任务数减至0,依次唤醒所有线程
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();//唤醒前先执行Runnable对象的run方法
                    ranAction = true;
                    nextGeneration();//重置整个CyclicBarrier,方便下次重用
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
 
            //如果待等待的任务数大于0,进行线程阻塞,直到count为0时被唤醒
            for (;;) {
                try {
                    if (!timed)
                        trip.await();//阻塞当前线程
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);//延时阻塞当前线程
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
 
                if (g.broken)//异常唤醒
                    throw new BrokenBarrierException();
 
                if (g != generation)//正常被唤醒,generation会被新建
                    return index;
 
                if (timed && nanos <= 0L) {//延迟阻塞时间到后唤醒
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
}
上一篇下一篇

猜你喜欢

热点阅读