从多线程到分布式

从多线程到分布式(三)如何实现锁

2023-08-10  本文已影响0人  吟游雪人

为了实现一把具有阻塞或唤醒功能的锁,需要解决几个核心问题:
1.需要底层支持对一个线程进行阻塞或唤醒操作。
2.需要有一个队列维护所有阻塞的线程。这个队列也必须是线程安全的无锁队列

  1. 需要一个state变量,标记该锁的状态。state变量至少有两个值:0、1。对state变量的操作,要确保线程安全

问题1:
在Unsafe类中,提供了阻塞或唤醒线程的一对操作原语,也就是park/unpark
有个常用的LockSupport,对这个操作做了简单的封装

在当前线程中调用park(),该线程就会被阻塞;在另外一个线程中,调用unpark(Thread t),传入一个被阻塞的线程,就可以唤醒阻塞在park()地方的线程。尤其是unpark(Thread t),它实现了一个线程对另外一个线程的“精准唤醒”。前面讲到的wait()/notify(),notify也只是唤醒某一个线程,但无法指定具体唤醒哪个线程。

问题2:如何设计队列
当条件不满足的时候,我们需要队列去保存当前的执行线程,并在适当的时机唤醒。这个队列也必须是线程安全的无锁队列,这时候又引入了新的问题,这个队列应该如何设计,才能又安全又快。所以这时候CAS作为一个通用的优化手段是必不可少的。
CAS配合自旋形成的自旋锁,特点就是抢不到锁的线程会一直处于“亚工作”状态,一直消耗cpu判断是否满足条件获取锁。一直会消耗cpu资源,速度快是其特点。

说起自旋锁队列,就要提起CLH队列(CLH是发明此队列的三人名字的首字母),以及此队列的变种AQS队列。

CLH队列
既然是队列,就是FIFO结构,新的线程会从尾部加入队列。这时候尾部的引用就是一个修改点,在多线程中,修改点需要保证自身的线程安全性,这里通过原子引用来保证修改尾节点的线程安全性。当有新节点加入时,尾节点引用就会指向这个新加入的节点,并将原本尾节点作为当前进加入节点的前驱。
那么剩下的问题就是如何知道自己有资格去再去获取锁了呢,那就是自己的前面一个节点已经有了改变,也就是通过不断的自选检测前驱节点。
通过上述描述,可以看出CLH是个公平锁,只会通知队列中的第一个线程去竞争锁,好处就是避免同时唤醒大量线程同时竞争,消耗cpu资源。并且空间复杂度低。
缺点就是每一个线程都是一个自旋锁,消耗比较多的cpu资源。

AQS队列
AQS是CLH队列的一个变种。AQS队列内部维护的是一个FIFO的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的前驱节点和直接的后继节点。所以双向链表可以从任意一个节点开始很方便地访问前驱节点和后继节点。每个节点其实是由线程封装的,当线程争抢锁失败后会封装成节点加入AQS队列中;当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)。

     static final class Node {
         /**节点等待状态值1:取消状态*/
         static final int CANCELLED =  1;

         /**节点等待状态值-1:标识后继线程处于等待状态*/
         static final int SIGNAL    = -1;

   
         /**节点等待状态值-2:标识当前线程正在进行条件等待*/
         static final int CONDITION = -2;

    
         /**节点等待状态值-3:标识下一次共享锁的acquireShared操作需要无条件传播*/
         static final int PROPAGATE = -3;

         //节点状态:值为SIGNAL、CANCELLED、CONDITION、PROPAGATE、0 
         //普通的同步节点的初始值为0,条件等待节点的初始值为CONDITION(-2)

         volatile int waitStatus;
         //节点所对应的线程,为抢锁线程或者条件等待线程

         volatile Thread thread;
         //前驱节点,当前节点会在前驱节点上自旋,循环检查前驱节点的waitStatus状态

         volatile Node prev;

    

         //后继节点

         volatile Node next;

         //若当前Node不是普通节点而是条件等待节点,则节点处于某个条件的等待队列上

         //此属性指向下一个条件等待节点,即其条件队列上的后继节点

         Node nextWaiter;

          ...

     }

