AQS原理(一):基于ReentrantLock
AQS介绍
在使用者的角度上,AQS的功能分为两类:独占功能和共享功能,它的所有子类中,要么实现并使用了它独占功能的 API,要么使用了共享锁的功能,而不会同时使用两套 API,就算是 ReentrantReadWriteLock,也是通过两个内部类:读锁和写锁,分别实现的两套 API 来实现的
独占锁
解读AQS可以从使用了它独占控制功能的子类 ReentrantLock 说起,分析 ReentrantLock 的同时看一看 AQS 的实现,再推理出 AQS 独特的设计思路和实现方式。最后,再看其共享控制功能的实现。
一般用ReentrantLock,都是这么使用:
reentrantLock.lock()
//do something
reentrantLock.unlock()
ReentrantLock保证 do something 在同一个时间只有一个线程执行这段代码,其他线程被挂起,直到获取锁。所以,ReentrantLock 实现的就是一个独占锁的功能:有且只有一个线程获取到锁,其余线程全部挂起,直到该拥有锁的线程释放锁,被挂起的线程被唤醒重新开始竞争锁。没错,ReentrantLock 使用的就是 AQS 的独占 API 实现的。
我们从 ReentrantLock 的实现开始一起看看重入锁是怎么实现的。
竞争锁
找到ReentrantLock类的lock()方法:
public void lock() {
sync.lock();
}
ReentrantLock 内部有代理类完成具体操作,ReentrantLock 只是封装了统一的一套 API 而已。并且,ReentrantLock 又分为公平锁和非公平锁,所以,ReentrantLock 内部只有两个 sync 的实现:
公平锁:每个线程抢占锁的顺序为先后调用 lock 方法的顺序依次获取锁,类似于排队吃饭。
非公平锁:每个线程抢占锁的顺序不定,谁运气好,谁就获取到锁,和调用 lock 方法的先后顺序无关,类似于堵车时,加塞的那些 XXXX。
ReentrantLock或者AQS的实现简述如下:有一个被volatile修饰的标志位被叫做key,有没有线程拿走了锁,还需要一个线程安全的队列,维护一堆被挂起的线程。当锁被归还时,能通知这些被挂起的线程,可以来竞争锁。
而公平锁和非公平锁,唯一的区别是获取锁的时候是先进入队列排队再获取锁,还是先获取锁,不成功再进入队列。
公平锁
首先我们看一下ReentrantLock中的公平锁的实现:
final void lock() {
acquire(1);
}
acquire()方法调用的是AQS的acquire()方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
此方法先尝试获取锁,获取不到则创建一个waiter(当前线程)后放入到队列中。
点击tryAcquire()方法:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
留空了,Doug Lea 是想留给子类去实现(既然要给子类实现,应该用抽象方法,但是 Doug Lea 没有这么做,原因是 AQS 有两种功能,面向两种使用场景,需要给子类定义的方法都是抽象方法了,会导致子类无论如何都需要实现另外一种场景的抽象方法,显然,这对子类来说是不友好的。)
既然是子类的tryAcquire()方法,我们查看FairSync的tryAcquire()方法(FairSync几次Sync,而Sync继承AQS):
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
首先调用了AQS的getState()方法,因为在 AQS 里面有个叫 state 的标志位 :
/**
* The synchronization state.
*/
private volatile int state;
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}
那个state就是我们前面所说的key。
继续FairSync的tryAcquire()方法:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();// 获取当前线程
int c = getState();// 获取父类 AQS 中的标志位
if (c == 0) {
// 如果队列中没有其他线程 说明没有线程正在占有锁!
if (!hasQueuedPredecessors() &&
// 修改一下状态位,注意:这里的 acquires 是在 lock 的时候传递来
//的,从上面的图中可以知道,这个值是写死的 1
compareAndSetState(0, acquires)) {
// 如果通过 CAS 操作将状态为更新成功则代表当前线程获取锁,因
//此,将当前线程设置到 AQS 的一个变量中,说明这个线程拿走了锁。
setExclusiveOwnerThread(current);
return true;
}
}
// 如果不为 0 意味着,锁已经被拿走了,但是,因为 ReentrantLock 是重
//入锁,是可以重复 lock,unlock 的,只要成对出现行。一次。这里还要再判
//断一次 获取锁的线程是不是当前请求锁的线程。
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires; // 如果是的,累加在 state 字段上就可以了。
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
到此,如果如果获取锁,tryAcquire 返回 true,反之,返回 false,回到 AQS 的 acquire 方法。如果没有获取到锁,按照我们的描述,应该将当前线程放到队列中去,只不过,在放之前,需要做些包装。
先看 addWaiter 方法:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
用当前线程去构造一个Node对象,node是一个表示 Node 类型的字段,仅仅表示这个节点是独占的,还是共享的,或者说,AQS 的这个队列中,哪些节点是独占的,哪些是共享的。
创建好节点后,将节点加入到队列尾部,此处,在队列不为空的时候,先尝试通过 cas 方式修改尾节点为最新的节点,如果修改失败,意味着有并发,这个时候才会进入 enq 中死循环,“自旋”方式修改。
将线程的节点接入到队里中后,当然还需要做一件事: 将当前线程挂起!这个事,由 acquireQueued 来做。
在解释 acquireQueued 之前,我们需要先看下 AQS 中队列的内存结构,我们知道,队列由 Node 类型的节点组成,其中至少有两个变量,一个封装线程,一个封装节点类型。
而实际上,它的内存结构是这样的(第一次节点插入时,第一个节点是一个空节点,代表有一个线程已经获取锁,事实上,队列的第一个节点就是代表持有锁的节点):
黄色结点为队列默认的头节点,每次有线程竞争失败,进入队列后其实都是插入到队列的结点(tail 后面)后面。这个从 enq ()方法可以看出来,上文中有提到 enq ()方法为将节点插入队列的方法:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
接下来看AQS的acquireQueued()方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 如果当前的节点是 head 说明他是队列中第一个“有效的”节点,因此尝试获取,上文中有提到这个类是交给子类去扩展的。
setHead(node);// 成功后,将上图中的黄色节点移除,Node1 变成头节点。
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
// 否则,检查前一个节点的状态为,看当前获取锁失败的线程是否需要挂起。
parkAndCheckInterrupt())
// 如果需要,借助 JUC 包下的 LockSopport 类的静态方法 Park 挂起当前线程。知道被唤醒。
interrupted = true;
}
} finally {
if (failed) // 如果有异常
cancelAcquire(node);// 取消请求,对应到队列操作,就是将当前节点从队列中移除。
}
}
上面的代码有几处需要说明:
1.Node结点中,除了存储当前线程,结点类型,队列中前后元素的变量,还有一个叫waitStatus的变量,用于描述结点的状态。
原因是:AQS的队列中,在有并发时,会存取一定数量的结点,每个结点中都有表示线程状态的量,如果有些线程等不及获取锁,需要放弃竞争,退出队列;有些线程等待一些条件满足后才能执行(这里的描述很像某个 J.U.C 包下的工具类,ReentrankLock 的 Condition,事实上,Condition 同样也是 AQS 的子类)等等,总之,各个线程有各个线程的状态,但总需要一个变量来描述它,这个变量就叫 waitStatus, 它有四种状态:
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
分别表示:
- 1.结点取消
- 2.结点等待触发
- 3.结点等待条件
- 4.结点状态需要向后传播
只有当前节点的前一个节点为 SIGNAL 时,才能当前节点才能被挂起。
- 对线程的挂起及唤醒操作是通过使用 UNSAFE 类调用 JNI 方法实现的。当然,还提供了挂起指定时间后唤醒的 API,在后面我们会讲到。
到此为止,一个线程对于锁的一次竞争才告于段落,结果有两种,要么成功获取到锁(不用进入到 AQS 队列中),要么,获取失败,被挂起,等待下次唤醒后继续循环尝试获取锁,值得注意的是,AQS 的队列为 FIFO 队列,所以,每次被 CPU 假唤醒,且当前线程不是出在头节点的位置,也是会被挂起的。AQS 通过这样的方式,实现了竞争的排队策略。
释放锁
释放锁的流程简述如下:因为获取锁的线程的结点在AQS的头节点位置,所有先应该释放锁,再将头节点移除,然后让后一个做头节点,并通知它来竞争锁。
我们可以先从ReentrantLock的FairSync来说明:
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
unlock()方法调用了AQS中的release()方法,同样传入了参数1。
同样,release()方法为空方法,子类自己实现逻辑
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread()) // 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常。
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {// 因为是重入的关系,不是每次释放锁 c 都等于 0,直到最后一次释放锁时,才通知 AQS 不需要再记录哪个线程正在获取锁。
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
释放锁,成功后,继续执行unparkSuccessor()方法:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
值得注意的是,寻找的顺序是从队列尾部开始往前去找的最前面的一个 waitStatus 小于 0 的节点。
到此,ReentrantLock 的 lock 和 unlock 方法已经基本解析完毕了,唯独还剩下一个非公平锁 NonfairSync 没说,其实,它和公平锁的唯一区别就是获取锁的方式不同,一个是按前后顺序一次获取锁,一个是抢占式的获取锁,那 ReentrantLock 是怎么实现的呢?再看两段代码:
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
非公平锁的 lock 方法的处理方式是: 在 lock 的时候先直接 cas 修改一次 state 变量(尝试获取锁),成功就返回,不成功再排队,从而达到不排队直接抢占的目的。
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
.......
}
而对于公平锁:则是老老实实的开始就走 AQS 的流程排队获取锁。如果前面有人调用过其 lock 方法,则排在队列中前面,也就更有机会更早的获取锁,从而达到“公平”的目的。
参考资料
https://www.infoq.cn/article/jdk1.8-abstractqueuedsynchronizer