AQS(AbstractQueuedSynchronizer)详

2021-04-25  本文已影响0人  Duanty

什么是AQS?

备注1:

CLH锁即Craig, Landin, and Hagersten (CLH) locks. CLH锁是一个自旋锁。能确保无饥饿性. 提供先来先服务的公平性.
CLH锁也是一种基于链表的可扩展、高性能、公平的自旋锁, 申请线程仅仅在本地变量上自旋, 它不断轮询前驱的状态, 假设发现前驱释放了锁就结束自旋

AQS中的数据结构

节点和同步队列

Node节点属性

同步队列数据结构

核心方法分析

public final void acquire(int arg)

该方法是独占模式下线程获取共享资源的入口, 如果获取到资源后, 线程直接返回.否则将进入等待队列, 直到获取到资源为止(整个过程忽略中断的影响). 这就是Lock.lock()的语义, 你也可以自定义Lock顶层接口, 参考 Doug Lea对Lock的定义.

  public final void acquire(int var1) {
        if (!tryAcquire(var1) && acquireQueued(addWaiter(Node.EXCLUSIVE), var1)) {
            selfInterrupt();
        }
    }

函数流程如下:

  1. tryAcquire(): 尝试直接获取资源, 如果成功直接返回(调用tryAcquire更改状态,需要保证原子性. 这里体现了非公平锁, 每个线程获取锁时会尝试直接抢占加塞一次, 而CLH队列中可能还有别的线程在等待).
  2. addWaiter(): 如果获取不到, 将当前线程构造成节点Node并加入sync队列的尾部, 并且标记为独占模式.
  3. acquireQueued(): 使线程阻塞在等待队列中获取资源, 一直获取到资源后才返回. 如果在整个等待过程中被中断过, 则返回true, 否则返回false.
  4. 如果线程在等待过程中被中断过, 它是不响应的. 只是获取资源后才再进行自我中断selfInterrupt(), 将中断补上(响应前面说的, 整个等待过程忽略中断的影响).
1. tryAcquire()方法

此方法尝试去获取独占资源. 如果获取成功, 则直接返回true, 否则直接返回false. 这也正是tryLock()的语义, 还是那句话. 当然不仅仅只限于tryLock().
如下是tryAcquire()的源码

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

这里throw异常是留给我们进行实现的. AQS只是一个框架, 具体资源的获取和释放逻辑由我们自定义同步器去实现(就像ReentrantLock类). 需要自定义实现的方法都没有定义成abstract, 由我们根据同步器独占/共享自有选择.

2. addWaiter(Node)方法
    private Node addWaiter(Node mode) {
        // 以给定模式构造结点. mode有两种: EXCLUSIVE(独占)和SHARED(共享)
        Node node = new Node(Thread.currentThread(), mode);
        //尝试直接将节点放到sync队列尾部,
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //如果放入尾部失败, 调用enq()入队
        enq(node);
        return node;
    }
