基础知识

Java Semaphore/CountDownLatch/Cy

2021-03-19  本文已影响0人  小鱼人爱编程

前言

线程并发系列文章:

Java 线程基础
Java “优雅”地中断线程
Java 线程状态
真正理解Java Volatile的妙用
Java ThreadLocal你之前了解的可能有误
Java Unsafe/CAS/LockSupport 应用与原理
Java 并发"锁"的本质(一步步实现锁)
Java Synchronized实现互斥之应用与源码初探
Java 对象头分析与使用(Synchronized相关)
Java Synchronized 偏向锁/轻量级锁/重量级锁的演变过程
Java Synchronized 重量级锁原理深入剖析上(互斥篇)
Java Synchronized 重量级锁原理深入剖析下(同步篇)
Java并发之 AQS 深入解析(上)
Java并发之 AQS 深入解析(下)
Java Thread.sleep/Thread.join/Thread.yield/Object.wait/Condition.await 详解
Java 并发之 ReentrantLock 深入分析(与Synchronized区别)
Java 并发之 ReentrantReadWriteLock 深入分析
Java Semaphore/CountDownLatch/CyclicBarrier 深入解析(原理篇)
Java Semaphore/CountDownLatch/CyclicBarrier 深入解析(应用篇)
最详细的图文解析Java各种锁(终极篇)

前面分析了基于AQS的独占锁ReentrantLock、共享锁/独占锁ReentrantReadWriteLock,它们内部都实现了Lock 接口。而AQS还有其它常用的子类封装器,它们虽然没有实现Lock接口,但可以用来做线程间的同步,接下来将要来深入了解它们。
通过本篇文章,你将了解到:

1、Semaphore 原理分析
2、CountDownLatch 原理分析
3、CyclicBarrier 原理分析

1、Semaphore 原理分析

场景引入

ReentrantReadWriteLock 里有读锁和写锁,其中读锁是共享锁,其核心是对AQS里的"state"变量进行操作,每获取一次锁,将state加1,释放锁将state减1。从这里可以看出,将state作为共享资源能够实现线程间的协作。
现在有个需求:资源是共享的,但是数量有限,因此没拿到资源的需要等待别人释放资源。

将state作为标记共享资源的数量,那么就有:

1、线程占有资源后将state减1,线程释放资源后将state加1。
2、若线程没拿到资源(资源都被其它线程占有了),那么挂起等待。
3、线程释放资源后,唤醒其它等待该资源的线程。

image.png

这样子,不用synchronized+wait/notify与ReentrantLock+await/signal,也依然能够实现线程间同步。
具体到现实场景:

如停车场只能容纳一定数量的车子,当停车场停满了车(入场许可发放完了),其它想进来的车子必须等待有其它车从停车场开出(释放入场许可)。

Semaphore 构造

指定初始的许可个数:

#Semaphore.java
    public Semaphore(int permits) {
        //默认是非公平
        sync = new NonfairSync(permits);
    }

    Sync(int permits) {
            setState(permits);
        }

可以看出许可的个数就是state的值。

Semaphore 占有许可

通过调用acquire(xx)占有许可:

#Semaphore.java
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        //交给AQS处理,可中断
        sync.acquireSharedInterruptibly(permits);
    }

#AbstractQueuedSynchronizer.java
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        //发生了中断,直接返回
        if (Thread.interrupted())
            throw new InterruptedException();
        //尝试修改state(减)
        if (tryAcquireShared(arg) < 0)
            //修改失败,则挂起等待
            doAcquireSharedInterruptibly(arg);
    }

每次可以占有多个许可,若占有成功则直接返回,否则挂起等待。
具体的操作state在tryAcquireShared(xx)里实现,此处以非公平模式说明:

#Semaphore.java
        final int nonfairTryAcquireShared(int acquires) {
            //死循环确保修改state成功,或者state已经获取完了
            for (;;) {
                //获取state
                int available = getState();
                //减少state
                int remaining = available - acquires;
                if (remaining < 0 ||
                    //CAS 操作
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

Semaphore 释放许可

占有许可做了相应的任务后,就可以释放许可了。
通过调用release(xx)释放许可:

#Semaphore.java
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        //AQS 实现
        sync.releaseShared(permits);
    }

#AbstractQueuedSynchronizer.java
    public final boolean releaseShared(int arg) {
        //尝试修改state(加)
        if (tryReleaseShared(arg)) {
            //成功修改state,唤醒后继节点
            doReleaseShared();
            return true;
        }
        //修改失败
        return false;
    }

具体的操作state在tryReleaseShared(xx)里实现:

#Semaphore.java
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                //获取state
                int current = getState();
                //增加
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                //修改
                if (compareAndSetState(current, next))
                    return true;
            }
        }

可以看出:

