线程JUC 并发专题一些收藏

Java并发编程——CyclicBarrier

2021-12-11  本文已影响0人  小波同学

一、CyclicBarrier循环栅栏

上面5只小熊,准备跑到起跑线,跑到起跑线等待,相当于执行了await方法,等到所有小熊准备就绪之后,然后一起开跑。这就很好的揭示了内存屏障的作用了。

二、执行原理

CyclicBarrier是基于ReentrantLock的Condition来实现的。

如下图,栅栏中有两个关键属性:

其中CyclicBarrier的await()方法封装了对ReentrantLock条件锁的使用,主要处理流程:

await()能够响应中断。除此之外,await还提供了带有超时的实现await(long timeout, TimeUnit unit),以及reset()方法重新开启下一轮。

三、CyclicBarrier的用法

3.1 CyclicBarrier构造方法

3.2 CyclicBarrier方法

3.3 CyclicBarrier使用

/**
 * @Description: 演示CyclicBarrier的使用
 */
public class CyclicBarrierDemo {

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
            System.out.println("所有人都到场了,大家统一出发");
        });

        for (int i = 0; i < 10; i++) {
            final int id = i;
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName()+",id:"+id+"现前往集合地点");
                try {
                    Thread.sleep(new Random().nextInt(10000));
                    System.out.println(Thread.currentThread().getName()+"到了集合地点,开始等待其他人到达");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread().getName()+"出发了");
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

3.4 CyclicBarrier和CountDownLatch的区别

四、源码

CyclicBarrier是由ReentrantLock可重入锁和Condition共同实现的。

4.1 CyclicBarrier构造方法

public class CyclicBarrier {

    //同步操作锁
    private final ReentrantLock lock = new ReentrantLock();
    
    //线程拦截器
    private final Condition trip = lock.newCondition();
    
    //每次拦截的线程数
    private final int parties;
    
    //换代前执行的任务
    private final Runnable barrierCommand;
    
    //表示栅栏的当前代
    private Generation generation = new Generation();
    
    //计数器
    private int count;
    
    //静态内部类Generation
    private static class Generation {
        boolean broken = false;
    }
    
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        // parties表示“必须同时到达barrier的线程个数”。
        this.parties = parties;
        // count表示“处在等待状态的线程个数”。
        this.count = parties;
        // barrierCommand表示“parties个线程到达barrier时,会执行的动作”。
        this.barrierCommand = barrierAction;
    }
}

4.2 await源码分析

public class CyclicBarrier {

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
}
public class CyclicBarrier {

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        // 获取“独占锁(lock)”
        lock.lock();
        try {
            // 保存“当前的generation”
            final Generation g = generation;

            // 若“当前generation已损坏”,则抛出异常。
            if (g.broken)
                throw new BrokenBarrierException();

            // 如果当前线程被中断,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            // 将“count计数器”-1
            int index = --count;
            // 如果index=0,则意味着“有parties个线程到达barrier”。
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    // 如果barrierCommand不为null,则执行该动作。
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 唤醒所有等待线程,并更新generation。
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // 当前线程一直循环,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生,
            // 当前线程才继续执行。
            for (;;) {
                try {
                    // 如果不是“超时等待”,则调用await()进行等待;否则,调用awaitNanos()进行等待。
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    // 如果等待过程中,线程被中断,则执行下面的函数。
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
                // 如果“当前generation已经损坏”,则抛出异常。
                if (g.broken)
                    throw new BrokenBarrierException();
                // 如果“generation已经换代”,则返回index。
                if (g != generation)
                    return index;
                // 如果是“超时等待”,并且时间已到,则通过breakBarrier()终止CyclicBarrier,
                // 唤醒CyclicBarrier中所有等待线程,并抛出TimeoutException异常
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            // 释放“独占锁(lock)”
            lock.unlock();
        }
    }
}

参考:
https://www.itzhai.com/articles/graphical-several-fun-concurrent-helper-classes.html

https://www.cnblogs.com/200911/p/6060195.html

上一篇 下一篇

猜你喜欢

热点阅读