AQS独占模式

2020-04-09  本文已影响0人  LENN123
前言

AQS(AbstractQueuedSynchronizer)是一个抽象类,预定义了一些需要由我们自己实现的方法,用来构建自定义的同步工具。

抽象类同接口一样,都无法利用new语句创建对象实例,它存在意义在于预先帮我们实现好了一些方法,并暴露出一些抽象方法由子类实现,本质上相当于一个模版。

先看一下AbstractQueuedSynchronizer类里有哪些方法需要我们自己实现。

//独占模式
protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }
// 共享模式
protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

上述五种方法,虽然没有明确要求子类实现,但是可以看到其方法内部没有任何判断逻辑,而是直接抛出了一个UnsupportedOperationException()的异常,表示不支持该操作,显然当我们实现定制化同步工具时,这5个方法就是关键所在。而这5个方法又可以分为独占模式共享模式两大类

独占模式,比如互斥锁,一个锁在一个时刻只能由一个线程占用,其他线程必须阻塞等待该线程释放锁。而共享模式,比如读锁,可以由多个线程共同持有。

这里我们介绍的是独占模式,也就是前三个方法,我们先来分析一下这三个方法分别代表的含义。

方法名 作用
boolean tryAcquire(int arg) 尝试去获取同步状态
boolean tryRelease(int arg) 尝试去释放同步状态
boolean isHeldExclusively 判断当前线程有无获取到同步状态

这里先解释一下,什么是同步状态。在AQS类内部,给我们定义了一个私有的类变量state

 private volatile int state;

这个state变量由volatile关键词修饰,所以保证了其在多个线程之间的可见性。在独占模式下,我们可以定义这个state只有01两个状态, 假设0表示已有线程进入同步代码块,则其他线程阻塞等待,假设1表示当前没有线程进入同步代码块,则当前线程可以尝试进入。那么当一个线程想进入同步代码块时,会判断state是否是0,若是则修改其为1,表示自己已经成功进入同步代码块,其他线程需要等待,这个过程就是tryAcquire。而当已经在同步代码块内部的线程准备离开时,必须要修改当前state1变为0,这个过程就是tryRelease
理解了这个是不是就很容易说自己实现tryAcquire方法呢?比如如下代码。

//错误的实现
  @Override
  protected boolean tryAcquire(int arg) {
      if (state == 0) {
           state = 1;
           return true;
      } else {
           return false;
      }
  }

显然在并发场景下这里会发生错误,比如线程A和线程B同时判断state==0true,然后都修改state = 1,此时线程AB都进入同步代码块,这显然和我们独占模式的设计冲突了。这是一个经典的比较并更新的场景,需要我们保证这两个操作的原子性。AQS内部给我们提供了几个访问和设置state值的方法。

protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
     return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
protected final void setState(int newState) {
      state = newState;
}
protected final int getState() {
      return state;
}

可以看到第一个compareAndSetState方法利用CAS保证了原子性,而第二个setState则没有,setState方法可以用在我们之前所说的tryRelease方法里,因为独占模式下同步代码块内部同一时刻最多只存在一个线程,所以不会发生并发错误,只要简单的通过setState方法将state的值由1设置为0就可以了。所以正确的tryAcquire实现如下:

@Override
protected boolean tryAcquire(int arg) {
    return compareAndSetState(0, 1);
}

现在只要我们实现了自己的tryAcquire方法和tryRelease方法逻辑,就可以自定义一个同步工具了,当一个线程tryAcquire成功时,就会发生阻塞。当一个线程tryRelease成功时,就离开了同步代码块。AQS已经帮我们实现了诸如阻塞等待等的复杂方法,tryAcquiretryRelease只是它们判断逻辑中小小的一环而已,下面看一下AQS的具体执行流程。

acquire()

tryAcquire方法的注释里有一段话,This method is always invoked by the thread performing acquire. If this method reports failure, the acquire method may queue the thread. 说的是tryAcquire都会被acquire执行,并且如果返回false,就有可能把当前的请求线程加入到一个等待队列中去,直到被唤醒。我们先看下这个等待队列是什么样的。

    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        volatile int waitStatus;

        volatile Node prev;

        volatile Node next;

        volatile Thread thread;
        Node nextWaiter;


        final boolean isShared() {
            return nextWaiter == SHARED;
        }
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

AQS内部定义了一个静态内部类Node,也就是这个等待队列的节点,从Node的定义上可以看出,这个等待队列实际上是一个双向链表,链表中的每一个节点代表了对应的Thread,同时还给每个Node设置了waitStatus状态(之后细说)。
AQS内部定义指向这个队列的头节点和尾节点的引用,同样用volatile修饰,保证多线程这间的可见性。

 private transient volatile Node head;
 private transient volatile Node tail;

之前说了,我们自己实现的方法tryAcquire只是acquire执行逻辑之的一环,我们来看看acquire具体是如何执行,并使用这个双向链表结构的队列的。

     public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

tryAcquire失败时,说明此时已有其他线程在同步代码块内部,需要等待,于是执行addWaiter方法。

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

addWaiter方法的功能是创建一个代表当前线程的Node加入到等待队列中去,其执行逻辑为

  1. 为当前进入同步代码块失败的线程,创建一个对应的Node实例,这里的mode有两种Node.EXCLUSIVENode.SHARED,在独占模式下也就是选择 Node.EXCLUSIVE
  2. 拿到当前等待队列的尾节点tail,如果当前等待队列不为空且执行CAS操作插入成功,则该节点成为新的tail,则返回。
  3. 否则调用enq方法进行插入,并返回。

