Android开发Android开发Android技术知识

Java并发之 AQS 深入解析(上)

2021-10-19  本文已影响0人  小鱼人爱编程

前言

线程并发系列文章:

Java 线程基础
Java 线程状态
Java “优雅”地中断线程-实践篇
Java “优雅”地中断线程-原理篇
真正理解Java Volatile的妙用
Java ThreadLocal你之前了解的可能有误
Java Unsafe/CAS/LockSupport 应用与原理
Java 并发"锁"的本质(一步步实现锁)
Java Synchronized实现互斥之应用与源码初探
Java 对象头分析与使用(Synchronized相关)
Java Synchronized 偏向锁/轻量级锁/重量级锁的演变过程
Java Synchronized 重量级锁原理深入剖析上(互斥篇)
Java Synchronized 重量级锁原理深入剖析下(同步篇)
Java并发之 AQS 深入解析(上)
Java并发之 AQS 深入解析(下)
Java Thread.sleep/Thread.join/Thread.yield/Object.wait/Condition.await 详解
Java 并发之 ReentrantLock 深入分析(与Synchronized区别)
Java 并发之 ReentrantReadWriteLock 深入分析
Java Semaphore/CountDownLatch/CyclicBarrier 深入解析(原理篇)
Java Semaphore/CountDownLatch/CyclicBarrier 深入解析(应用篇)
最详细的图文解析Java各种锁(终极篇)
线程池必懂系列

前面几篇分析了synchronized 原理及其使用,synchronized 是JVM实现的,核心代码是C++,对于不熟悉C++语言的读者可能有点难度。JUC 包下提供了新的同步框架:AQS,是纯JAVA代码实现的。如果你了解了synchronized 核心,那么AQS不在话下,若是不了解,本篇将一起从头到尾深入分析AQS。
通过本篇文章,你将了解到:

1、如何实现自己的同步框架
2、AQS 功能解析
3、AQS 独占锁实现
4、AQS 共享锁实现
5、场景模拟与疑难解答

1、如何实现自己的同步框架

准备关键数据结构

第一
得需要共享变量作为"锁芯"。由于这个共享变量是多线程共享,为保证线程间的可见性,因此需要用volatile关键字修饰。

第二
当线程竞争锁成功时则进入临界区执行代码,当失败时需要加入到队列里进行等待,因此需要一个同步队列,用以存放因获取锁失败而挂起的线程。

第三
线程之间需要同步,A线程等待B线程生产数据,B线程生产了数据通知A线程,因此需要一个等待(条件)队列。

核心操作

数据结构准备好之后,需要操作以上数据结构来实现锁功能。
线程竞争锁:通过CAS操作"锁芯",操作成功则执行临界区代码,失败则加入到同步队列。
线程被唤醒:拿到锁的线程执行完临界区代码后释放锁,并唤醒同步队列里等待的线程,被唤醒的线程继续竞争锁。
线程等待某个条件:线程因为某种条件不满足于是加入到等待队列里,释放锁,并挂起等待。
条件满足线程被唤醒:条件满足,线程被唤醒后继续竞争锁。

以上步骤是实现锁功能的核心,不论是synchronized还是AQS,基础功能都是以上步骤,只是他们还拥有更丰富的功能,如可中断(AQS),可重入、可独占可共享(AQS)等。
自己实现锁的实践请移步:Java 并发"锁"的本质(一步步实现锁)

2、AQS 功能解析

AQS是AbstractQueuedSynchronizer的简称,顾名思义:同步器。它是JUC下的同步框架,也是实现锁的核心类。
AQS是抽象类,提供了基础的方法,需要子类实现具体的获取锁、释放锁操作。


image.png

如上图所示,AQS 实现了独占锁/共享锁、可中断锁/不可中断锁的逻辑,当子类扩展AQS时调用对应的方法即可实现不同的锁组合。

JUC下扩展自AQS的子类封装器:


image.png