AQS定义了两种资源共享方式:
·Exclusive(独享锁):只有一个线程能占有锁资源,如ReentrantLock。独享锁又可分为公平锁和非公平锁。
·Share(共享锁):多个线程可同时占有锁资源,如Semaphore、CountDownLatch、CyclicBarrier、ReadWriteLock的Read锁。
AQS为不同的资源共享方式提供了不同的模板流程,包括共享锁、独享锁模板流程。这些模板流程完成了具体线程进出等待队列的(如获取资源失败入队/唤醒出队等)基础、通用逻辑。

需要覆盖的方法
·tryAcquire(int):独占锁钩子,尝试获取资源,若成功则返回true,若失败则返回false。
·tryRelease(int):独占锁钩子,尝试释放资源,若成功则返回true,若失败则返回false。
·tryAcquireShared(int):共享锁钩子,尝试获取资源,负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
·tryReleaseShared(int):共享锁钩子,尝试释放资源,若成功则返回true,若失败则返回false。
·isHeldExclusively():独占锁钩子,判断该线程是否正在独占资源。只有用到condition条件队列时才需要去实现它。

关键属性state:
• getState()方法、setState()方法和compareAndSetState()方法:getState()方法主要用于获取当前AQS队列的state属性值;setState()方法主要用于为AQS队列设置新的state属性值。由于AQS中的state变量为volatile变量,因此可以保证操作的可见性和有序性;compareAndSetState()方法采用CAS技术,在保证了操作原子性的前提下,可以在对比成功后为state属性设置新值。例如,compareAndSetState(0, 1)表示如果当前AQS队列的state属性值为0,则设置state属性值为1并返回true。

state取值不仅可以是0、1,还可以大于1,就是为了支持锁的可重入性。例如,同样一个线程,调用5次lock,state会变成5;然后调用5次unlock,state减为0。当state=0时,没有线程持有锁,exclusiveOwnerThread=null;当state=1时,有一个线程持有锁,exclusiveOwnerThread=该线程;当state>1时,说明该线程重入了该锁。

AQS技术内部的CLH队列,为AQS控制的多个线程建立同步机制打下了基础。AQS通过对内部CLH队列的控制,可以知晓哪些线程参与了资源独占操作权抢占工作,它们的申请状态如何,优先顺序如何。程序员可以根据在AQS不同的工作模式下的逻辑差异,实现自己的线程控制方式。注意AQS中的state属性。为了保证参与AQS控制的多个线程能正确地同步信息,AQS中内置了一个保证可见性、有序性和原子性的state属性。在通常情况下,这些线程可以通过改变state属性的值,通知其他线程关键的同步信息。
state属性不会参与CLH队列的控制过程,它只专注于向相关线程表达业务意义,也就是说,state属性在不同场景中可以表示不同的业务含义。例如,在基于AQS技术实现的ReentrantLock类的工作逻辑中,state属性的值表示当前获得资源独占操作权的线程累计成功加锁的次数,相当于一个计数器。

通过CAS入队addWaiter方法


    private Node addWaiter(Node mode) {

            //创建新节点

          Node node = new Node(Thread.currentThread(), mode);

            // 加入队列尾部,将目前的队列tail作为自己的前驱节点pred

            Node pred = tail;

            // 队列不为空的时候

            if (pred != null) {

                node.prev = pred;

                // 先尝试通过AQS方式修改尾节点为最新的节点

                // 如果修改成功,将节点加入队列的尾部

                if (compareAndSetTail(pred, node)) {

                     pred.next = node;

                    return node;

                }

            }

            //第一次尝试添加尾部失败,意味着有并发抢锁发生,需要进行自旋

            enq(node);

            return node;

        }

如果发生竞争,则通过CAS的方式入队列。enq()方法通过CAS自旋将节点添加到队列尾部。

AQS队列和Condition队列的关系
AQS队列头部节点表示该节点的线程处于占有锁的线程,可以执行代码。但是执行条件不一定满足,可能再次进入Condition等待队列。
所以一个带有Condition的锁,至少有2个队列。也就是AQS一条,然后一个Condition一条。下图是一个带有2个Condition的锁的图:


image.png