3. enq(Node)方法
    private Node enq(final Node node) {
        //CAS"自旋", 直到成功加入队尾
        for (;;) {
            Node t = tail;
            if (t == null) { //  队列为空, 创建一个空的结点作为head结点, 并将tail也指向它, 这是一个初始化的动作
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {//正常流程, 放入队尾
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

CAS自旋volatile变量, 保证了可见性, 操作上又是原子方法. 这是一种很经典的用法

4. acquireQueued(Node, int)方法

当节点进入同步队列后, 接下来就是要等待获取锁(访问控制), 同一时刻只有一个线程在运行, 其他都要进入等待状态. 每个线程节点都是独立的, 他们进行自旋判断, 当发现前驱节点是头结点并且获取了状态(tryAcquire()自己实现原子性操作), 那这个线程就可以运行了.

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;//标记是否可以成功拿到状态
        try {
            boolean interrupted = false;//处理过程中是否被中断过
            for (;;) {//自旋
                final Node p = node.predecessor();//获取当前节点的前驱节点
              //如果前驱节点是head, 当前节点就是排第二. 这个时候可以尝试去获取资源了(头结点可能释放完唤醒自己了)
                if (p == head && tryAcquire(arg)) {
                    setHead(node);//设置头节点为当前节点
                    p.next = null; // help GC setHead()中node.prev已置为null, 此处再将head.next置为null. 方便gc回收head节点.
                    failed = false;//标记成功获取资源
                    return interrupted;
                }
                //不满足唤醒条件, 调用park()进入waiting状态, 等待unpark(). 如果等待的过程被中断, 线程会从park()中醒过来, 发现拿不到资源后继续进入park()中等待.
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;//如果线程被终端, 标记interrupted为true, 等待线程获取到资源后在中断
            }
        } finally {
            if (failed)//如果等待过程中没有成功获取资源(不可控异常), 取消线程在队列的等待
                cancelAcquire(node);
        }
    }
shouldParkAfterFailedAcquire()方法如果发现前驱节点状态不是SIGNAL, 会标记前驱节点状态为SIGNAL(-1). 如果发现前驱节点放弃等待了就一直往前找节点, 直到找到正常等待的节点排队到它后面.
parkAndCheckInterrupt()使线程进入waiting状态, 如果发现被唤醒, 检查是不是被中断了并且清除状态.

acquire()方法总结

  1. 尝试直接插队获取资源, 如果不成功进入同步队列排队.
  2. 调用park()进入waiting状态, 等待前驱节点调用unpark()或者interrupt()唤醒自己. interrupt()唤醒拿不到资源继续进入waiting状态.
  3. 被唤醒后尝试获取资源, 如果获取不到资源进入2流程, 获取到资源就执行后续代码(如果等待过程被中断过此时会调用selfInterrupt()将中断补上).

public final boolean release(int arg)

该方法是独占模式下线程释放共享资源的入口.

    public final boolean release(int arg) {
        if (tryRelease(arg)) {//释放资源, 自定义函数实现
            Node h = head;
            if (h != null && h.waitStatus != 0)//拿到头结点
                unparkSuccessor(h);//唤醒等待队列中的下一个线程
            return true;
        }
        return false;
    }
1. tryRelease(arg)方法

需要我们实现的独占资源释放函数.

protected boolean tryRelease(int arg) {
     throw new UnsupportedOperationException();
}
2. unparkSuccessor(node) 方法

唤醒等待队列中的下一个线程

    private void unparkSuccessor(Node node) {
        //当前线程节点的状态
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);//设置当前线程的节点状态为0, 因为已经释放资源
        Node s = node.next; //找到下一个需要唤醒的节点
        if (s == null || s.waitStatus > 0) {//下一个节点为空或者已经放弃等待就取消唤醒操作
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)//从后往前找有效的节点
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);//唤醒有效节点
    }
下一个有效的线程被唤醒后处在acquireQueued()的自旋流程中, 然后进入资源判断获取(if (p == head && tryAcquire(arg))).

public final void acquireShared(int arg)

此方法是共享模式下线程获取共享资源的顶层入口. 它会获取指定量的资源(state), 获取成功后直接返回, 获取失败进入等待队列, 直到获取到资源(整个过程忽略中断的影响).参考ReentrantReadWriteLock设计.

 public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)//改方法需要自定义同步器实现. 返回语义负数表示失败, 0或者大于零表示获取成功.
            doAcquireShared(arg);//小于零进入等待队列, 获取资源后返回
    }
