J.U.C之AQS-CyclicBarrier

2018-03-26  本文已影响0人  慕童

CyclicBarrier 也是 AQS 的同步组件

CyclicBarrier

AQS 的同步组件之 CyclicBarrier

CyclicBarrier也是一个同步辅助类 , 它允许一组线程相互等待 , 直到到达某个公共的屏障点 , 通过它可以完成多个线程之间相互等待 ,只有当每个线程都准备好之后, 才能各自继续往下执行后续的操作, 和 CountDownLatch相似的地方就是, 它也是通过计数器来实现的. 当某个线程调用了 await()方法之后, 该线程就进入了等待状态 . 而且计数器就进行 +1 操作 , 当计数器的值达到了我们设置的初始值的时候 , 之前调用了await() 方法而进入等待状态的线程会被唤醒继续执行后续的操作. 因为 CyclicBarrier释放线程之后可以重用, 所以又称之为循环屏障 . CyclicBarrier 使用场景和 CountDownLatch 很相似 , 可以用于多线程计算数据, 最后合并计算结果的应用场景 .

CyclicBarrier 与 CountDownLatch 区别

CyclicBarrier 代码演示


import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by Charles 
 */
@Slf4j
public class CyclicBarrierExample1 {
    private static CyclicBarrier barrier = new CyclicBarrier(5); // 5 个线程同步等待

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await(); // 有一个线程准备 ok 了 , 当达到上面设置的5个线程 的时候 , 后续代码就开始执行了
        log.info("{} continue", threadNum);
    }
}

控制台输出


测试代码控制台输出结果

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

/**
 * Created by Charles 
 */
@Slf4j
public class CyclicBarrierExample2 {
    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        try {
            barrier.await(2000, TimeUnit.MILLISECONDS); //等待2000 毫秒后继续执行
        } catch (BrokenBarrierException | TimeoutException e) {
            log.warn("BarrierException",e);
        }
        log.info("{} continue", threadNum);
    }
}

控制台输出结果如下图 :


测试代码控制台输出结果
上一篇下一篇

猜你喜欢

热点阅读