值得关注是,若当前队列不为空,这个插入操作是需要通过CAS来完成的,否则可能造成并发错误,使得多个节点都误以为自己插入成功,实际上最终真正插入成功的只有一个,从而造成一些节点丢失的现象。在这里,多个线程在第一步都会把自己的prev指向当前队列的尾部,然后通过CAS,使得只有一个节点成为新的tail节点,其余的线程会再去执行enq方法。

CAS加入队尾

然后再看下未通过CAS成功插入的节点会发生什么。

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

enq方法整体上采用了一个无限循环结构,也就是不断的尝试插入,直到成功为止。

  1. 判断当前队列是否为空,如果为空则插入一个新节点作为头部,该节点不表示任何线程,具体作用之后会提到。
  2. 若当前队列不为空,采用和之前addWaiter里一样的方法,执行CAS插入,不成功则重试,直到成功为止。

当一个代表特定线程的Node被加入到队列中(尾部)之后,acquire会接着调用acquireQueued方法,acquireQueued方法具体实现如下。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

acquireQueued内部实现整体上还是一个无限循环,具体执行流程如下

  1. 拿到之前加入队列的节点的前驱节点
  2. 判断这个前驱节点是不是head节点,(注意我们之前提到过head节点是不指向任何线程的 ), 如果是head节点,那么说明当前节点排在等待队列的队首,即有可能马上就会被唤醒,所以阻塞该线程之前,再尝试一下看看能否进入同步代码块。
  3. 如果运气好,tryAcquire成功,那么这个等待的线程说明可以进入同步代码块了,下面要做的就是把该节点设为新的头节点(因为这个节点已经没用了)。并且把之前的头节点的next指针设置成null,来让GC把它回收掉。
  4. 如果不能不满足上述条件,说明当前线程需要阻塞等待了,但在真正进入阻塞等待状态(调用parkAndCheckInterrupt方法)之前,还会再进行一次判断,是不是应该wait, 如果不是,再重新执行循环流程。这个再判断的方法就是shouldParkAfterFailedAcquire
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0){
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

这里出现了我们之前没有介绍的waitStatus,Node节点内部定义了几种Status

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

CANCELLED = 1表示当前线程已经被取消,可以忽略并把它移出队列了,SIGNAL = -1表示当前节点的后继结点需要被唤醒, condition表示该线程处于条件等待状态。waitStatus的初始状态为0。基于此我们再来看shouldParkAfterFailedAcquire的执行流程。

  1. 获取当前节点其前驱节点的waitStatus,如果waitStatus== node.SIGNAL,说明其后继节点(也就是当前节点)处于一个等待被唤醒的状态,也就是说处于阻塞等待状态,因此返回true
  2. 如果ws > 0 也就是其处于CANCELLED状态,因当从队列中移除,于是利用node.prev = pred = pred.prev;移除所有CANCELLED节点。
  3. 否则,利用CAS操作将前驱节点的waitSatus设置为SINGNAL, 表示其后继节点应当处于阻塞且需要被唤醒状态,返回false,当处于外层的循环下一次执行事,就会返回true并开始阻塞。

也就是说shouldParkAfterFailedAcquire,除了给当前节点赋予需要被唤醒状态的同时(通过设置前驱节点的waitStatus来完成),还复杂移除当前节点前驱中的代表已经被取消的线程的节点。

release()

相较于acquirerelease方法的逻辑要简单很多

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
  1. 首先判断是否能够成功释放同步状态,若可以,则获取当前队列的头节点,否则返回false。
  2. 获得当前队列的头节点, 如果当前队列不为空,且其waitStatus != 0,说明其后继节点所关联的可能是一个处于阻塞状态需要被唤醒的线程,于是调用unparkSuccessor唤醒它,这里体现了第一个不代表任何线程的头节点的作用。
  3. 无论是否成功唤醒一个等待中的线程,最终都会释放同步状态,返回true。

再来看下unparkSuccessor方法做了什么。

private void unparkSuccessor(Node node) {
     
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
  1. 首先如果当前节点的waitStatus<0,则将其清0,因为它的后继节点所代表的线程会被唤醒。
  2. 找到要被唤醒的线程,如果要被唤醒的线程被取消或者为null,则反向从队尾开始找到离队首最近的未被取消的线程节点。
    3.如果线程存在,则调用LockSupport.unpark(s.thread)方法,将其从阻塞状态中恢复。
应用

ReentrantLockJUC包里提供的一个支持公平/非公平的可重入锁,其实际就是基于AQS构建的一个同步工具。

public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {
        ....   
     }

    static final class NonfairSync extends Sync {
        ....
    }
    static final class FairSync extends Sync {
        ....
    }
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    ....
}

其内部包含一个继承于 AQS的抽象子类sync,并在sync的基础上实现了NonfairSync非公平锁和FairSync公平锁来实现对应的功能。
下面让我们也基于AQS,模仿ReentrantLock中的写法,来实现自己的一个互斥锁。

public class MyLock {

    private static class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }

    private Sync sync;
    public MyLock(){
        sync = new Sync();
    }

    public void lock(){
        sync.acquire(1);
    }
    public void unlock(){
        sync.release(1);
    }

}

上一篇下一篇

猜你喜欢

热点阅读