AQS(AbstractQueuedSyncronizer)
参考文档:
美团2019技术合集
前言
Java 中的大部分同步类(Lock、Semaphore、ReentrantLock 等)都是基于AbstractQueuedSynchronizer(简称为 AQS)实现的。AQS 是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架
- 结合ReentrantLock来进行学习
Lock和Syncronized的对比
data:image/s3,"s3://crabby-images/bf1a5/bf1a521376e9f6fe7d3b4f8687a2796156be0e4f" alt=""
原理概览
AQS 核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是 CLH 队列的变体实现的,将暂时获取不到锁的线程加入到队列中。
AQS 中的队列是CLH 变体的虚拟双向队列(FIFO),AQS 是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配
整体思路理解
- 1.Lock和AQS是什么关系
各式各样的Lock如ReentrantLock是基于AQS实现的。
public class ReentrantLock implements Lock, java.io.Serializable {
...
static final class NonfairSync extends Sync {
....
...
}
...
}
abstract static class Sync extends AbstractQueuedSynchronizer
{}
Lock的lock等API方法底层是同步内部静态类Sync,也就是AQS的子类实现的。而AQS的底层实现是CLH Node的变种双向链表
以ReentrantLock的非公平锁来梳理细节
data:image/s3,"s3://crabby-images/483f1/483f14fa5fe37f2b1233f6dfe1591179e1e9828b" alt=""
- 自定义同步器体现在哪里,就是在自定义AQS的tryAcquire是的实现里
1.线程调用lock
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
- 第一个线程,将AQS的state置为1,设置独占线程为当前线程
- 第二个线程,调用 acquire(1);
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
可以看到,它又会去调用自定义的tryAcquire方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//当持有锁的线程没释放时,state为1
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//可重入的思路
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
可见第一个条件tryAcquire(arg)为false,取非为真
第二个条件acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
如果尾结点为null,直接调用enq方法,里面会自旋设置尾结点
如果尾结点不为null,设置前驱节点为尾结点,并发情况下,第一个设置成功返回。第二个就要enq去自旋设置尾结点
将节点写入后,我们会不断的轮询该节点的前驱节点,直到前驱节点释放,
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
标记等待过程中是否中断过
boolean interrupted = false;
开始自旋,要么获取锁,要么中断
for (;;) {
final Node p = node.predecessor();
如果 p 是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
说明 p 为头节点且当前没有获取到锁(可能是非公平锁被抢占了)
或者是 p 不为头结点,这个时候就要判断当前 node 是否要被阻塞(被阻塞条件:前驱节点的
waitStatus 为 -1),防止无限循环浪费资源。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
data:image/s3,"s3://crabby-images/2a438/2a438e7208eb674106e3583e83485e69192b5c3f" alt=""
从上图可以看出,跳出当前循环的条件是当“前置节点是头结点,且当前线程获取锁成功”。为了防止因死循环导致 CPU 资源被浪费,我们会判断前置节点的状态来决定是否要将当前线程挂起并不是所有节点都在自旋,这里还是有优化的,在node的state值里有表述
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
当你的前驱节点是signal状态,当前节点会park,需要被unpark
data:image/s3,"s3://crabby-images/a36ad/a36ad408a2dc994ac4790bef2cdd4828a8be4426" alt=""
那什么时候unpark
release代码
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
release将节点状态改变后,unpark方法去进行唤醒.将头结点传进行,当头结点的后一个节点不为null时,即unpark
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}