Java并发编程解析 | 解析AQS基础同步器的设计与实现

2022-09-08  本文已影响0人  朝槿木兮

关健术语

Picture-Keyword

本文用到的一些关键词语以及常用术语,主要如下:

基本概述

在Java领域中,我们可以将锁大致分为基于Java语法层面(关键词)实现的锁和基于JDK层面实现的锁。

在Java领域中, 尤其是在并发编程领域,对于多线程并发执行一直有两大核心问题:同步和互斥。其中:

针对对于这两大核心问题,利用管程是能够解决和实现的,因此可以说,管程是并发编程的万能钥匙。

虽然,Java在基于语法层面(synchronized 关键字)实现了对管程技术,但是从使用方式和性能上来说,内置锁(synchronized 关键字)的粒度相对过大,不支持超时和中断等问题。

为了弥补这些问题,从JDK层面对其“重复造轮子”,在JDK内部对其重新设计和定义,甚至实现了新的特性。

在Java领域中,从JDK源码分析来看,基于JDK层面实现的锁大致主要可以分为以下4种方式:

从阅读源码不难发现,在Java SDK 并发包主要通过AbstractQueuedSynchronizer(AQS)实现多线程同步机制的封装与定义,而通过Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题。

一. 基本理论

在并发编程领域,有两大核心问题:一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。

在操作系统中,一般有如果I/O操作时,对于阻塞和非阻塞是从函数调用角度来说的,其中:

而同步和异步则是从“读写是主要是由谁完成”的角度来说的,其中:

其中,信号量机制(Semaphores)是用来解决同步/互斥的问题的,但是信号量(Semaphore)的操作分散在各个进程或线程中,不方便进行管理,因每次需调用P/V(来自荷兰语 proberen和 verhogen)操作,还可能导致死锁或破坏互斥请求的问题。

由于PV操作对于解决进程互斥/同步编程复杂,因而在此基础上提出了与信号量等价的——“管程技术”。

其中,管程(Monitor)当中定义了共享数据结构只能被管程内部定义的函数所修改,所以如果我们想修改管程内部的共享数据结构的话,只能调用管程内部提供的函数来间接的修改这些数据结构。

一般来说,管程(Monitor)和信号量(Semaphore)是等价的,所谓等价指的是用管程能够实现信号量,也能用信号量实现管程。

在管程的发展历程上,先后出现过Hasen模型、Hoare模型和MESA模型等三种不同的管程模型,现在正在广泛使用的是MESA模型。

在MESA模型中,管程中引入了条件变量(Conditional Variable)的概念,而且每个条件变量都对应有一个等待队列(Wait Queue)。其中,条件变量和等待队列的作用是解决线程之间的同步问题。

而对于解决线程之间的互斥问题,将共享变量(Shared Variable)及其对共享变量的操作统一封装起来,一般主要是实现一个线程安全的阻塞队列(Blocking Queue),将线程不安全的队列封装起来,对外提供线程安全的操作方法,例如入队操作(Enqueue)和出队操作(Dequeue)。

在Java领域中,对于Java语法层面实现的锁(synchronized 关键字), 其实就是参考了 MESA 模型,并且对 MESA 模型进行了精简,一般在MESA 模型中,条件变量可以有多个,Java 语言内置的管程(synchronized)里只有一个条件变量。

这就意味着,被synchronized 关键字修饰的代码块或者直接标记静态方法以及实例方法,在编译期会自动生成相关加锁(lock)和解锁(unlock)的代码,即就是monitorenter和monitorexit指令。

对于synchronized 关键字来说,主要是在Java HotSpot(TM) VM 虚拟机通过Monitor(监视器)来实现monitorenter和monitorexit指令的。

同时,在Java HotSpot(TM) VM 虚拟机中,每个对象都会有一个监视器,监视器和对象一起创建、销毁。

监视器相当于一个用来监视这些线程进入的特殊房间,其义务是保证(同一时间)只有一个线程可以访问被保护的临界区代码块。

本质上,监视器是一种同步工具,也可以说是JVM对管程的同步机制的封装实现,主要特点是:

在Hotspot虚拟机中,监视器是由C++类ObjectMonitor实现的,ObjectMonitor类定义在ObjectMonitor.hpp文件中,其中:

v7MaNT.png

