Java AQS
AQS直译为抽象同步队列,其内部封装了锁的状态,维护了请求锁的线程相关的数据结构。锁的状态只有两种,被持有或者释放。AQS如何维护这两种状态呢?通过其内部volatile变量state,state本身被volatile修饰,相关的三个方法getState()
、setState()
、compareAndSetState()
又是原子操作,保证了其多线程操作下的安全问题。state = 0
表示锁被释放或者说无任何线程持有;state > 0
表示锁已经被某个线程持有。
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
那么问题来了,某个线程持有了锁,某个是哪个?也就是说state从0变为1的时候需要设置类似CurrentThread这样的变量来标识当前持有锁的线程。
AQS父类AbstractOwnableSynchronizer
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
当state已经大于1也就是说某个线程持有了锁,这时有其它线程进来请求获取锁,当然是排队啦。AQS毕竟就叫队列,其内部维护的数据结构是双向链表。
AQS.Node
static final class Node {
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
}
回过头来粗略总结一下,Lock.lock()请求锁就是把state值从0改为1并记录当前Thread为持有锁的线程exclusiveOwnerThread
,如果state已经为1,将当前线程添加到双向链表表尾。
这么一说可能还是有点懵逼,结合Lock接口的实现类ReentrantLock
请求锁lock()
释放锁unlock()
的API调用链看一遍流程就知道了。
ReentrantLock.lock()
private final Sync sync;
public void lock() {
sync.lock();
}
Sync
继承AQS实现了锁相关的功能,下面看Sync.lock()
abstract void lock();
抽象方法,子类实现。很多同学都知道ReentrantLock有公平锁和非公平锁的实现,这里先看非公平锁的实现NonfairSync.lock()
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
调用AQS.compareAndSetState(0, 1)
方法将state从0改为1。设置成功标识持有锁的线程setExclusiveOwnerThread
;设置失败调用AQS.acquire()
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
tryAcquire
是空实现,默认抛出异常。非公平锁NonfairSync重写了该方法,继续看NonfairSync.nonfairTryAcquire()
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
判断state是否为0:
- 0->cas修改state为1,修改成功标识CurrentThread。
- 大于0->判断当前线程是否已经获取了锁,已经获取继续+1以此实现可重入锁。
其它情况下tryAcquire()方法都返回false,回看AQS.acquire()
;tryAcquire()返回false时调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
AQS.addWaiter()
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
创建一个线程节点Node放到双向链表表尾,enq()
自旋直到节点成功添加到表尾并处理了空表的情况。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
AQS.acquireQueued()
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
不必深究细节,须知此处线程入队后自旋获取锁,获取成功将当前线程置为头结点,返回线程是否被中断过interrupted
看一下ReentrantLock公平锁实现有何不同FairSync.tryAcquire()
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
cas多了一个!hasQueuedPredecessors()
条件
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
判断获取锁的线程是否是头结点以此实现公平策略,所以公平非公平,其实AQS内的双向链表已经维护好了,取头结点判断即可。
接下来看释放锁ReentrantLock.unlock()
public void unlock() {
sync.release(1);
}
sync.release(1)
调用到父类AQS.release()
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
AQS.tryRelease()
调用到子类实现Sync.tryRelease()
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
获取当前state值并-1,Check释放锁的线程是否是持有锁的线程。state为0表示锁已被释放,currentThread同步置为null;state不为0重新设置state。锁释放成功后唤醒队列中等待的线程获取资源。
上述为独享锁的流程,实际上AQS还有对应实现共享锁的方法tryAcquireShared()
、tryReleaseShared()
。以共享锁CountDownLatch
为例,CountDownLatch
将任务拆分为N个子线程执行,state对应初始化为N。N个子线程并行执行,子线程任务执行完后都会调用countDown(),此时state通过cas减1。当所有子线程执行完毕也就是state为0时,unpark()主线程,主线程从await()函数返回。
说实话这玩意我没用过,看一下官方示例:
class Driver2 { // ...
void main() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(N);
Executor e = ...
for (int i = 0; i < N; ++i) // create and start threads
e.execute(new WorkerRunnable(doneSignal, i));
doneSignal.await(); // wait for all to finish
}
}
class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
try {
doWork(i);
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}
state初始化为线程数量,CountDownLatch
构造方法
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
和ReentrantLock一样,锁的实现交给了内部类Sync
,调用AQS.setState()
初始化state值
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
CountDownLatch.countDown()
public void countDown() {
sync.releaseShared(1);
}
调用父类AQS.releaseShared()
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
回到子类实现Sync.tryReleaseShared()
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
自旋,cas操作state值减1,tryReleaseShared()返回值->操作后的state是否为0。如果为0,表示所有子线程任务都执行完毕,回到AQS.doReleaseShared()
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
唤醒队列中等待的节点,继续看CountDownLatch.await()
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
调用父类AQS.acquireSharedInterruptibly()
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
回到子类实现Sync.tryAcquireShared()
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
当所有子线程执行完毕state为0时会走到AQS.doAcquireSharedInterruptibly()
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
将当前线程也就是执行await()
的线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,也就是state为0时调用的AQS.doReleaseShared()
。
独享锁或共享锁只需对应实现AQS的两组(tryAcquire()、tryRelease(),tryAcquireShared()、tryReleaseShared()
)方法实现state控制。