AQS源码分析
一、AQS是什么
AQS(AbstratctQueuedSynchronizer)是一个抽象类的同步器,使用了模板方法的设计模式,也就是在抽象类AQS中定义算法的骨架,然后具体的子类需要重写AQS的一些方法,这样就能实现不同场景所需要的同步器,java中的ReetrantLock,ReentrantReadWriteLock等很多锁都是基于AQS实现的。AQS简单来说就是有一个当前工作线程并且维护一个被锁定的线程队列,并且实现独占锁或者是共享锁。
大概原理是这样的
image
二、AQS怎么使用
1. 模板方法
- 模板方法使用继承,在父类中定义了一系列算法骨架,和一些可复用的方法,子类继承父类的方法,在不同的应用场景重写算法,满足不同需求。
- 模板方法和策略模式很像,都是在高层定义算法,底层实现具体的实现,模板方法多是用继承实现,而策略模式通过委托实现。策略模式的好处就是耦合比较低,但是AQS是用模板方法实现的,原因就是可以在模板方法定义一些公共的方法,子类可以复用,而策略模式则不能,策略模式更多体现的是可替换性,也就是不同的算法之间可替换,而不会对应用造成影响。
2. 自定义AQS
- 了解了模板方法之后,我们就知道如何使用AQS自定义同步器了,也就是要继承AQS,然后重写它的一些方法
isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
- 独占方式重写tryAcquire和tryRelease,共享方式重写tryAquireShared和tryReleaseShared方法
- 重写实现的主要是对资源state的占用和释放方式。
- 将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
- 学习一下ReetrantLock的源码,我们大概就知道怎么使用了。
三、学习源码之前需要了解
1. 线程的状态
imageimage
当线程进入到synchronized方法或者synchronized代码块时,线程切换到的是BLOCKED状态,而使用java.util.concurrent.locks下lock进行加锁的时候线程切换的是WAITING或者TIMED_WAITING状态,因为lock会调用LockSupport的方法。
2. CAS+volatile实现线程安全
- volatile语义可见性+有序性,但不能保证并发安全,使用场景:
(1)对变量的写操作不依赖于当前值。
(2)该变量没有包含在具有其他变量的不变式中。
- CAS则是一种乐观锁策略,通过一条处理器级指令保证并发安全,如果没有获得锁,就会进入自旋尝试获取锁。也就是在竞争不是很大的时候的一种并发策略。
- CAS+volatile的使用既保证了这个变量的可见性,又能在存在竞争的时候线程安全地获取资源,虽然volatile修改存在并发安全问题,但是用CAS可以避免这个问题。
- 至于什么时候不用synchronized而是CAS+volatile非阻塞乐观锁?应该在synchronized的阻塞造成了系统性能的瓶颈的时候,因为CAS+volatile的编写更加复杂
三、AQS源码分析
1. 数据结构
static final Node SHARED = new Node();//共享模式的标记
static final Node EXCLUSIVE = null;//独占模式的标志
static final int CANCELLED = 1;//表明线程已经取消
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
//一种线程等待状态,需要工作线程唤醒
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;//一种等待状态,表明线程有条件性等待
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:状态域,包括以下值
* SIGNAL: * 这个节点的继任者被阻塞(通过park),所以当前线程当释放或者取消的时候必须要唤醒它的继任者
为了避免竞争,获取方法需要先表明signal,然后原子性获取。失败之后进入bolck状态
* CANCELLED:
这个节点由于超时或者中断被取消。节点不释放它的资源。尤其,一个个被取消的节点不会再一次blocks
* CONDITION:
这个节点当前在一个条件等待队列
* PROPAGATE:
非负表明节点不需要唤醒。所以,大多数代码不需要检查特点值,只是用来标记
这个参数初始化为0
condition会初始化为CONDITION,通过CAS修改值,或者无条件地volatile写**/
volatile int waitStatus;
/*当前工作线程的前继线程
由于head线程总是不取消,所以一个节点只能通过aquire获取。一个取消的线程不能aquire。一个线程总是自我取消,而不是其他节点来取消*/
volatile Node prev;
volatile Node next;
/*入队的线程.构造函数初始化,使用后置为null*/
volatile Thread thread;
Node nextWaiter;
- waitStatus表示当前被封装成Node结点的等待状态,共有4种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE。
- CANCELLED:值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。
- SIGNAL:值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
- CONDITION:值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
- PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
- 0状态:值为0,代表初始化状态。
/**
等待队列头节点,除了初始化,通过setHead修改。注意:如果头存在,它的等待状态不能是CANCELLED.
*/
private transient volatile Node head;
/**
尾节点,通过enq方法添加新的等待节点
*/
private transient volatile Node tail;
/**
* 资源state.
*/
private volatile int state;
-
AQS维护着一个队列,维护着共享资源的访问如下
image
state的有三种访问方式
/**
* @return current state value
*/
protected final int getState() {
return state;
}
/**
* Sets the value of synchronization state.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
- 那么它为什么有两种方式修改state呢,我觉得可能是使用场景不同吧
2. 独占模式下的获取acquire
2.1. acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
我们可以看到这个入口函数是final修饰的,说明设计的时候不希望子类修改这个方法,它先是用tryAcquire获取,如果获取成功就返回;addWaiter将该线程加入等待队列的尾部,并标记为独占模式;acquireQueued()使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false;如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
2.2 tryAquire
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
这个方法通过protected修饰,只有子类访问,也就是在创建同步器的时候自定义获取方式。这个AQS是抽象类,所以只抛出异常
2.3 addWaiter
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
这个方法先尝试快速入队,如果成功就返回,否则通过enq入队
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq主要是通过自旋一直尝试将节点插入尾部,所以可以在快速尝试失败后调用。这个就是CAS自旋volatile的经典使用
2.4 acquiredQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
}
} finally {
if (failed)
cancelAcquire(node);
}
}
由于尝试获取资源失败,于是进行等待队列等待别的线程结束唤醒自己。如果自己的前任节点是head,并且获取成功,返回;不然就在waiting中一直等待
2.5
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;//如果已经设置了前结点释放资源后会唤醒它,那么就可以返回休息了
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
/*
* 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。 */
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
不是SIGNAL,需要加入等待队列等待,同时尝试下看有没有机会轮到自己
2.6 总结
流程如下
image
3. 独占模式下的release
3.1 release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
直接利用tryRelease释放线程,因为独占模式下只有head线程占有资源,因此直接释放即可。然后要唤醒一个等待线程
3.2 tryRealse
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
这个方法也是需要在子类重写的,父类直接抛出异常。重写的时候如果释放资源成功返回true,否则返回false
3.3 unparkSuccessor
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
//头节点的线程状态<0,那么利用cas把它状态设置为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
//从tail往前遍历,寻找最前面需要唤醒的节点,然后unpark唤醒
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
这个函数就是寻找继任线程,并将它唤醒
4. acquireShared
- 然后我们继续研究非独占式的获取共享资源和释放共享资源
4.1 acquireShared
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
通过tryAcquiredShared获取资源,如果没有获取到,执行doAcquireShared
4.2 tryAcquireShared
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
类似于tryacquire,抽象类中直接抛出异常
4.3 doAcquireShared
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
//共享节点加到队列
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取node的前继节点
final Node p = node.predecessor();
if (p == head) {
如果前继是head,看能不能获取资源
int r = tryAcquireShared(arg);
if (r >= 0) {
//如果还有资源,要唤醒下一个等待节点 setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
20 //判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这个函数试图获取资源,并且将head设置为signal,表示等他释放资源了就能唤醒它的next。这里的资源只能是head.next获取,体现了公平性
4.4 setHeadAndPropagate
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);//node设置为head
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
//释放head的资源
doReleaseShared();
}
}
4.5 总结
这个非独占式的获取跟acquire区别不是很大,无非就是线程拿到资源之后如果发现资源有剩余,会唤醒下一个正在等待的线程,而且唤醒的是临近等待的线程,体现了公平性,但是性能会差一点
5. releaseShared
- 这个跟独占模式下的释放区别不大,主要也是释放资源,唤醒线程。区别主要在于独占模式需要state=0之后才唤醒其他线程,而非独占模式只要剩余资源就回去唤醒线程