接下来进入到AQS源码里,看看它是如何实现上述功能的。
注:Semaphore和CountDownLatch 并不是严格意义上的锁,后面具体分析每种锁的时候再细说

3、AQS 独占锁实现

A、先找到关键数据结构

锁芯

#AbstractQueuedSynchronizer.java
    private volatile int state;

state 称为共享资源或者同步状态,作为锁的锁芯,可以看出它用volatile修饰了。

同步队列

#AbstractQueuedSynchronizer.java
    //指向同步队列的头
    private transient volatile Node head;
    //指向同步队列的尾
    private transient volatile Node tail;

再来看Node里的元素:

#AbstractQueuedSynchronizer.java
    static final class Node {
        ...
        //前驱节点
        volatile Node prev;
        //后继节点
        volatile Node next;
        //占用独占锁的线程
        volatile Thread thread;
        //指向下一个等待条件的节点
        Node nextWaiter;
        ...
    }

B、操作关键数据结构的方法

1、先说获取同步状态的操作

acquire(xx)

#AbstractQueuedSynchronizer.java
    public final void acquire(int arg) {
        //arg 不同的锁实现有不同的含义
        //tryAcquire 由子类实现具体的获取同步状态操作
        //addWaiter 将当前线程封装在Node里并加入到同步队列
        //acquireQueued 符合条件则挂起线程
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //补上中断标记
            selfInterrupt();//-------------(1)
    }

里面写法很简单,重要工作都在各个方法里体现。
\color{Red}{问题1:如标注的(1),为什么需要selfInterrupt()?}

tryAcquire(xx)

真正获取锁的地方,也就是操作锁芯"state"的地方,不同子类有不一样的实现,后面会分类细说。tryAcquire(xx) 返回true表示获取同步状态成功,false表示获取同步状态失败。

addWaiter(xx)

