15. 并发终结之AQS
2020-09-26 本文已影响0人
涣涣虚心0215
AQS是抽象队列同步器,基于模板方法模式提供了很多锁相关的方法,前面我们分析的Condition,ReentrantLock,CountDownLatch、Semaphore等都是基于AQS来实现的。
AQS中维护了一个volatile int state变量,以及基于Node对象的FIFO线程等待队列(sync queue)。
NonFair非公平锁具体实现
abstract static class Sync extends AbstractQueuedSynchronizer {
//NonFair提供lock的实现
abstract void lock();
//NonFaire尝试获取锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//尝试释放锁
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
static final class NonfairSync extends Sync {
//lock的具体实现
final void lock() {
//这里不管是不是有前继节点,上来就开始CAS设置state,设置成功则抢锁成功
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//如果开始抢锁不成功,则调用AQS的acquire方法
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
========
//AQS实现acquire方法 -- lock
public final void acquire(int arg) {
//这里tryAcquire是NonFair里面的实现,nonfairTryAcquire还是直接开始CAS更新state抢锁
//如果抢锁失败,则addWaiter新建一个EXCLUSIVE的node加到sync queue的末尾
//接着调用acquireQueued来循环尝试加锁
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//Condition里面也有分析,再次看下这个acquireQueued方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//拿到当前node的前继节点,如果前继节点是head,才会尝试获取锁
if (p == head && tryAcquire(arg)) {
//获取锁成功后,表示当前线程可以执行,就将当前node设置为head节点,并剔除原来的head
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果前继节点不是head,或者tryAcquire获取锁失败
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//在前继节点不是head,或者tryAcquire获取锁失败,就要看前继节点的状态,来确定是否需要挂起当前线程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//如果前继节点是SIGNAL,则表示当前线程可以挂起等待前继节点唤醒
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
//ws>0表示cancelled,则需要循环遍历剔除cancelled的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//尝试将前继节点设置为SIGNAL,因为是再循环里,这里设置成功,下次循环就会挂起当前线程,否则会继续循环判断前继节点
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//AQS实现release方法 -- unlock
public final boolean release(int arg) {
//调用ReentrantLock里面Sync提供的tryRelease方法
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//拿到头结点,并且头结点的waitStatus不是0,则唤醒一个后继节点
unparkSuccessor(h);
return true;
}
return false;
}
//唤醒后继节点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
//head节点状态<0可能SIGNAL或者CONDITION状态,设置到0
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
//那到后继节点,如果状态>0则是cancelled状态, 需要遍历剔除这些node,直到找到<=0的node
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//找到状态<=0的node之后,unpark唤醒这个node所对应的线程
LockSupport.unpark(s.thread);
}
在下面栗子里,lock1-thread线程第一个获取锁,获得锁之后sleep一段时间,然后lock2-thread以及lock3-thread也开始获取锁,获取锁失败。
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
new Thread(()->{
lock.lock();
try{
Thread.sleep(100000); //获取锁之后sleep,但是不会释放锁
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "lock1-thread").start();
Thread.sleep(10000);
new Thread(()->{
lock.lock();
try{
System.out.println("lock2 invoked");
} finally {
lock.unlock();
}
}, "lock2-thread").start();
Thread.sleep(10000);
new Thread(()->{
lock.lock();
try{
System.out.println("lock3 invoked");
} finally {
lock.unlock();
}
}, "lock3-thread").start();
}
运行结果
可以看到这里lock2-thread和lock3-thread都获取不到锁,变成WAITING状态(这个是与synchronized有区别的,同样的逻辑用synchronized,获取不到锁时BLOCKED状态),直到lock1-thread调用unlock方法,找到sync queue里不是cancelled状态的后继节点进行unpark唤醒。
CLH FIFO队列
上面的图是AQS加锁过程中FIFO队列生成情况,我们总结一下加锁的过程:
- lock1-thread线程调用lock()方法,因为默认是NonFair非公平锁实现,所以调用NonFair.lock()方法,进行CAS设置state的值,设置成功表示获取锁成功,然后进入sleep()。
- lock2-thread线程开始调用lock()方法,同样调用NonFair.lock(),此时lock1-thread持有锁,那么此时CAS设置state失败。
2.1 进入acquire(1)方法, acquire方法会调用tryAcquire()->nonfairTryAcquire()方法再次尝试CAS更新state,同样还是失败。
2.2 进入acquireQueued(addWaiter(Node.EXCLUSIVE)流程,这里首先调用addWaiter()方法添加一个包含lock2-thread的Node,并添加到FIFO的尾部tail,注意这里会先new Node()作为head。
2.3 节点添加完之后,acquireQueued()方法开始for循环,拿到当前节点的前继节点,如果前继节点是head,就开始尝试tryAcquire()获取锁。如果获取锁成功,就将当前节点设置为head节点,并剔除原来的head节点,当前线程获取锁之后继续执行;如果获取锁失败,调用shouldParkAfterFailedAcquire()方法,查看前继节点的waitStatus是不是SIGNAL,如果是,则挂起lock2-thread;如果不是还要看状态是不是CANCELLED,如果是,就遍历下一个节点直到找到waitStatus<=0的节点;如果不是CANCELLED,就CAS设置前继节点的状态到SIGNAL;然后开始下一次for循环,下一次循环还不能获取到锁,基本上都会走到挂起当前线程的代码(acquireQueued过程是不响应中断的,只有获取资源后才能判断当前线程是否中断)。 - lock3-thread同理lock2-thread,获取不到锁之后会CAS设置前继节点的状态到SIGNAL,然后挂起当前线程,等待unlock唤醒。
Synchronized和AQS的区别
Synchronized:
- Synchronized关键字是底层C++实现,存在两个重要的数据结构Entry Set和Wait Set。
- Wait Set中存放的是调用了Object.wait()方法的线程对象(被封装成C++的Node对象)。
- Entry Set中存放的是争抢锁失败陷入到阻塞状态,需要获取monitor的那些对象。
- 当一个线程被notify之后,它就会从Wait Set中移动到Entry Set中。
- 当进入到Entry Set后,该线程依然需要与其他线程争抢monitor对象(ReentrantLock的非公平实现)。
- 如果争抢到,表示该线程获取到了对象的锁,它就可以以排他方式执行对应的同步代码。
AQS:
- AQS也存在两种队列,分别是ConditionObject上的条件队列condition queue,以及AQS本身的阻塞队列sync queue。
- 这两个队列中的每一个对象都是Node实例(里面封装了Thread对象)
- 当位于Condition条件队列的线程被其他线程signal之后,该线程就会从condition queue中移动到sync queue,这里await方法会继续尝试获取锁(公平或者非公平)。
- 位于AQS阻塞队列中的Node对象本质是由一个双向链表来构成。
- 在获取锁时,这些进入到sync queue中的线程会按照在队列中排序先后尝试获取。
- 当sync queue的线程获取到锁后,就表示该线程已经可以正常执行了。
- 陷入到阻塞状态的线程,依然需要进入到操作系统的内核态,进入阻塞(LockSupport.park方法实现)