Java 并发程序员

【Java 并发笔记】CyclicBarrier 相关整理

2019-01-18  本文已影响0人  58bc06151329

文前说明

作为码农中的一员,需要不断的学习,我工作之余将一些分析总结和学习笔记写成博客与大家一起交流,也希望采用这种方式记录自己的学习之旅。

本文仅供学习交流使用,侵权必删。
不用于商业目的,转载请注明出处。

1. 简介

1.1 CyclicBarrier 的应用场景

1.2 CyclicBarrier 方法说明

CyclicBarrier(parties) 方法

CyclicBarrier(parties,Runnable barrierAction) 方法

getParties 方法

getNumberWaiting 方法

await 方法

await(timeout,TimeUnit) 方法

isBroken 方法

reset 方法

1.3 CyclicBarrier 和 CountDownLatch 的区别

2. CyclicBarrier 原理

属性信息

//用于保护屏障入口的锁
private final ReentrantLock lock = new ReentrantLock();
//线程等待条件
private final Condition trip = lock.newCondition();
//记录参与等待的线程数
private final int parties;
//当所有线程到达屏障点之后,首先执行的命令
private final Runnable barrierCommand;
private Generation generation = new Generation();
//实际中仍在等待的线程数,每当有一个线程到达屏障点,count值就会减一;当一次新的运算开始后,count的值被重置为parties
private int count;

构造函数

public CyclicBarrier(int parties) {
    this(parties, null);
}
 
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

await 方法

/**
* 线程持续等待直到此barrier上的所有线程都调用了await()方法.
*
* 如果当前线程并不是到达的最后一个线程,则它被禁用线程调度目的,并且处于休眠状态,直到发生以下事件之一:
* 1.最后一个线程到达;
* 2.其他线程中断了当前线程.
* 3.其它线程中断了其它等待的线程.
* 4.在barrier上面等待的线程发生超时.
* 5.其它线程调用了barrier上面的reset方法.
*
* 如果当前线程:
* 1.在进入这一方法时,中断状态位被标记.
* 2.在等待过程中被中断
* 则会抛出中断异常InterruptedException,且当前线程的中断状态被清除.
*
* 会抛出BrokenBarrierException异常的情况有:
* 1.当其它线程在等待时,如果barrier被reset;
* 2.当调用await()方法时barrier发生了broken
*
* 任意等待线程发生了中断异常时,其它等待线程都会抛出BrokenBarrierException,且barrier的状态会变为broken.
*
* 如果当前线程是最后一个到达barrier的线程,且构造函数中的barrier action非null,则在其它线程可以继续执行前,当前线程会执行
* barrier action.
* 如果在barrier action的执行过程中发生了异常,则该异常会对当前线程产生影响,且barrier的会处于broken状态.
*
* @return 当前线程到达索引,第一个到达的索引值为:getParties() - 1;
*         最后一个到达的索引值为:0
*/
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        // 不超时等待
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
            BrokenBarrierException,
            TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
            TimeoutException {
    // 获取独占锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 当前代
        final Generation g = generation;
        // 如果这代损坏了,抛出异常
        if (g.broken)
            throw new BrokenBarrierException();
 
        // 如果线程中断了,抛出异常
        if (Thread.interrupted()) {
            // 将损坏状态设置为true
            // 并通知其他阻塞在此栅栏上的线程
            breakBarrier();
            throw new InterruptedException();
        }
 
        // 获取下标
        int index = --count;
        // 如果是 0,说明最后一个线程调用了该方法
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                // 执行栅栏任务
                if (command != null)
                    command.run();
                ranAction = true;
                // 更新一代,将count重置,将generation重置
                // 唤醒之前等待的线程
                nextGeneration();
                return 0;
            } finally {
                // 如果执行栅栏任务的时候失败了,就将损坏状态设置为true
                if (!ranAction)
                    breakBarrier();
            }
        }
 
        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                 // 如果没有时间限制,则直接等待,直到被唤醒
                if (!timed)
                    trip.await();
                // 如果有时间限制,则等待指定时间
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 当前代没有损坏
                if (g == generation && ! g.broken) {
                    // 让栅栏失效
                    breakBarrier();
                    throw ie;
                } else {
                    // 上面条件不满足,说明这个线程不是这代的
                    // 就不会影响当前这代栅栏的执行,所以,就打个中断标记
                    Thread.currentThread().interrupt();
                }
            }
 
            // 当有任何一个线程中断了,就会调用breakBarrier方法
            // 就会唤醒其他的线程,其他线程醒来后,也要抛出异常
            if (g.broken)
                throw new BrokenBarrierException();
 
            // g != generation表示正常换代了,返回当前线程所在栅栏的下标
            // 如果 g == generation,说明还没有换代,那为什么会醒了?
            // 因为一个线程可以使用多个栅栏,当别的栅栏唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。
            // 正是因为这个原因,才需要generation来保证正确。
            if (g != generation)
                return index;
            
            // 如果有时间限制,且时间小于等于0,销毁栅栏并抛出异常
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 释放独占锁
        lock.unlock();
    }
}

