CountDownLatch与CyclicBarrier

2020-05-13  本文已影响0人  HannahLi_9f1c

CountDownLatch

简介

CountDownLatch使用AQS同步框架实现了多线程计时器。主线程等待所有线程完成任务之后,主线程再进行下一步动作。比如说吃鸡游戏要等待队友准备好后可以开始玩游戏,马拉松比赛等所有人跑完之后开始排名,都可以用CountDownLatch实现

使用方式
  1. 用构造器初始化计时器初始值
  2. countDown()函数将当前计数返回,表示当前线程已完成
  3. await()用在主线程中等待所有线程返回。然后进行下一步动作
CountDownLatch实现马拉松比赛
  1. 代码
public class CountDownLatchDemo {
    public static Map<Long, Long> map = new TreeMap<Long, Long>();
    public static final int RUNNER = 10;
    private static class Runner implements Runnable{
        CountDownLatch countDownLatch;
        public Runner(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }
        @Override
        public void run() {
            long id = Thread.currentThread().getId();
            System.out.println("运动员"+id+"开始");
            long start = System.currentTimeMillis();
            int count = 0;
            for(int i = 0; i < 100000; i++) {
                count++;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            long end = System.currentTimeMillis();
            System.out.println("运动员"+id+"用时"+(end-start));
            countDownLatch.countDown();
            map.put(id, start-end);
        }
    }
    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(RUNNER);
        for(int i = 0; i < RUNNER; i++) {
            new Thread(new Runner(countDownLatch)).start();
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("比赛结束,开始排名");
        int count = 0;
        for(long id:map.keySet()) {
            count++;
            System.out.println("第"+count+"名:运动员"+id+"用时"+map.get(id));
        }
    }
}

  1. 运行结果


    image.png
实现原理

CountDownLatch在内部构造一个静态sync类,sync类实现了AQS接口,AQS是由共享资源和同步双向队列组成,使用了模板方式。在使用AQS时实现独占锁时需要重写tryAcquire和tryRealse方法,在使用AQS实现共享锁时需要重写tryReleaseShared和tryAcquireShared。

  1. sync锁重写的tryAcquireShared,state值会在new CountDownLatch时初始化,然后通过countDown()方法将state值减一。tryAcquireShared()方法表示state值为0时返回1,否则返回-1.
   protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
  1. sync重写的tryReleaseShared方法,就是不断循环通过CAS将把state值减一,state值为0时返回true。
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
  1. countDown方法做了什么?
    public void countDown() {
        sync.releaseShared(1);
    }
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
 private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

  1. await()方法内部原理
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
实现旅游发签证
  1. 代码
public class CyclicBarrierDemo {
    public static void main(String [] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3,new TourGuideTask());
        Executor executor = Executors.newFixedThreadPool(3);
        //登哥最大牌,到的最晚
        executor.execute(new TravelArrive(cyclicBarrier,222,5));
        executor.execute(new TravelArrive(cyclicBarrier,333,3));
        executor.execute(new TravelArrive(cyclicBarrier,444,1));
    }

}
class TourGuideTask implements Runnable{

    @Override
    public void run() {
        System.out.println("分发护照");
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
class TravelArrive implements Runnable{
    private CyclicBarrier cyclicBarrier;
    private int time;
    private int no;
    public TravelArrive(CyclicBarrier cyclicBarrier, int no, int time) {
        this.cyclicBarrier = cyclicBarrier;
        this.time = time;
        this.no = no;
    }
    @Override
    public void run() {
        try {
            Thread.sleep(time*100);
            System.out.println("游客"+no+"已到达");
            cyclicBarrier.await();
            System.out.println("游客"+no+"开始旅行");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    }
}

CyclicBarrier

简介

CyclicBarrier也是基于AQS实现的,但是可以复用计数器,如同栅栏一般。与CountDownLatch不同的是,CyclicBarrier循环使用计数器。在旅游时,导游会先等待游客到达之后统一发送护照和签证,这个场景就可以用CyclicBarrier实现

  1. 运行结果


    image.png
上一篇下一篇

猜你喜欢

热点阅读