一些收藏

知识回顾|并发|AQS

2023-01-31  本文已影响0人  三更冷

前提

参考网上文章,本文针对以下几个问题作出回答,文内可能有遗漏、错误或表达不够清晰的地方(基于JDK8):

① AQS是什么?其内部实现?
② AQS是如何唤醒下一个线程的?
③ 依靠AQS的同步类比如ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等的与AQS的关联?
④ ReentrantLock如何实现公平和非公平锁?
⑤ CountDownLatch和CyclicBarrier的区别?各自适用于什么场景?

① AQS是什么?其内部实现?

AQS 是 JUC 包中用于构建锁或者其他同步组件(信号量、事件等)的基础类,是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架 [^1] 。下面基于 Doug Lea 所著论文中 [^2] ,关于需求、设计、实现的描述来讨论 AQS 内部实现。

背景和需求

那么,设计一个同步框架要满足哪些通用的需求和机制?

功能上

  1. 阻塞:阻塞线程直到同步状态允许它继续执行(acquire方法)
  2. 唤醒:释放正在阻塞的线程,修改同步状态,使得一或多个被acquire阻塞的线程继续执行(release方法)
  3. 非阻塞: 非阻塞同步(tryLock方法)
  4. 超时设置:携带超时时间,让调用者可以放弃等待(tryAcquireNanos方法)
  5. 支持中断:线程等不及资源了从等待队列退出,通过中断实现任务取消(acquireInterruptibly方法)
  6. 独占/共享:根据资源是否可以被同时访问定义了两种资源共享方式(acquireShared方法)
    1)Exclusive(独占),同一时间只有一个线程可以通过阻塞点(ReentrantLock)
    2)Share(共享),多个线程可以同时执行(Semaphore/CountDownLatch )

性能目标

主要的性能目标是可伸缩性,即在大部分情况下,即使,或特别在同步器有竞争的情况下,稳定地保证其效率。在理想的情况下,不管有多少线程正试图通过同步点,通过同步点的开销都应该是个常量。

设计上

实现同步器的伪代码如下:

为了实现上述操作,需要下面三个基本组件的相互协作:

实现

AQS针对上述三个功能的具体实现:

  1. 使用单个 int(32位)来保存同步状态 (资源的可用状态),并暴露出getState、setState以及compareAndSet操作来读取和更新这个状态。AbstractQueuedLongSynchronizer类使用了64位 long 字段的原子性操作。

  2. 阻塞
    j.u.c包 LockSupport 类来作为线程阻塞和唤醒的工具。方法LockSupport.park阻塞当前线程除非/直到有个LockSupport.unpark 方法被调用(unpark方法被提前调用也是可以的)

  3. 两种队列(都基于Node内部类)
    1)同步等待队列,是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。
    2)条件等待队列,不是必须的,其是一个单向链表,只有当使用Condition时,才会存在此单向链表。并且可能会有多个Condition queue。

同步等待队列节点类state

// 取消状态     static final int CANCELLED =  1;
// 唤醒状态     static final int SIGNAL    = -1;
// 条件等待状态  static final int CONDITION = -2;
// 传播状态     static final int PROPAGATE = -3;
// 等待状态,初始值为0,其他可选值是上面的4个值 volatile int waitStatus;

同步等待队列

AQS 类的 protected 修饰的构造函数里面有一大段注释用于说明AQS实现的等待队列的细节事项,这里列举几点重要的[^4]:


假设使用Node.EXCLUSIVE模式把新增的等待线程加入队列,例如有三个线程分别是thread-1、thread-2和thread-3,线程入队的时候都处于阻塞状态,新入队的线程总是添加到队列的tail节点,阻塞的线程总是”争抢”着成为head节点。

     +------+  prev +-----+       +-----+       +-----+
head |      | <---- |     | <---- |     | <---- |     |  tail
     +------+ ----> +-----+ ----> +-----+ ----> +-----+
               next thread1       thread2       thread3

条件等待队列

理解条件等待队列,要先理解java.util.concurrent.locks.Condition接口,Condition可以理解为Object中的wait()、notify()和notifyAll()的替代品。