释放许可,增加state,占有许可,减少state。

另外,Semaphore 占有许可可分为公平与非公平模式,占有许可过程可中断/不可中断。


image.png

Semaphore 与Lock 区别

与ReentrantLock、ReentrantReadWriteLock 区别在于从不同的角度看待state:

1、ReentrantLock、ReentrantReadWriteLock 获取锁的过程是将state值增大,而Semaphore 占有许可是将state值减小。
2、ReentrantLock、ReentrantReadWriteLock 释放锁的过程是将state值减小,而Semaphore 释放许可是将state值增大。
3、这也是AQS的灵活之处,将具体的"state"锁代表的意义由子类实现,可实现不同场景的应用。

2、CountDownLatch 原理分析

场景引入

A、B、C三个线程协作:

A 等待B、C完成任务后再进行下一步操作。

这场景我们可能会想到用Thread.join(),A调用B.join(),C.join(),A阻塞等待,当B、C线程执行结束后唤醒A。这种方式虽然能够解决问题,但是有些不尽人意的地方:比如说A不一定要等待B、C执行完成,而是B、C中途完成某个任务后通知A;又比如,B、C线程不止执行一次任务,而是一定的次数后才会唤醒A,这个时候使用Thread.join() 就无法解决问题了。
而CountDownLatch 可以很好地解决这问题。

CountDownLatch 构造

#CountDownLatch.java
    //初始化次数
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    Sync(int count) {
        //设置state
            setState(count);
        }

可以看出,count的值最终反馈到state上。

CountDownLatch 等待

通过await(xx)等待state变为0:

#CountDownLatch.java
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        //超时返回
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

#AbstractQueuedSynchronizer.java
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        //该方法响应中断
        if (Thread.interrupted())
            throw new InterruptedException();
        //主要工作在tryAcquireShared(xx)里
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }

又是AQS的套路,具体的操作state在tryAcquireShared(xx)里实现:

#CountDownLatch.java
    protected int tryAcquireShared(int acquires) {
        //若state == 0,则返回1,否则-1
        //外层判断>=0,说明当前state还有数量,则需要阻塞等待,否则不阻塞
            return (getState() == 0) ? 1 : -1;
        }

与其它子类实现的tryAcquireShared(xx)方法不同的是,CountDownLatch里的Sync并没有修改state的值,仅仅只是判断state?=0进而做具体的操作而已。
由此可知:CountDownLatch 是基于AQS的共享模式。

CountDownLatch 倒数计数

既然调用await(xx)可能会使得线程阻塞等待,那么势必有其它线程唤醒它,调用的方法即是countDown():

#CountDownLatch.java
    public void countDown() {
        sync.releaseShared(1);
    }

#AbstractQueuedSynchronizer.java
    public final boolean releaseShared(int arg) {
        //子类实现
        if (tryReleaseShared(arg)) {
            //AQS里实现,唤醒阻塞的线程
            doReleaseShared();
            return true;
        }
        return false;
    }

同样的,具体的操作state在tryReleaseShared(xx)里实现:

#CountDownLatch.java
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                //获取state
                int c = getState();
                //若当前state==0,说明已经没有可以释放的了
                if (c == 0)
                    return false;
                int nextc = c-1;
                //CAS修改
                if (compareAndSetState(c, nextc))
                    //说明可以唤醒其它线程了
                    return nextc == 0;
            }
        }

也即是说,当线程调用await(xx)阻塞后,其它线程通过countDown()修改state值,若是发现state最终变为0了,那么唤醒阻塞的线程。
用图表示CountDownLatch主要结构如下:


image.png

CountDownLatch 与其它AQS子类封装器的区别

前面已经分析了基于AQS的封装器:ReentrantLock、ReentrantReadWriteLock、Semaphore,它们对state值的修改包括增加与减少,而CountDownLatch 只是减小state的值,用以实现倒数计数的功能。
可类比场景如下:

1、田径运动场开始百米赛跑。
2、运动员在跑道上各就各位(多个线程调用await 阻塞等待)。
3、裁判喊倒数3、2、1(线程调用countDown)。
4、等待倒数结束,发令枪响,运动员就开始跑(线程被唤醒,继续做事)。

可以看出,运动员不会去干涉裁判的倒数(修改state值)。

3、CyclicBarrier 原理分析

场景引入

在CountDownLatch 场景里说到运动员需要裁判,想想可以不需要裁判吗?运动员之间自发倒数,倒数结束就一起跑。
更普遍的场景是:

1、几个驴友想去某个景点旅游,约定了在某个地方集合后再一起出发。
2、每个驴友到达集合点时打卡并看人都到齐了没,没到齐则等待。
3、若最后一个参与者过来后发现人到齐了,于是告诉大家不用等了,出发吧。

CyclicBarrier 可满足该场景的需求。