#AbstractQueuedSynchronizer.java
    private Node addWaiter(Node mode) {
        //构造新节点
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
            //尾节点存在
            //新节点的前驱指针指向尾节点
            node.prev = pred;//-------------(2)
            //CAS修改为尾节点指向新节点
            if (compareAndSetTail(pred, node)) {
                //成功后
                //将最后一个节点的后继指针指向新加入的节点
                //此时新节点正式加入到同步队列里了
                pred.next = node;
                return node;
            }
        }
        //前面步骤加入队列失败,则会走到这
        enq(node);
        //返回新加入的节点
        return node;
    }

    private Node enq(final Node node) {
        //死循环务必保证插入队列成功
        for (;;) {
            Node t = tail;
            if (t == null) { 
                //队列是空的,则先创建头节点
                if (compareAndSetHead(new Node()))
                    //尾节点指向头节点
                    tail = head;
            } else {
                //和addWaiter里一样的操作,加入新节点到队尾
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

addWaiter(xx)作用是将节点加入到同步队列的尾部。
需要注意的是:

头节点不关联任何线程,仅仅起到索引的作用。

最终,同步队列如下图:


image.png

\color{Red}{问题2:如标注的(2),为什么先设置前驱指针?}

acquireQueued(xx)

按照以往的经验(synchronized),加入到同步队列后应该挂起线程,来看看AQS实现有何不同:

#AbstractQueuedSynchronizer.java
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            //记录中断标志
            boolean interrupted = false;
            for (;;) {
                //找到当前节点的前驱节点
                final Node p = node.predecessor();
                //如果前驱节点为头节点,则尝试获取同步状态
                if (p == head && tryAcquire(arg)) {
                    //获取同步状态成功,将头节点指向下一个节点,并且新的头节点prev=null,表示原本的头节点出队了
                    setHead(node);
                    //原本的头节点next=null,帮助尽快GC
                    p.next = null;
                    failed = false;
                    return interrupted;
                }
                //判断获取同步状态失败后是否需要挂起,走到这里说明获取同步状态失败了,可能需要挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //标记中断
                    interrupted = true;
            }
        } finally {
            if (failed)
                //还是没获取成功,则取消竞争同步状态操作
                cancelAcquire(node);
        }
    }

这里面的逻辑可能比较绕,着重分析一下。
首先外层有个死循环,该循环退出的条件是当前线程成功获取了同步状态。
其次,如果当前新加入队列的节点的前驱节点是头节点,那么它就会去尝试获取同步状态。若是获取同步状态失败或者它的前驱节点不是头节点,则进入到shouldParkAfterFailedAcquire(xx)方法。

#AbstractQueuedSynchronizer.java
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //拿出前驱节点的状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            //为SIGNAL 直接返回
            return true;
        if (ws > 0) {
            //该节点已被取消
            do {
                //一直往前追溯,直到找到不被取消的节点为止
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            //找到
            pred.next = node;
        } else {
            //不是取消状态,则直接设置前驱节点状态为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

该方法返回true,则说明当前线程所在节点的前驱节点的状态为:SIGNAL。进而执行parkAndCheckInterrupt()方法。

parkAndCheckInterrupt()

从名字就可以看出来是挂起线程并检查中断。

#AbstractQueuedSynchronizer.java
    private final boolean parkAndCheckInterrupt() {
        //挂起线程
        LockSupport.park(this);
        //查询中断标记位
        return Thread.interrupted();
    }

cancelAcquire(xx)

该方法有两种场景会调用到:

1、当获取同步状态发生异常时,需要取消线程竞争同步状态的操作。
2、当获取同步状态的超时时间到来之时,若此刻还无法成功获取同步状态,则调用该方法。

#AbstractQueuedSynchronizer.java
    private void cancelAcquire(Node node) {
        if (node == null)
            return;
        node.thread = null;
        Node pred = node.prev;
        //若是已经取消了,则跳过
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        Node predNext = pred.next;

        //标记为取消状态
        node.waitStatus = Node.CANCELLED;
        //如果当前节点是尾节点,则将尾节点指向当前节点的前驱节点
        if (node == tail && compareAndSetTail(node, pred)) {
            //再将前驱节点的后继指针置为空,把node从队列里移除
            compareAndSetNext(pred, predNext, null);
        } else {//---------------(3)
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                //若前驱节点不是头结点,当前节点也不是尾结点
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    //将node的前驱节点的后继指针指向node的后继节点
                    compareAndSetNext(pred, predNext, next);
            } else {
                //前驱节点是头结点
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

\color{Red}{问题3:如标注的(3),node 什么时候从队列里移除?}
2、再说释放同步状态的操作

release(xx)

#AbstractQueuedSynchronizer.java
    public final boolean release(int arg) {
        //tryRelease 释放同步状态
        if (tryRelease(arg)) {
            //释放同步状态成功
            Node h = head;
            if (h != null && h.waitStatus != 0)
                //waitStatus 不为0,则唤醒线程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        //将头结点状态置为0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        //取出头结点的后继节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
          //若是没有后继节点或者是取消状态
            s = null;
            //则从尾部开始寻找离头结点最近的未取消的节点-----------(4)
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //唤醒线程
        if (s != null)
            LockSupport.unpark(s.thread);
    }

\color{Red}{问题4:如标注的(4),为什么需要从尾部开始索引?}

节点状态

#AbstractQueuedSynchronizer.java
        //节点被取消,不再参与竞争锁
        static final int CANCELLED =  1;
        //表示该节点的后继节点需要唤醒
        static final int SIGNAL    = -1;
        //节点在等待队列里的状态
        static final int CONDITION = -2;
        //表示头结点将唤醒的动作传播下去
        static final int PROPAGATE = -3;
        //默认值为0

至此,独占锁的获取锁、释放锁的流程已经分析完毕,如下图:


image.png

4、AQS 共享锁实现

独占锁是同一时刻只允许一个线程获取锁,而共享锁则不然,来看看AQS里共享锁的实现。
先来看看获取共享同步状态的操作

acquireShared(xx)

#AbstractQueuedSynchronizer.java
    public final void acquireShared(int arg) {
        //获取共享的同步状态,不同锁实现不一样
        //<0 表示获取同步状态失败
        if (tryAcquireShared(arg) < 0)
            //加入同步队列、挂起线程等在此处实现
            doAcquireShared(arg);
    }

与独占锁的获取不一样的是,此处将加入同步队列与挂起线程等操作放到一个方法里了。

doAcquireShared(xx)

#AbstractQueuedSynchronizer.java
    private void doAcquireShared(int arg) {
        //加入同步队列,此处节点是共享状态
        final Node node = addWaiter(Node.SHARED);
        //获取同步状态失败
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //前驱节点
                final Node p = node.predecessor();
                if (p == head) {
                    //若是前驱节点为头结点,则尝试获取同步状态
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //获取同步状态成功
                        //修改头结点,并传递唤醒状态
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            //补中断
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //与独占锁一致
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }   

tryAcquireShared(arg) 返回值表示当前可用的资源。

setHeadAndPropagate(xx)

#AbstractQueuedSynchronizer.java
private void setHeadAndPropagate(Node node, int propagate) {
        //propagate == 0 表示没有资源可以使用了
        Node h = head; // Record old head for check below
        //设置头结点
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            //若后继节点是共享节点,则唤醒
            if (s == null || s.isShared())
                doReleaseShared();
        }

除了将头结点指向当前节点外,还需要唤醒下一个共享节点。
而独占锁不会。

从现实的角度来看也比较容易理解这种操作:

某个澡堂分男女分批洗,当前只允许女士进去先洗,而一群男士在排队等候,当女士们洗好之后,允许男士进去洗。第一个男士进去后,发现可以洗,于是跟第二个男士说可以洗,你快进来吧,真可以洗,这是共享精神。这个澡堂就是共享的。

再来看看释放共享同步状态的操作

releaseShared(xx)

#AbstractQueuedSynchronizer.java
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            //释放同步状态成功后,通知后续节点
            doReleaseShared();
            return true;
        }
        return false;
    }

可以看出,线程在获取共享锁和释放共享锁后都会尝试唤醒后续节点,都调用了
doReleaseShared()方法。

doReleaseShared()

#AbstractQueuedSynchronizer.java
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                //队列里还有节点等候
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    //唤醒后继节点
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                //将头节点状态置为PROPAGATE-------->(5)
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            //该方法可能会被多个线程调用,而线程获取锁后会修改头节点
            //因此,若是发现头结点更改了,则再重新拿新的头结点再试探
            if (h == head)                   // loop if head changed
                break;
        }
    }

\color{Red}{问题5:如标注的(5),为什么需要PROPAGATE状态?}
至此,共享锁的获取锁、释放锁的流程已经分析完毕,如下图:

image.png

5、场景模拟与疑难分析

共享锁、独占锁的实现重要方法、数据结构都过了一遍,接下来通过模拟场景来分析上面提到的五个问题。

1、问:为什么需要selfInterrupt()

#AbstractQueuedSynchronizer.java
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

LockSupport.park(this)将线程挂起,挂起后调用Thread.interrupted()查询中断状态。而Thread.interrupted()除了查询中断状态外,还会重置中断状态,也就是说之前中断状态为true,调用该方法后中断状态变为false。而从整个acquire(xx)方法来看,没有任何地方处理了中断,因此不能简单将中断状态置为false,还需要恢复到原来的样子,让外部调用者可以感知到是否已经发生过中断了,所以需要selfInterrupt() 重新把中断状态设置为true。
既然后面要恢复中断状态,那干嘛一开始就置为false呢,直接调用Thread.isInterrupted()不就ok了吗?
想象一种场景:A线程被中断,此时从挂起状态醒过来,然后去获取同步状态,发现还是无法获取,于是又开始准备挂起了。此处挂起线程使用的是LockSupport.park(xx)方法,其底层使用Parker.park(xx)函数:


image.png

注意红框里代码,若是发现当前线程中断状态为true,则直接返回不再挂起线程。若是调用Thread.isInterrupted(),中断状态没有改为false,那么当调用LockSupport.park(xx)方法时,线程是无法挂起的。而acquire(xx)方法里没有获取到锁就一直循环,导致线程一直不断轮询同步状态,造成了不必要的CPU资源浪费。
Parker细节请移步:Java Unsafe/CAS/LockSupport 应用与原理
线程中断细节请移步:Java “优雅”地中断线程(原理篇)

2、问:为什么先设置前驱指针
当前的顺序是:

node.prev = pred------>pred.next = node;

先让新结点的prev指向尾结点,再让尾结点的next指向新结点,如下图:


image.png

现在同步队列里有两个结点,其中一个头结点,一个是Node1结点。若是先给pred.next 赋值,假设流程如下:

1、线程A先竞争锁,竞争失败,先将Node1的next指向NewNodeA。
2、此时另一个线程B也来竞争锁,失败,也将Node1的next指向NewNodeB。
3、将tail指针指向新的节点(可能是NewNodeA,也可能是NewNodeB),若是NewNodeA,然后将NewNodeA的prev指向Node1。此时问题出现了:虽然NewNodeA的prev指向了Node1,但是Node1的next却是指向了NewNodeB。

而先给node.prev 赋值就不会出现上述情况。出现这个问题的根本原因是多线程操作队列元素(给Node1.next赋值)没有做好并发保护,而先给node.prev 并不是操作队列,将操作队列的步骤延迟到CAS成功之后,就能正确地修改队列。
当然,pred.next = node 执行之前,其它线程可能会遍历查询队列,此时pred.next可能为空,也就是上图的Node1.next可能为空。
这也是网上一些文章说next指针不可靠的原因。

3、问:node 什么时候从队列里移除
cancelAcquire(xx)分三种情形操作同步队列:
1、若node为队尾节点,则将node从队列移除。
2、若node为队头节点,则调用unparkSuccessor(xx)检测。
3、若node为中间节点,则在shouldParkAfterFailedAcquire(xx)/unparkSuccessor(xx) 彻底移除。

4、问:为什么需要从尾部开始索引
在第2点里有分析过节点的next指针可能为空,若是从队头开始索引,有可能还没遍历完整个队列就退出遍历了。因此,为了保险起见,从队尾开始索引。

5、问:为什么需要PROPAGATE状态
PROPAGATE 在共享节点时才用得到,假设现在有4个线程、A、B、C、D,A/B 先尝试获取锁,没有成功则将自己挂起,C/D 释放锁。可以参照Semaphore获取/释放锁流程。

1、C 释放锁后state=1,设置head.waitStatus=0,然后将A唤醒,A醒过来后调用tryAcquireShared(xx),该方法返回r=0,此时state=0。
2、在A还没调用setHeadAndPropagate(xx)之前,D 释放了锁,此时D调用doReleaseShared(),发现head.waitStatus==0,所以没有唤醒其它节点。
3、此时A调用了setHeadAndPropagate(xx),因为r==0且head.waitStatus==0,因此不会调用doReleaseShared(),也就没有唤醒其它节点。最后导致的是B节点没有被唤醒。

若是加了PROPAGATE状态,在上面的第2步骤里的D调用doReleaseShared()后,发现head.waitStatus==0,于是设置head.waitStatus=PROPAGATE,在第3步骤里,发现head.waitStatus==PROPAGATE,于是唤醒B。
虽然在第2步骤里没有唤醒任何线程,但是设置了PROPAGATE状态,在后续的步骤中发现已经设置了PROPAGATE,于是唤醒,这也是PROPAGATE名字的意义:传播。

由于篇幅原因,下篇将分析AQS 中断与否、条件等待等相关知识。

本文基于jdk1.8。

您若喜欢,请点赞、关注,您的鼓励是我前进的动力

持续更新中,和我一起步步为营系统、深入学习Android/Java

上一篇下一篇

猜你喜欢

热点阅读