同时,管程与Java中面向对象原则(Object Oriented Principle)也是非常契合的,主要体现在 java.lang.Object类中wait()、notify()、notifyAll() 这三个方法,其中:

不难发现,在Java中synchronized 关键字及 java.lang.Object类中wait()、notify()、notifyAll() 这三个方法都是管程的组成部分。

由此可见,我们可以得到一个比较通用的并发同步工具基础模型,大致包含如下几个内容,其中:

综上所述,条件变量和等待队列的作用是解决线程之间的同步问题;共享变量与阻塞队列的作用是解决线程之间的互斥问题。

二.AQS基础同步器的设计与实现

在Java领域中,同步器是专门为多线程并发设计的同步机制,主要是多线程并发执行时线程之间通过某种共享状态来实现同步,只有当状态满足这种条件时线程才往下执行的一种同步机制。

对于多线程实现实现并发处理机制来说,一直以来,多线程都存在2个问题:

因此,在并发编程领域中,一直有一个很重要的设计原则: “ 不要通过内存共享来实现通信,而应该通过通信来实现内存共享。”

简单来说,就是尽可能通过消息通信,而不是内存共享来实现进程或者线程之间的同步。

其中,同步器是专门为多线程并发设计的同步机制,主要是多线程并发执行时线程之间通过某种共享状态来实现同步,只有当状态满足这种条件时线程才往下执行的一种同步机制。

由于在不同的应用场景中,对于同步器的需求也会有所不同,一般在我们自己去实现和设计一种并发工具的时候,都需会考虑以下几个问题:

从阅读JDK源码不难发现,主要是采用设计模式中模板模式的原则,JDK将各种同步器中相同的部分抽象封装成了一个统一的基础同步器,然后基于基础同步器为模板通过继承的方式来实现不同的同步器。

也就是说,在实际开发过程中,除了直接使用JDK实现的同步器,还可以基于这个基础同步器我们也可以自己自定义实现符合我们业务需求的同步器。

在JDK源码中,同步器位于java.util.concurrent.locks包下,其基本定义是AbstractQueuedSynchronizer类,即就是我们常说的AQS同步器。

1. 设计思想

一个标准的AQS同步器主要有同步状态机制,等待队列,条件队列,独占模式,共享模式等五大核心要素组成。

JDK的JUC(java.util.concurrent.)包中提供了各种并发工具,但是大部分同步工具的实现基于AbstractQueuedSynchronizer类实现,其内部结构主要如下:

其中,对于AbstractQueuedSynchronizer类的实现原理,我们可以从如下几个方面来看:


public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

    private static final long serialVersionUID = 7373984972572414691 L;


    protected AbstractQueuedSynchronizer() {}

    /**
     * 等待队列: head-头节点
     */
    private transient volatile Node head;

    /**
     * 等待队列: tail-尾节点
     */
    private transient volatile Node tail;

    /**
     * 同步状态:32位整数类型,更新同步状态(state)时必须保证其是原子性的
     */
    private volatile int state;

    /**
     * 自旋锁消耗超时时间阀值(threshold): threshold < 1000ns时,表示竞争时选择自旋;threshold > 1000ns时,表示竞争时选择系统阻塞
     */
    static final long spinForTimeoutThreshold = 1000 L;

    /**
     * CAS原子性操作
     */
    private static final Unsafe unsafe = Unsafe.getUnsafe();

    /**
     * stateOffset
     */
    private static final long stateOffset;

    /**
     * headOffset
     */
    private static final long headOffset;

    /**
     * tailOffset
     */
    private static final long tailOffset;

    /**
     * waitStatusOffset
     */
    private static final long waitStatusOffset;

    /**
     * nextOffset
     */
    private static final long nextOffset;


    static {
        try {
            stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));

        } catch (Exception ex) {
            throw new Error(ex);
        }
    }
        
            private final boolean compareAndSetHead(Node update)  {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }
        
            private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
        
            private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }
        
            private static final boolean compareAndSetNext(Node node,
                                                   Node expect,
                                                   Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }
        
            protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

}

[1]. AbstractQueuedSynchronizer类的实现原理是继承了基于AbstractOwnableSynchronizer类的抽象类,其中主要对AQS同步器的通用特性和方法进行抽象封装定义,主要包括如下方法:

