java多线程Java并发编程:CountDownLatch、CyclicBarrier和Semaphore

java源码 - CyclicBarrier

2018-09-01  本文已影响0人  晴天哥_王志

开篇

CyclicBarrier用法demo

public class Test {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N);
        for(int i=0;i<N;i++)
            new Writer(barrier).start();
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
 
        @Override
        public void run() {
            System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
            try {
                Thread.sleep(5000);      //以睡眠来模拟写入数据操作
                System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println("所有线程写入完毕,继续处理其他任务...");
        }
    }
}
线程Thread-0正在写入数据...
线程Thread-3正在写入数据...
线程Thread-2正在写入数据...
线程Thread-1正在写入数据...
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...

CyclicBarrier类定义

public class CyclicBarrier {
    // 代的类定义
    private static class Generation {
        boolean broken = false;
    }

    // 内部通过ReentrantLock实现线程安全的等待
    private final ReentrantLock lock = new ReentrantLock();
    // 内部通过Lock的condition实现所有waiter的信号通知
    private final Condition trip = lock.newCondition();
    // 所有等待执行的个数
    private final int parties;
    // 所有等待线程都完成任务后由最后一个线程执行的命令
    private final Runnable barrierCommand;
   
    // 通过代的概念实现复用
    private Generation generation = new Generation();

    // 还在等待的个数
    private int count;

    // 核心构造函数
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

CyclicBarrier工作原理

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }


    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        // 通过lock来保证线程安全
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;
            // 判断generation过期的情况
            if (g.broken)
                throw new BrokenBarrierException();
            // 判断线程中断情况
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            // 递减待执行的个数计数
            int index = --count;
            // 所有待执行任务完成后执行barrierCommand命令
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    // barrierCommand命令不为null的时候执行该命令
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    // 已经执行了barrierCommand
                    ranAction = true;
                    // 重置generation用以复用并且唤醒所有等待的线程
                    //       private void nextGeneration() {
                    //           trip.signalAll();
                    //           count = parties;
                    //           generation = new Generation();
                    //        }
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // 如果count的值不为0,那么当前线程就开始进入等待
            // 外层通过lock占用锁,内层通过wait()进入休眠并释放锁
            for (;;) {
                try {
                    if (!timed)
                        // private final Condition trip = lock.newCondition();
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
                
                 // 各种后置处理逻辑
                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

参考文章

Java并发编程:CountDownLatch、CyclicBarrier和Semaphore

上一篇 下一篇

猜你喜欢

热点阅读