深入解析AbstractQueuedSynchronizer源码
2018-09-03 本文已影响0人
hcy0411
AbstractQueuedSynchronizer是juc包下面解决资源竞争的基础,功能主要包括三部分:
第一部分Condition监视器,已在Condition源码解析文章中做了分析。
第二部分独占模式,ReentrantLock,ReentrantReadWriteLock的写锁都是基于AbstractQueuedSynchronizer的独占锁实现的。
第三部分共享模式,ReentrantReadWriteLock的读锁,CountDownLatch,Semaphore都是基于AbstractQueuedSynchronizer的共享锁实现的。
AbstractQueuedSynchronizer实现由两个队列实现,一个Sync queue。一个Condition Queue,Condition Queue再第一篇文章中已介绍。
Sync queue是实现共享锁和独占锁的基础。
本文分析独占锁实现。
AbstractQueuedSynchronizer 类变量
// 记录Sync Queue 的头
private transient volatile Node head;
/**
* 记录Sync Queue 的尾部
* method enq to add new wait node.
*/
private transient volatile Node tail;
// 同步状态,lock,计数器,信号箱的资源都是维护这个状态,类似于资源总数
private volatile int state;
Node类
该类是两个队列实现的关键,Sync queue,Condition Queue里的节点都是Node节点
static final class Node {
/**
SHARED 和EXCLUSIVE 主要区分共享模式和独占模式
**/
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
// 取消状态
static final int CANCELLED = 1;
// 该状态说明后继节点需要被唤醒,独占模式
static final int SIGNAL = -1;
// 该状态说明节点是Condition下的节点,参考Condition解析
static final int CONDITION = -2;
// 该状态用于共享模式,无条件传播唤醒
static final int PROPAGATE = -3;
// waitStatus 表示node节点的状态,0初始结束状态
volatile int waitStatus;
// Sync Queue是双向链表
volatile Node prev;
volatile Node next;
// node线程
volatile Thread thread;
// Condition queue 单向链表
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
独占获取资源方法
独占获取资源的入口如下:
// 忽略中断异常
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 抛出中断异常
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
// 超时获取,并抛出中断异常
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
tryAcquire留给工具自己去实现,用于判断是否满足获取资源要求
本文重点分析acquireQueued,doAcquireInterruptibly,doAcquireNanos实现。
acquireQueued方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 独占模式一次只能唤醒一个节点,即唤醒head的后继节点,这里判断节点是否是head的后继节点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 如果是head的后继节点,说明没有竞争,直接把改节点设置成head节点,然后返回中断状态
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果不是head的后继节点,说明存在竞争,需要封装改节点,然后等待被唤醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire
/**
* 该方法检查是否满足节点被封装的要求
* 即pred的状态是否是SIGNAL,SIGNAL代表后继节点需要被唤醒
* 不满足要求,修改满足要求,返回false,再来一次
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 满足要求,返回true
if (ws == Node.SIGNAL)
return true;
// 大于0表示节点的前驱节点被取消,需要跳过已被取消的所有节点
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 否则把前驱节点设置成SIGNAL状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt
// 该方法封装阻塞线程,runnable->waiting,被唤醒之后检查是否是中断唤醒
private final boolean parkAndCheckInterrupt() {
// 这里阻塞,线程状态由runnable变成waiting状态,直到被唤醒
LockSupport.park(this);
// 被唤醒之后,检查是否是中断唤醒
return Thread.interrupted();
}
cancelAcquire
//因异常取消获取节点
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null; // 1. 线程引用清空
Node pred = node.prev;
while (pred.waitStatus > 0) // 2. 若前继节点是 CANCELLED 的, 则也一并清除
node.prev = pred = pred.prev;
Node predNext = pred.next; // 3. 这里的 predNext也是需要清除的(只不过在清除时的 CAS 操作需要 它)
node.waitStatus = Node.CANCELLED; // 4. 设置成清除状态
if (node == tail && compareAndSetTail(node, pred)) { // 5. 若需要清除额节点是尾节点, 则直接 设置pred为尾节点,并删除predNext
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL || // 6. 如果pred没被取消,设置pred的waitStatus==SIGNAL 表示后继节点需要唤醒
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0) // 7. next.waitStatus <= 0 表示 next 是个一个想要获取lock的节点
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node); // 若 pred 是头节点, 直接唤醒下node的next节点。
}
node.next = node; // help GC
}
}
以上是独占方式获取资源的过程,其中doAcquireInterruptibly,doAcquireNanos两方法实现和acquireQueued实现区别不是很大,支持抛出中断异常,和超时等待,不做具体分析。
release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 如果head不为空,并列有后继节点需要被唤醒,直接唤醒后继节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
unparkSuccessor
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 唤醒后继节点之前,先把node设置成最终状态0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 拿到node的后继节点,如果被取消,设置成null,遍历,直到拿到需要被唤醒的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//不为空,唤醒
if (s != null)
LockSupport.unpark(s.thread);
}
以上就是独占模式下获取资源和释放资源的过程,之后会继续分析共享模式实现。