public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {


    private static final long serialVersionUID = 3737899427754241961 L;


    protected AbstractOwnableSynchronizer() {}

    /**
     *  同步器拥有者
     */
    private transient Thread exclusiveOwnerThread;

    /**
     * 设置同步器拥有者:把线程当作参数传入,指定某个线程为独享
     */
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    /**
     * 获取同步器拥有者:获取指定的某个线程
     */
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

[2]. 对于同步状态(state),其类型是32位整数类型,并且是被volatile修饰的,表示在更新同步状态(state)时必须保证其是原子性的。

[3]. 对于等待队列的结构,主要是在Node定义了head和tail变量,其中head表示头部节点,tail表示尾部节点

[4].对于等待队列的结构提到的Node类来说,主要内容如下:

  static final class Node {
      /** Marker to indicate a node is waiting in shared mode */
      static final Node SHARED = new Node();
            
      /** Marker to indicate a node is waiting in exclusive mode */
      static final Node EXCLUSIVE = null;

      /** waitStatus value to indicate thread has cancelled */
      static final int CANCELLED = 1;
            
      /** waitStatus value to indicate successor's thread needs unparking */
      static final int SIGNAL = -1;
            
      /** waitStatus value to indicate thread is waiting on condition */
      static final int CONDITION = -2;
            
      /**
       * waitStatus value to indicate the next acquireShared should
       * unconditionally propagate
       */
      static final int PROPAGATE = -3;

      /**
       * Status field, taking on only the values:
       *   SIGNAL:     The successor of this node is (or will soon be)
       *               blocked (via park), so the current node must
       *               unpark its successor when it releases or
       *               cancels. To avoid races, acquire methods must
       *               first indicate they need a signal,
       *               then retry the atomic acquire, and then,
       *               on failure, block.
       *   CANCELLED:  This node is cancelled due to timeout or interrupt.
       *               Nodes never leave this state. In particular,
       *               a thread with cancelled node never again blocks.
       *   CONDITION:  This node is currently on a condition queue.
       *               It will not be used as a sync queue node
       *               until transferred, at which time the status
       *               will be set to 0. (Use of this value here has
       *               nothing to do with the other uses of the
       *               field, but simplifies mechanics.)
       *   PROPAGATE:  A releaseShared should be propagated to other
       *               nodes. This is set (for head node only) in
       *               doReleaseShared to ensure propagation
       *               continues, even if other operations have
       *               since intervened.
       *   0:          None of the above
       *
       *
       * The field is initialized to 0 for normal sync nodes, and
       * CONDITION for condition nodes.  It is modified using CAS
       * (or when possible, unconditional volatile writes).
       */
      volatile int waitStatus;

      /**
       * Link to predecessor node that current node/thread relies on
       */
      volatile Node prev;

      /**
       * Link to the successor node that the current node/thread
       */
      volatile Node next;

      /**
       * The thread that enqueued this node.  Initialized on
       * construction and nulled out after use.
       */
      volatile Thread thread;

      /**
       * Link to next node waiting on condition, or the special
       */
      Node nextWaiter;

      /**
       * Returns true if node is waiting in shared mode.
       */
      final boolean isShared() {
          return nextWaiter == SHARED;
      }


      final Node predecessor() throws NullPointerException {
          Node p = prev;
          if (p == null)
              throw new NullPointerException();
          else
              return p;
      }

      Node() { // Used to establish initial head or SHARED marker
      }

      Node(Thread thread, Node mode) { // Used by addWaiter
          this.nextWaiter = mode;
          this.thread = thread;
      }

      Node(Thread thread, int waitStatus) { // Used by Condition
          this.waitStatus = waitStatus;
          this.thread = thread;
      }
  }

[5].对于自旋锁消耗超时时间阀值(spinForTimeoutThreshold),主要表示系统依据这个阀值来选择自旋方式还是系统阻塞。一般假设这个threshold,当 threshold < 1000ns时,表示竞争时选择自旋;否则,当threshold > 1000ns时,表示竞争时选择系统阻塞

[6].对于带有Offset 等变量对应各自的句柄,主要用于执行CAS操作。在JDK1.8版本之前,CAS操作主要通过Unsafe类来说实现;在JDK1.8版本之后,已经开始利用VarHandle来替代Unsafe类操作实现。

[7].对于CAS操作来说,主要提供了如下几个方法:

[8].除此之外,还包括许多辅助的操作方法,具体可参考源码分析。

2. 基本实现

一个标准的AQS同步器最核心底层设计实现是一个非阻塞的CHL Node FIFO(先进先出)队列数据结构,通过采用自旋锁+CAS操作的方法来保证原子性操作。

总的来说,一个AQS基础同步器,底层的数据结构采用的是一个非阻塞的CHL Node FIFO(先进先出)队列数据结构,而实现的核心算法则是采用自旋锁+CAS操作的方法。

首先,对于非阻塞的CHL Node FIFO(先进先出)队列数据结构,一般来说,FIFO(First In First Out,先进先出)队列是一个有序列表,属于抽象型数据类型(Abstract Data Type,ADT),所有的插入和删除操作都发生在队首(Front)和队尾(Rear)两端,具有先进先出的特性。


    /**
     * 等待队列: head-头节点
     */
    private transient volatile Node head;

    /**
     * 等待队列: tail-尾节点
     */
    private transient volatile Node tail;
        
    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

在AQS同步器的源码中,主要是通过静态内部类Node来实现的这个非阻塞的CHL Node FIFO(先进先出)队列数据结构, 维护了两个变量head和tail,其中head对应队首(Front),tail对应队尾(Rear)。同时,还定义了addWaiter(Node mode)方法来表示入队操作,其中有个enq(final Node node)方法,主要用于初始化队列中head和tail的设置。

其次,AQS同步器以CLH锁为基础,其中CLH锁是一种自旋锁,对于自旋锁的实现方式来看,主要可以分为普通自旋锁和自适应自旋锁,CLH锁和MCS锁等4种,其中:

自旋锁是一种实现同步的方案,属于一种非阻塞锁,与常规锁主要的区别就在于获取锁失败之后的处理方式不同,主要体现在:

从本质上讲,自旋是一钟忙等待状态,会一直消耗CPU的执行时间。一般情况下,常规互斥锁适用于持有锁长时间的情况,自旋锁适合持有时间短的情况。

其中,对于CLH锁来说,其核心是为解决同步带来的花销问题,Craig,Landim,Hagersten三人发明了CLH锁,其中主要是:

CLH锁将众多线程长时间对资源的竞争,通过有序化这些线程将其转化为只需要对本地变量检测。唯一存在竞争的地方就是入队之前对尾部节点tail 的竞争,相对来说,当前线程对资源的竞争次数减少,这节省了CPU缓存同步的消耗,从而提升了系统性能。

但是同时也有一个问题,CLH锁虽然解决了大量线程同时操作同一个变量时带来的开销问题,如果前驱节点和当前节点在本地主存中不存在,则访问时间过长,也会引起性能问题。

为了让CLH锁更容易实现取消和超时的功能,AQS同步器在设计时进行了改造,主要体现在:节点的结构和节点等待机制。其中:

由此可见,主要是通过前驱节点和后继节点的引用连接起来形成一个链表队列,其中对于入队,检测节点,出队,判断超时,取消节点等操作主要如下:

最后,AQS同步器中使用了CAS操作,其中CAS(Compare And Swap,比较并交换)操作时一种乐观锁策略,主要涉及三个操作数据:内存值,预期值,新值,主要是指当且仅当预期值和内存值相等时才去修改内存值为新值。

一般来说,CAS操作的具体逻辑,主要可以分为三个步骤:

除此之外,需要注意的是CAS操作具有原子性,主要是由CPU硬件指令来保证,并且通过Java本地接口(Java Native Interface,JNI)调用本地硬件指令实现。

当然,CAS操作避免了悲观策略独占对象的 问题,同时提高了并发性能,但是也有以下三个问题:

而在AQS同步器中,为了保证并发实现保证原子性,而且是硬件级别的原子性,一般是通过JNI(Java native interface,Java 本地接口)方式让Java代码调用C/C++本地代码。

需要注意的是,在Java领域中,对于CAS操作实现,主要有亮点问题:

版权声明:本文为博主原创文章,遵循相关版权协议,如若转载或者分享请附上原文出处链接和链接来源。

上一篇 下一篇

猜你喜欢

热点阅读