Java 进阶java学习快到碗里来java

java并发编程(十七)带你了解什么是面试官常说的AQS

2022-01-13  本文已影响0人  我犟不过你

前面我们学习了ReentrantLock,其底层就是用了AQS实现的,应该先讲这一章节,但是当时给忘了,现在给补上吧。

关于ReentrantLock的学习,可以参考:https://www.jianshu.com/p/edec5185196d

AbstractQueuedSynchronizer是阻塞式锁,以及同步器组件的实现框架。是JDK中实现并发编程的核心,它提供了一个基于FIFO队列,平时我们工作中经常用到的ReentrantLock,CountDownLatch等都是基于它来实现的。

一、初识AQS

首先我们还是从前面学习的ReentrantLock入手,看看其代码结构是什么样的:

ReentrantLockl类图

从上图可以看到以下几点:

AQS有如下的特点:

二、 源码分析

下面我们通过源码剖析其本质是什么样的。

首先在脑海中有个印象,AQS维护了两个对个队列,一个是同步队列,一个是阻塞队列。

Node可以说是【同步队列】和【阻塞队列】的节点。

2.1 Node源码剖析

  static final class Node {
    // 模式,分为共享与独占
    // 共享模式
    static final Node SHARED = new Node();
    // 独占模式
    static final Node EXCLUSIVE = null;        
    // 结点状态
    // CANCELLED,值为1,表示当前的线程被取消
    // SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark
    // CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
    // PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;        

    // 结点状态
    volatile int waitStatus;        
    // 前驱结点
    volatile Node prev;    
    // 后继结点
    volatile Node next;        
    // 结点所对应的线程
    volatile Thread thread;        
    // 下一个等待者
    Node nextWaiter;
    
    // 结点是否在共享模式下等待
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    
    // 获取前驱结点,若前驱结点为空,抛出异常
    final Node predecessor() throws NullPointerException {
        // 保存前驱结点
        Node p = prev; 
        if (p == null) // 前驱结点为空,抛出异常
            throw new NullPointerException();
        else // 前驱结点不为空,返回
            return p;
    }
    
    // 无参构造函数
    Node() {    // Used to establish initial head or SHARED marker
    }
    
    // 构造函数,被addWaiter使用
     Node(Thread thread, Node mode) {    // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
    
    // 构造函数
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

2.2 ConditionObject源码剖析

实现了condition接口,关于condition的学习后面会介绍,在学习ReentrantLock时也介绍了其使用方式。

代码较多,直接从上向下看吧:

 // 内部类
public class ConditionObject implements Condition, java.io.Serializable {
    // 版本号
    private static final long serialVersionUID = 1173984872572414699L;
    
    // condition队列的头结点
    private transient Node firstWaiter;
    
    // condition队列的尾结点
    private transient Node lastWaiter;

    /**
     *  构造函数
     */
    public ConditionObject() { }

    /**
     * 添加新的waiter到wait队列
     */
    private Node addConditionWaiter() {
        // 定义尾结点是t
        Node t = lastWaiter;
       // 尾结点不为空,并且尾结点的状态不为CONDITION(默认是-2,表示当前节点在conditionObject等待队列中)
        if (t != null && t.waitStatus != Node.CONDITION) { 
            // 清除状态不为CONDITION的结点,对firstWaiter和lastWaiter重新赋值
            unlinkCancelledWaiters(); 
            // 将最后一个结点重新赋值给t
            t = lastWaiter;
        }
        // 新建一个结点
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        // 尾结点为空
        if (t == null) 
            // 设置condition队列的头结点
            firstWaiter = node;
        else 
            // 设置为节点的nextWaiter域为node结点
            t.nextWaiter = node;
        // 更新condition队列的尾结点
        lastWaiter = node;
        return node;
    }

    /**
     *  移除或转移头结点到sync队列,直到没有取消的或者空为止
     */
    private void doSignal(Node first) {
        // 循环
        do {
            // 将下一个节点设为首节点,如果为空
            if ( (firstWaiter = first.nextWaiter) == null) 
                // 设置尾结点为空
                lastWaiter = null;
            // 设置first结点的nextWaiter域
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null); // 将结点从condition队列转移到sync队列失败并且condition队列中的头结点不为空,一直循环
    }

    /**
     * 转移所有等待队列的节点到同步队列
     */
    private void doSignalAll(Node first) {
        // condition队列的头结点尾结点都设置为空
        lastWaiter = firstWaiter = null;
        // 循环
        do {
            // 获取first结点的nextWaiter域结点
            Node next = first.nextWaiter;
            // 设置first结点的nextWaiter域为空
            first.nextWaiter = null;
            // 将first结点从condition队列转移到sync队列
            transferForSignal(first);
            // 重新设置first
            first = next;
        } while (first != null);
    }

    /**
     * 过滤掉状态不为CONDITION的节点
     * 对firstWaiter和lastWaiter重新赋值
     **/
    private void unlinkCancelledWaiters() {
        // 获取condition队列头结点
        Node t = firstWaiter;
        // 获取一个尾结点是null
        Node trail = null;
        while (t != null) {
            // 获取下一个结点
            Node next = t.nextWaiter;
            // 头结点的状态不为CONDTION状态
            if (t.waitStatus != Node.CONDITION) { 
                // 设置t节点的下一个等待者为空
                t.nextWaiter = null;
                if (trail == null) // trail为空,首次进来一定为空
                    // 重新设置condition队列的头结点
                    firstWaiter = next;
                else 
                    // 设置trail结点的nextWaiter域为next结点
                    trail.nextWaiter = next;
                if (next == null) // next结点为空
                    // 设置condition队列的尾结点
                    lastWaiter = trail;
            }
            else // t结点的状态为CONDTION状态
                // 设置trail结点
                trail = t;
            // 设置t结点
            t = next;
        }
    }

    /**
     * 实现Condition接口的signal方法
     */
    public final void signal() {
        if (!isHeldExclusively()) // 不被当前线程独占,抛出异常
            throw new IllegalMonitorStateException();
        // 保存condition队列头结点
        Node first = firstWaiter;
        if (first != null) // 头结点不为空
            // 唤醒一个等待线程,将头结点从阻塞队列移除,添加到同步队列
            doSignal(first);
    }

    /**
     * 实现Condition的signalAll方法,唤醒所有线程
     */
    public final void signalAll() {
        if (!isHeldExclusively()) // 不被当前线程独占,抛出异常
            throw new IllegalMonitorStateException();
        // 保存condition队列头结点
        Node first = firstWaiter;
        if (first != null) // 头结点不为空
            // 唤醒所有等待线程,将头结点从阻塞队列移除,添加到同步队列
            doSignalAll(first);
    }

    /**
     * 与await()区别在于,使用await方法,调用interrupt()中断后会报错,而该方法不会报错。
     */
    public final void awaitUninterruptibly() {
        // 添加一个结点到等待队列
        Node node = addConditionWaiter();
        // 获取释放的状态
        int savedState = fullyRelease(node);
        boolean interrupted = false;
        while (!isOnSyncQueue(node)) { // 
            // 阻塞当前线程
            LockSupport.park(this);
            if (Thread.interrupted()) // 当前线程被中断
                // 设置interrupted状态
                interrupted = true; 
        }
        if (acquireQueued(node, savedState) || interrupted) // 
            selfInterrupt();
    }

    /**
     *  等待,当前线程在接到信号或被中断之前一直处于等待状态
     */
    public final void await() throws InterruptedException {
        // 当前线程被中断,抛出异常
        if (Thread.interrupted()) 
            throw new InterruptedException();
        // 将当前线程包装成Node,尾插入到等待队列中
        Node node = addConditionWaiter();
        // 释放当前线程所占用的lock,在释放的过程中会唤醒同步队列中的下一个节点
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            // 当前线程进入到等待状态
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 检查结点等待时的中断类型
                break;
        }
        // 自旋等待获取到同步状态(即获取到lock)
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        // 处理被中断的情况
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

    /**
     * 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
     */
    public final long awaitNanos(long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        final long deadline = System.nanoTime() + nanosTimeout;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                transferAfterCancelledWait(node);
                break;
            }
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return deadline - System.nanoTime();
    }

    /**
     * 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态
     */
    public final boolean awaitUntil(Date deadline)
            throws InterruptedException {
        long abstime = deadline.getTime();
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        boolean timedout = false;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            if (System.currentTimeMillis() > abstime) {
                timedout = transferAfterCancelledWait(node);
                break;
            }
            LockSupport.parkUntil(this, abstime);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return !timedout;
    }

    /**
     * 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等              
     * 效于:awaitNanos(unit.toNanos(time)) > 0
     */
    public final boolean await(long time, TimeUnit unit)
            throws InterruptedException {
        long nanosTimeout = unit.toNanos(time);
        if (Thread.interrupted())
            throw new InterruptedException();
        // 1. 将当前线程包装成Node,尾插入到等待队列中
        Node node = addConditionWaiter();
        // 2. 释放当前线程所占用的lock,在释放的过程中会唤醒同步队列中的下一个节点
        int savedState = fullyRelease(node);
        final long deadline = System.nanoTime() + nanosTimeout;
        boolean timedout = false;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                timedout = transferAfterCancelledWait(node);
                break;
            }
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return !timedout;
    }

2.3 锁的获取和释放

整个AQS的设计理念就是通过state字段来实现锁的获取和释放,锁有主要分为公平锁和非公平锁。

2.3.1 公平锁

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            // 继承自AQS的方法,内部先调用tryAcquire获取锁,获取失败则添加下城到等待队列当中
            acquire(1);
        }

        /**
         * 公平锁版本的tryAcquire
         */
        protected final boolean tryAcquire(int acquires) {
            // 获取当前线程
            final Thread current = Thread.currentThread();
            // 获取锁的状态
            int c = getState();
            // 0表示锁没有被持有
            if (c == 0) {
                // 判断当前等待队列是否有节点在等待,没有才去竞争
                if (!hasQueuedPredecessors() &&
                    // 比较并替换状态
                    compareAndSetState(0, acquires)) {
                    // 设置当前线程为独占线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                // 此处表示锁重入
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

2.3.2 非公平锁

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * 立即获取锁,失败会加入等待队列
         */
        final void lock() {
            // 通过CAS尝试获取锁,成功则设置当前线程独占
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // 否则加入等待队列·····································································································································
                acquire(1);
        }

        /** 
          * 非公平锁版本的tryAcquire
          */
        protected final boolean tryAcquire(int acquires) {
            // 走其父类Sync的默认nonfairTryAcquire
            return nonfairTryAcquire(acquires);
        }
    }

2.3.3 Syc子类

这是公平锁和非公平锁的父类,提供统一的tryRelease方法释放锁

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * 提供非公平版本的快捷路径
         */
        abstract void lock();

        /**
         * 非公平锁获取,默认就是非公平锁
         */
        final boolean nonfairTryAcquire(int acquires) {
            // 获取当前线程
            final Thread current = Thread.currentThread();
            // 获取当前锁的状态
            int c = getState();
            // 0表示没有被占用
            if (c == 0) {
                // CAS占用,成功则设置当前线程为独占锁
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 当前线程是独占锁,表示锁重入
            else if (current == getExclusiveOwnerThread()) {
                // 状态 + 1
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                // 设置当前状态
                setState(nextc);
                return true;
            }
            return false;
        }

         /**
         * 释放锁
         */
        protected final boolean tryRelease(int releases) {
            // 当前状态 减去 释放的数量
            int c = getState() - releases;
            // 如果当前线程不是占有锁的线程,抛出异常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            // 当全部释放后,状态为0,取消独占线程
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            // 设置状态为0,返回释放成功
            setState(c);
            return free;
        }

        protected final boolean isHeldExclusively() {
            // 当前线程是否是锁持有者
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        // 获取当前持有者
        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }

        // 获取持有数,只有是线程持有者才能获取
        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }
    }

3.3.4 acquire 和 release

在AQS当中还有两个核心方法:

    public final void acquire(int arg) {
      // 尝试获取
      if (!tryAcquire(arg) &&
          // 尝试获取成功,以独占方式添加到等待队列,并不断地尝试占有锁知道成功
          acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
          selfInterrupt();
  }

后面我们自定义不可重入锁,来看看同步器和锁的关系是什么样的,加深理解。

2.4 简单总结

到此为止,通过阅读前面的源码内容,我们可以有如下的总结:

三、实践

了解了AQS的结构之后,我们不妨自己动手实践一番。加深理解。

定义一个不可重入锁,需要一个同步器,一个锁,一个测试类

自定义同步器:

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * @description: 实现一个不可重入锁 同步器,state最大只能是1
 * @author:weirx
 * @date:2022/1/13 13:49
 * @version:3.0
 */
public class MyLockSynchronizer extends AbstractQueuedSynchronizer {

    @Override
    protected boolean tryAcquire(int acquires) {
        int state = getState();
        if (state == 0) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int acquires) {
        int c = getState() - acquires;
        if (c == 0) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        return false;
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() == 1;
    }

    protected ConditionObject newCondition() {
        return new ConditionObject();
    }
}

自定义锁:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @description: 自定义锁
 * @author:weirx
 * @date:2022/1/13 14:05
 * @version:3.0
 */
public class MyLock implements Lock {

    MyLockSynchronizer myLockSynchronizer = new MyLockSynchronizer();

    @Override
    public void lock() {
        // 尝试获取锁,失败则加入等待队列
        myLockSynchronizer.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        // 尝试获取锁,失败则加入等待队列, 可中断
        myLockSynchronizer.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        // 尝试获取锁,不加入等待队列
        return myLockSynchronizer.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        // 尝试获取锁,不加入等待队列,有实现
        return myLockSynchronizer.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        // 释放锁
        myLockSynchronizer.release(1);
    }

    @Override
    public Condition newCondition() {
        // 条件变量
        return myLockSynchronizer.newCondition();
    }
}

测试锁的效果:

/**
 * @description: 测试
 * @author:weirx
 * @date:2022/1/13 14:24
 * @version:3.0
 */
public class TestMyLock {

    public static void main(String[] args) {
        MyLock myLock = new MyLock();

        new Thread(() -> {
            try {
                myLock.lock();
                System.out.println(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss")
                        + " " + Thread.currentThread().getName() + " :acquire lock success");

                // 休眠一秒看效果
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                myLock.unlock();
                System.out.println(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss")
                        + " " + Thread.currentThread().getName() + " :release lock success");
            }

        }, "t1").start();

        new Thread(() -> {
            try {
                myLock.lock();
                System.out.println(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss")
                        + " " + Thread.currentThread().getName() + " :acquire lock success");
            } finally {
                myLock.unlock();
                System.out.println(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss")
                        + " " + Thread.currentThread().getName() + " :release lock success");
            }
        }, "t2").start();
    }
}

结果,ti一秒后释放锁,才会由t2获得锁:

2022-01-13 14:34:56 t1 :acquire lock success
2022-01-13 14:34:57 t2 :acquire lock success
2022-01-13 14:34:57 t1 :release lock success
2022-01-13 14:34:57 t2 :release lock success

测试下不可重入是否好使,只需要在上述测试代码的线程t1中,再次使用myLock.lock()获取一次锁,发现,整个程序被卡住了,只会打印一条信息:

2022-01-13 14:35:56 t1 :acquire lock success

四、关于Condition的补充

本篇没有介绍Condition的具体内容,但是在之前讲解ReentrantLock提到过【条件变量】,可以返回去看这篇文章了解其用法:https://www.jianshu.com/p/edec5185196d


源码学习真是难,看别人说的再多不如自己跟着走一遍,建议同学们参照本文自己跟踪一遍。

上一篇下一篇

猜你喜欢

热点阅读