CyclicBarrier 构造

#CyclicBarrier.java
    public CyclicBarrier(int parties, Runnable barrierAction) {
        //必须要有参与者
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        //临时变量count
        this.count = parties;
        //参与者都到达了后执行的动作
        this.barrierCommand = barrierAction;
    }

可以看出,此处并没有AQS介入,也就是没有直接修改state。
CyclicBarrier是通过ReentrantLock + Condition 来实现线程间同步的:

#CyclicBarrier.java
    //独占锁,为了互斥修改count
    private final ReentrantLock lock = new ReentrantLock();
    //线程等待条件
    private final Condition trip = lock.newCondition();
    //修改的共享变量
    private int count;

CyclicBarrier 等待参与者

接着来分析,如何实现线程间的同步的。

#CyclicBarrier.java
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            //实际调用doWait(),此处是不限时等待
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        //先锁住
        lock.lock();
        try {
            final Generation g = generation;
            //等待过程被中断
            if (g.broken)
                throw new BrokenBarrierException();
            //中断了线程
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            //等待个数-1
            int index = --count;
            if (index == 0) {
                //都到齐了,无需等待了
                boolean ranAction = false;
                try {
                    //执行既定的方法
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //开始下一轮
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            //走到这,说明还需要等待
            for (;;) {
                try {
                    if (!timed)
                        //不限时等待
                        trip.await();
                    else if (nanos > 0L)
                        //限时等待,时间到了就返回
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    //等待过程被中断,则抛出异常
                    if (g == generation && ! g.broken) {
                        //不等了,唤醒其它线程
                        breakBarrier();
                        throw ie;
                    } else {
                        ...
                        Thread.currentThread().interrupt();
                    }
                }

                //醒来后发现已经被中断了,则直接抛出异常
                if (g.broken)
                    throw new BrokenBarrierException();

                //已经开启了下一轮,说明前面一轮都到齐了结束了
                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    //超时了还是没到齐,不等了,唤醒其它线程
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

线程先获取独占锁,然后修改count值,若发现修改后count !=0,那么还需要等待,等待借助的是Condition.await(xx)方法。
有等待,自然有唤醒的地方:

#CyclicBarrier.java
    private void breakBarrier() {
        //置为true,表示已经结束等待了
        generation.broken = true;
        //重置count,复用的关键
        count = parties;
        //唤醒其它在等待的线程
        trip.signalAll();
    }

用图表示,等待/唤醒过程如下:


image.png

来看看CyclicBarrier 主要方法:


image.png

CyclicBarrier与CountDownLatch 区别

看到这,你也许已经发现了CyclicBarrier 和CountDownLatch 实现的功能很相似,都是等待某个条件满足后再进行下一步的动作,两者不同之处在于:

1、CountDownLatch 参与的线程分为两类:一个是等待者,另一个是计数者;CyclicBarrier 参与的线程既是等待者,也是计数者。
2、CountDownLatch 完成一次完整的协作过程后不能再复用,CountDownLatch 可以复用(不用重新新建CountDownLatch 对象)。
3、CountDownLatch 的计数值与线程个数没有必然联系,CyclicBarrier 的初始计数值与线程个数一致。
4、CountDownLatch 基于AQS实现,CyclicBarrier 基于ReentrantLock&Condition实现(内部也是基于AQS)。

你可能还有疑惑,在下一篇应用篇将会重点体现两者区别。
下篇文章重点分析:Semaphore/CountDownLatch/CyclicBarrier 实际应用。

本文基于jdk1.8。

您若喜欢,请点赞、关注,您的鼓励是我前进的动力

持续更新中,和我一起步步为营系统、深入学习Android/Java

1、Android各种Context的前世今生
2、Android DecorView 一窥全貌(上)
3、Android DecorView 一窥全貌(下)
4、Window/WindowManager 不可不知之事
5、View Measure/Layout/Draw 真明白了
6、Android事件分发全套服务
7、Android invalidate/postInvalidate/requestLayout 彻底厘清
8、Android Window 如何确定大小/onMeasure()多次执行原因
9、Android事件驱动Handler-Message-Looper解析
10、Android 键盘一招搞定
11、Android 各种坐标彻底明了
12、Android Activity/Window/View 的background
13、Android IPC 之Service 还可以这么理解
14、Android IPC 之Binder基础
15、Android IPC 之Binder应用
16、Android IPC 之AIDL应用(上)
17、Android IPC 之AIDL应用(下)
18、Android IPC 之Messenger 原理及应用
19、Android IPC 之获取服务(IBinder)
20、Android 存储基础
21、Android 10、11 存储完全适配(上)
22、Android 10、11 存储完全适配(下)
23、Java 并发系列不再疑惑

上一篇下一篇

猜你喜欢

热点阅读