BrokenBarrierException 异常

Generation 对象

/**
* barrier每一次使用都代表了一个generation实例.
* 当barrier发生trip或者reset时,对应的generation会发生改变.
* 由于非确定性,锁可能会分配给等待线程,因此可能会存在许多和使用barrier的线程相关的generation.
* 但是每次只能激活这些线程中的一个(使用计数的那个),并且其他的线程要么broken要么trip.
* 如果出现了一个暂停,但并未reset,则不需要一个激活的generation.
*/
private static class Generation {
    boolean broken = false;
}
private void breakBarrier() {
    // 设置状态
    generation.broken = true;
    // 恢复正在等待进入屏障的线程数量
    count = parties;
    // 唤醒所有线程
    trip.signalAll();
}
//当barrier发生trip时,用于更新状态并唤醒每一个线程.
//这一方法只在持有lock时被调用.
private void nextGeneration() {
    // signal completion of last generation
    // 唤醒所有线程
    trip.signalAll();
    // set up next generation
    // 恢复正在等待进入屏障的线程数量
    count = parties;
    // 新生一代
    generation = new Generation();
}

reset 方法

/**
* 将barrier状态重置.如果此时有线程在barrier处等待,它们会抛出BrokenBarrierException并返回.
* 注意:请注意,由于其他原因发生broken后重置可能会很复杂;线程需要通过一些方式来 完成同步,并选择一种方式完成reset.
* 相对为后续的使用重建一个barrier,此重置操作更受欢迎.
* 注意:这是一个需要加锁的操作.
*/
public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

isBroken 方法

public boolean isBroken() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return generation.broken;
    } finally {
        lock.unlock();
    }
}

getNumberWaiting 方法

//返回当前在屏障处等待的参与者数目,此方法主要用于调试和断言。
public int getNumberWaiting() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return parties - count;
    } finally {
        lock.unlock();
    }
}

2.1 程序示例

public class CyclicBarrierTest {
    // 自定义工作线程
    private static class Worker extends Thread {
        private CyclicBarrier cyclicBarrier;
        
        public Worker(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
        
        @Override
        public void run() {
            super.run();
            
            try {
                System.out.println(Thread.currentThread().getName() + "开始等待其他线程");
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + "开始执行");
                // 工作线程开始处理,这里用Thread.sleep()来模拟业务处理
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + "执行完毕");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
 
    public static void main(String[] args) {
        int threadCount = 3;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
        
        for (int i = 0; i < threadCount; i++) {
            System.out.println("创建工作线程" + i);
            Worker worker = new Worker(cyclicBarrier);
            worker.start();
        }
    }
}
/**
创建工作线程0
创建工作线程1
Thread-0开始等待其他线程
创建工作线程2
Thread-1开始等待其他线程
Thread-2开始等待其他线程
Thread-2开始执行
Thread-0开始执行
Thread-1开始执行
Thread-1执行完毕
Thread-0执行完毕
Thread-2执行完毕
*/

3. 使用 CyclicBarrier 的注意事项

4. 总结

参考资料

https://blog.csdn.net/qq_38293564/article/details/80558157

上一篇 下一篇

猜你喜欢

热点阅读