Condition的实现类是AQS的公有内部类ConditionObject,使用 condition 时必须先持有相应的锁,每个 ReentrantLock 实例可以通过调用多次 newCondition 产生多个 ConditionObject 的实例。

条件队列[^5]

② AQS是如何唤醒下一个线程的?

  1. 以 ReentrantLock 为例,获取锁失败的线程,会向 CLH 队列中添加一个独占模式的节点到队尾,然后挂起等待唤醒(通过LocksSuport#park挂起,挂起前会修改前驱节点状态为 Node.SIGNAL(-1))。
  2. 锁释放,从队尾往前找,找到 waitStatus<=0 的所有节点中排在最前面的一个,waitStatus 为 SIGNAL 代表后继节点需要被唤醒(通过 LocksSuport#unpark 唤醒线程)。
  3. 线程被唤醒后检查当前线程的中断状态并置 false ,如果发现该线程被中断过,就再中断一次,再然后再次获取锁。
// 中断标志位的修改
Thread.interrupted()                                    true ->false 
Thread.currentThread().interrupt()                      false -> true

代码实例:

public class ReentrantLockTest {

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            tryAcquire();
        }, "thread-0").start();

        Thread.sleep(1000);
        new Thread(() -> {
            tryAcquire();
        }, "thread-1").start();

        Thread.sleep(10000);
        new Thread(() -> {
            tryAcquire();
        }, "thread-2").start();

        Thread.sleep(10000);
        new Thread(() -> {
            tryAcquire();
        }, "thread-3").start();
    }

    private static ReentrantLock lock = new ReentrantLock(true);

    private static void tryAcquire(){
        lock.lock();
        try {
            try {
                Thread.sleep(20000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }finally {
            lock.unlock();
        }
    }

}

及图示(重点关注 waitStatus 状态改变),其中初始化时头尾节点是同一个节点,获取锁失败的线程进入到同步队列的队尾:

③ 依靠AQS的同步类比如ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等的实现

④ ReentrantLock如何实现公平和非公平锁?

ReentrantLock lock = new ReentrantLock(false);//false为非公平锁,true为公平锁

公平锁:在线程尝试获取公平锁时,先去判断队列中是否有前驱节点,如果没有前驱节点,尝试去获取一次锁,如果获取失败,把线程封装成节点放入队列中,然后该线程会进行最多3次自旋,判断线程是否在队列首部,如果在,尝试获取锁。在自旋次数耗尽后,线程仍未获取到锁,则线程被挂起。

非公平锁:在线程尝试获取非公平锁时,直接尝试获取锁,如果没有获取到,就把该线程封装成节点放到队列中,然后该线程会进行最多3次自旋,判断线程是否在队列首部,如果在,尝试获取锁。如果在自旋次数耗尽后,线程仍未获取到锁,则线程被挂起。
公平锁会比非公平锁多一个对队列中是否有前驱节点的判断。[^6]

⑤ CountDownLatch和CyclicBarrier的区别?各自适用于什么场景?

这两个类都可以实现一组线程在到达某个条件之前进行等待,它们内部都有一个计数器,当计数器的值不断的减为0的时候所有阻塞的线程将会被唤醒。
有区别的是CyclicBarrier的计数器由自己控制,而CountDownLatch的计数器则由使用者来控制,在CyclicBarrier中线程调用await方法不仅会将自己阻塞还会将计数器减1,而在CountDownLatch中线程调用await方法只是将自己阻塞而不会减少计数器的值。另外,CountDownLatch只能拦截一轮,而CyclicBarrier可以实现循环拦截。一般来说用CyclicBarrier可以实现CountDownLatch的功能,而反之则不能。[^7]

参考文章

[1] https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html
[2] https://gee.cs.oswego.edu/dl/papers/aqs.pdf
[3] https://www.cnblogs.com/dennyzhangdd/p/7218510.html
[4] https://www.cnblogs.com/throwable/p/13369717.html
[5] https://javadoop.com/post/AbstractQueuedSynchronizer-2
[6]https://blog.csdn.net/m0_51958329/article/details/124323234
[7] https://blog.csdn.net/qq_39241239/article/details/87030142

上一篇下一篇

猜你喜欢

热点阅读