当临界区内的线程发现条件不满足,准备阻塞等待调用await时,说明当前线程的节点为当前AQS队列的头节点,正好处于占有锁的状态,await()方法需要把该线程从AQS队列挪到Condition等待队列里


image.png

在await()方法将当前线程挪动到Condition等待队列后,还会唤醒AQS同步队列中head节点的下一个节点。
await()方法的整体流程如下:
(1)执行await()时,会新创建一个节点并放入Condition队列尾部。
(2)然后释放锁,并唤醒AQS同步队列中的头节点的后一个节点。
(3)然后执行while循环,将该节点的线程阻塞(LockSupport.park),直到该节点离开等待队列,重新回到同步队列成为同步节点后,线程才退出while循环。
(4)退出循环后,开始调用acquireQueued()不断尝试拿锁。
(5)拿到锁后,会清空Condition队列中被取消的节点。

上源码:

     public final void await() throws InterruptedException {

                 if (Thread.interrupted())

                     throw new InterruptedException();

                 Node node = addConditionWaiter();          // step 1

                 int savedState = fullyRelease(node);       // step 2

                 int interruptMode = 0;

                 while (!isOnSyncQueue(node)) {             // step 3

                     LockSupport.park(this);

                     if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

                         break;

                 }

                 if (acquireQueued(node, savedState)        // step 4

                                         && interruptMode != THROW_IE)

                     interruptMode = REINTERRUPT;

                 if (node.nextWaiter != null)                       //step 5

                     unlinkCancelledWaiters();

                 if (interruptMode != 0)

                     reportInterruptAfterWait(interruptMode);

             }

那么当Condition的条件满足,调用signal方法后,就是从Condition队列返回AQS队列的一个过程。线程在某个ConditionObject对象上调用signal()方法后,等待队列中的firstWaiter会被加入同步队列中,等待节点被唤醒


image.png

signal()方法的整体流程如下:
(1)通过enq()方法自旋(该方法已经介绍过)将条件队列中的头节点放入AQS同步队列尾部,并获取它在AQS队列中的前驱节点。
(2)如果前驱节点的状态是取消状态,或者设置前驱节点为Signal状态失败,就唤醒当前节点的线程;否则节点在同步队列的尾部,参与排队。(再次排队)
(3)同步队列中的线程被唤醒后,表示重新获取了显式锁,然后继续执行condition.await()语句后面的临界区代码。

剩下的就是响应中断的能力
也就是底层的park方式可以响应中断。我觉得对于实现锁是非核心功能,这里就不细说了。(使用锁的时候,中断还是很重要的了)

能够被中断的阻塞称为轻量级阻塞,对应的线程状态是WAITING或者TIMED_WAITING;而像synchronized 这种不能被中断的阻塞称为重量级阻塞,对应的状态是BLOCKED。
初始线程处于NEW状态,调用start()之后开始执行,进入RUNNING或者READY状态。如果没有调用任何的阻塞函数,线程只会在RUNNING和READY之间切换,也就是系统的时间片调度。这两种状态的切换是操作系统完成的,开发者基本没有机会介入,除了可以调用yield()函数,放弃对CPU的占用。
一旦调用了图中的任何阻塞函数,线程就会进入WAITING或者TIMED_WAITING状态,两者的区别只是前者为无限期阻塞,后者则传入了一个时间参数,阻塞一个有限的时间。如果使用了synchronized关键字或者synchronized块,则会进入BLOCKED状态。

故而t.interrupted()的精确含义是“唤醒轻量级阻塞”,而不是字面意思“中断一个线程”。
因为t.interrupted()相当于给线程发送了一个唤醒的信号,所以如果线程此时恰好处于WAITING或者TIMED_WAITING状态,就会抛出一个InterruptedException,并且线程被唤醒。而如果线程此时并没有被阻塞,则线程什么都不会做。


image.png

结论:
1.给我一个阻塞线程,一个CAS的方法,我就能实现一个锁(这两个方法都是操作系统级别提供的)
2.合理的使用标志state(位标记的方式),我就能实现独占和共享锁
3.AQS是一个很好的模板,给与了自制锁很好的通用模板基础和核心功能。

那么有没有不是基于AQS的锁呢?下一章说

上一篇下一篇

猜你喜欢

热点阅读