1. doAcquireShared(arg)方法
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);//加入队列的尾部, 模式为共享. addWaiter()方法参考上面介绍
        boolean failed = true;//成功失败标识
        try {
            boolean interrupted = false;//是否中断标识
            for (;;) {//CAS自旋
                final Node p = node.predecessor();//获取前驱节点
                if (p == head) {//前驱节点为头结点, 尝试获取资源(此处有可能是前驱节点唤醒了自己)
                    int r = tryAcquireShared(arg);//获取资源
                    if (r >= 0) {//成功
                        setHeadAndPropagate(node, r);//将head指向自己, 此时r>0, 还有剩余资源唤醒后续排队线程
                        p.next = null; // help GC
                        if (interrupted)// 中断标识
                            selfInterrupt();//补上中断
                        failed = false;
                        return;
                    }
                }
                 //不满足唤醒条件, 调用park()进入waiting状态, 等待unpark(). 如果等待的过程被中断, 线程会从park()中醒过来, 发现拿不到资源后继续进入park()中等待.
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
2. setHeadAndPropagate(Node, int)方法
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; 
        setHead(node);/head指向自己
        //如果还有剩余量, 继续唤醒下一个排队的线程
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

acquireShared()方法总结

  1. tryAcquireShared()方法尝试获取资源, 成功直接返回, 如果不成功进入同步队列排队.
  2. 调用park()进入waiting状态, 等待前驱节点调用unpark()或者interrupt()唤醒自己.
  3. 被唤醒后尝试获取资源, 如果获取不到资源进入2流程, 获取到资源就执行后续代码.
    其实同acquir()方法一样, 只不过该方法在自己拿到资源后回去唤醒后继线程

public final boolean releaseShared(int arg)

该方法是共享模式下线程释放共享资源的入口. 跟独占模式下的资源释放方法release()很相似, 不同的是独占模式一般是完全释放资源(state=0)后才允许去唤醒其他线程, 而共享模式往往不会这么控制, 具体实现要看自定义同步器的逻辑.

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {//尝试释放资源, 该方法需要自定义共享同步器实现.
            doReleaseShared();//唤醒后继节点
            return true;
        }
        return false;
    }
1. tryReleaseShared()方法

需要我们自己实现的共享资源释放方法.

    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
2. doReleaseShared()方法

该方法是用来唤醒后继节点的.

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;           
                    unparkSuccessor(h);//唤醒后继节点
                } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;               
            }
            if (h == head)  //head节点如果发生变化即退出自旋
                break;
        }
    }

releaseShared()方法总结

  1. tryReleaseShared()方法进行共享资源的释放.
  2. doReleaseShared()方法用来唤醒后继节点.
以上是几个AQS常用的资源获取和释放的基本方法, 其实还有一些方法和上面分析的方法略有不同, 如下:

测试案例

1. ExclusiveLock(自定义独占锁)

ExclusiveLock是互斥的不可重入锁实现, 对锁资源State的操作只有0和1两个状态, 0代表未锁定,1代表锁定. 按照上面的分析, 我们需要实现AQS的tryAcquire()和tryRelease()方法.

public class ExclusiveLock implements Lock {
    //自定义内部类同步器
    private static class ExclusiveSync extends AbstractQueuedSynchronizer {
        //判断是否是锁定状态
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
        //尝试获取资源, 如果成功直接返回. 获取成功返回true, 否则返回false.
        @Override
        protected boolean tryAcquire(int arg) {
            if(compareAndSetState(0, 1)){//状态变更必须为CAS原子操作, 保证原子性
                setExclusiveOwnerThread(Thread.currentThread());//同样也是原子操作
                return true;
            }
            return false;
        }
        //尝试释放资源
        @Override
        protected boolean tryRelease(int arg) {
            if(getState() == 0){
                throw new UnsupportedOperationException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        
    }
    //创建自定义同步器的实现
    private final ExclusiveSync sync = new ExclusiveSync();
    //获取资源, 同acquire()语义一样, 获取不到进入同步队列等待成功返回
    @Override
    public void lock() {
        sync.acquire(1);
    }
   //判断锁是否被占有
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    //获取资源, 立刻返回结果
    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }
    //
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }
    //释放资源
    @Override
    public void unlock() {
        sync.release(1);
    }
    
}

1. ShareLock(自定义共享锁)

ShareLock为一个共享同步器的实现, 设计同一时刻可以有两个线程获取到资源, 超过两个进行同步队列阻塞. 按照上面的分析, 我们实现AQS的tryAcquireShared()和tryReleaseShared()方法.

public class ShareLock implements Lock {
    
    
    public static class ShareSync extends AbstractQueuedSynchronizer{
        
        //定义同步器的初始状态为2
        ShareSync(int count) {
            if (count <= 0) {
                throw new IllegalArgumentException("count must large than zero.");
            }
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int reduceCount) {
            for (;;) {
                int current = getState();
                int newCount = current - reduceCount;
                if (newCount < 0 || compareAndSetState(current, newCount)) {
                    return newCount;
                }
            }

        }

        @Override
        protected boolean tryReleaseShared(int reduceCount) {
            for (;;) {
                int current = getState();
                int newCount = current + reduceCount;
                if (compareAndSetState(current, newCount)) {
                    return true;
                }
            }
        }
    }
    
    
    private final ShareSync sync = new ShareSync(2);
    
    
    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquireShared(1) >= 0;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}
上一篇下一篇

猜你喜欢

热点阅读