juc并发组件(三)CountDownLatch 和Cyclic

2019-01-05  本文已影响0人  sadamu0912

为什么要把这两个放一起来呢?因为这两个概念有点像。都是屏障的概念,区别在于,CountDownLatch 只能用一次,CyclicBarrier是可以用多次。
什么叫屏障, 就是条件没有满足的时候,各个线程都不允许通过。只有条件满足了,才可以通过。这是一个计数为4 的屏障。

image.png
但是,他们两个的实现机制不一样,CountDownLatch 是采用AQS中的共享锁,调用的是本身内聚的同步器子类的tryAcquireShared方法,在上一节中我们提到, tryAcquireShared这个方法的返回值,如果小于0,代表没有获取到许可,无法进入临界区代码,countdownLatch 调用await 方法去获取同步状态
protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

如果不是0,就代表获取不到同步状态。
然后countdownLatch 调用countDown()方法的时候,只是把state这个volatile变量-1 。实现比较简单。

 protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

CyclicBarrier源码分析

首先上UML图:


image.png

可以发现,主要基于ReentrantLock和Condition实现。主要方法就是await()方法,和reset()方法

首先我们看await()方法

await()方法如何实现屏障的语义。阻塞parties个线程,当最后到达时,屏障打开。

private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

    private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
            TimeoutException {
        final ReentrantLock lock = this.lock;
        // 获取“独占锁(lock)”
        lock.lock();
        try {
            // 保存“当前这一代屏障是否损坏”
            final Generation g = generation;
            if (g.broken)
                throw new BrokenBarrierException();
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            int index = --count;
            // 如果index=0,则意味着“有parties个线程到达barrier”。
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    //这里可以看出,满足条件时,先执行的barrierCommand,再唤醒其他线程去执行代码
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //  调用trip.signalAll();唤醒线程,执行各自await方法后面的代码
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        //这里可以看出,调用cyclicBarrier.await()方法的时候,实际上是
                        //被ConditionObject的await()方法阻塞,而最终调用LockSupport.park(this);
                        //Disables the current thread for thread scheduling,释放锁
                        //被插入独占锁的trip条件等待队列中
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
                if (g.broken)
                    throw new BrokenBarrierException();
                if (g != generation)
                    return index;
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

让我们看一下,屏障被破坏的语义

/**
 * 屏障破了,所有await后面的代码都不执行了
 *
 */
public class CyclicBarrierTest2 {
    private static int count ;
    public static void main(String[] args) throws Exception{

        CyclicBarrier barrier1 = new CyclicBarrier(3);
        ExecutorService executorService = Executors.newCachedThreadPool();
//添加一个用await()等待的线程
        executorService.submit(() -> {
            try {
                //等待,除非:1.屏障打开;2.本线程被interrupt;3.其他等待线程被interrupted;4.其他等待线程timeout;5.其他线程调用reset()
                count++;
                barrier1.await();
                System.out.println(Thread.currentThread().getName()+"====enter into critical code");
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + " is interrupted.");
                //e.printStackTrace();
            } catch (BrokenBarrierException e) {
                System.out.println(Thread.currentThread().getName() + " is been broken.");
                //e.printStackTrace();
            }
        });
        Thread.sleep(10);
        System.out.println("刚开始,屏障是否破损:" + barrier1.isBroken());
//添加一个等待线程-并超时
        executorService.submit(() -> {
            try {
                //等待1s,除非:1.屏障打开(返回true);2.本线程被interrupt;3.本线程timeout;4.其他等待线程被interrupted;5.其他等待线程timeout;6.其他线程调用reset()
                count++;
                barrier1.await(1, TimeUnit.SECONDS);
                System.out.println(Thread.currentThread().getName()+"====wait code after timeout");
                for(long i=0;i<50000L;i++){
                    System.out.println(i);
                }
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + " is interrupted.");
                //e.printStackTrace();
            } catch (BrokenBarrierException e) {
                System.out.println( barrier1.isBroken());
                System.out.println(Thread.currentThread().getName() + " is been reset().");
                //e.printStackTrace();
            } catch (TimeoutException e) {
                System.out.println("TimeoutException"+ barrier1.isBroken());
                System.out.println(Thread.currentThread().getName() + " is timeout.");
                //e.printStackTrace();
            }
        });
        Thread.sleep(100);
        System.out.println("当前等待线程数量:" + barrier1.getNumberWaiting());
        Thread.sleep(1000);
        System.out.println("当前等待线程数量:" + barrier1.getNumberWaiting());
        System.out.println("当等待的线程timeout时,当前屏障是否破损:" + barrier1.isBroken());
        System.out.println("等待的线程中,如果有一个出现问题,则此线程会抛出相应的异常;其他线程都会抛出BrokenBarrierException异常。");
        System.out.println(count+"====count");
        System.out.println();
        Thread.sleep(1000);
//通过reset()重置屏障回初始状态,也包括是否破损
        barrier1.reset();
        System.out.println("reset()之后,当前屏障是否破损:" + barrier1.isBroken());
        System.out.println("reset()之后,当前等待线程数量:" + barrier1.getNumberWaiting());
        System.out.println(count+"====count");
        executorService.shutdown();
    }
}

输出结果:

刚开始,屏障是否破损:false
当前等待线程数量:2
pool-1-thread-1 is been broken.
TimeoutExceptiontrue
pool-1-thread-2 is timeout.
当前等待线程数量:0
当等待的线程timeout时,当前屏障是否破损:true
等待的线程中,如果有一个出现问题,则此线程会抛出相应的异常;其他线程都会抛出BrokenBarrierException异常。
2====count

reset()之后,当前屏障是否破损:false
reset()之后,当前等待线程数量:0
2====count

当await超时了之后,执行for循环中的

if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }

抛出TimeoutException
broken = true ,count= parties ,trip.signalAll(); 和reset还是有区别的,reset时broken = false
而且屏障破了之后,await后面的代码就都不执行了

最后我们看reset方法

public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

breakBarrier唤醒所有线程,把broken设为true,nextGeneration唤醒所有线程,把broken=false。

上一篇 下一篇

猜你喜欢

热点阅读