(七)Java并发编程之ReentrantLock
2019-11-27 本文已影响0人
陪安东尼的漫长岁月
ReentrantLock 是上文提到的 AQS 其中的一个实现类,是一个可重入的互斥锁,和 synchronized 有相同的基本行为和语义,但是具有扩展功能。它由上一次成功锁定并且尚未解锁的线程拥有。
ReentrantLock 源码初探 (JDK11)
- ReentrantLock 之构造器
// 创建公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
// 创建公平锁(true) or 非公平锁(false)
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
- ReentrantLock 之内部类 Sync
/** AQS 的实现类, 重写 AQS 中的 protected 方法
有两个实现类 1、NonfairSync 非公平锁,2、FairSync公平锁 */
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/** 尝试获取非公平锁 */
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) { //
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 判断state状态是否为0,不为0直接加锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current); //独占状态锁持有者指向当前线程
return true;
}
} // state状态不为0 但是锁被当前线程持有 则state+1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false; //加锁失败
}
/** 释放锁,公平锁和非公平锁的释放操作是相同的 */
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
/** 判断持有独占锁的线程是否是当前线程 */
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
// 获取锁重入次数
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
- ReentrantLock 之内部类 NonfairSync,FairSync
非公平锁加锁逻辑由内部类Sync实现,一上来就会尝试获取锁资源;公平锁加锁逻辑由本身重写实现,在获取锁资源之前会判断队列中是否有正在等待的节点,如果没有才会尝试获取锁资源。他们之间的重入逻辑则是相同的,如果是当前节点的线程再次请求锁资源则会对该节点的 state + 1,表示重入的次数。
/** 非公平锁 */
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 重写AQS的方法逻辑,由AQS的 acquire(int arg) 方法调用
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires); // 这个方法由Sync实现
}
}
/** 公平锁 */
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 分配锁资源逻辑(这里只有分配成功的操作,对于分配失败的逻辑,在AQS中实现)
if (c == 0) {
// 队列中没有等待节点,则对当前线程分配锁资源
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 重入逻辑 (对当前state的值 +1 )
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
- java.util.concurrent.locks.AbstractQueuedSynchronizer#hasQueuedPredecessors
// 判断队列中是否有等待节点
public final boolean hasQueuedPredecessors() {
Node h, s;
if ((h = head) != null) {
// 当 head 的下一个节点为 null 或者 是等待被剔除状态的时候
if ((s = h.next) == null || s.waitStatus > 0) {
s = null; // traverse in case of concurrent cancellation
// 从 tail 节点往前遍历,获取最接近 head 的等待节点
for (Node p = tail; p != h && p != null; p = p.prev) {
if (p.waitStatus <= 0) // 过滤掉队列中撤销等待的节点
s = p;
}
}
// 该等待节点可以被唤醒(当前访问线程不是该等待节点的线程)
if (s != null && s.thread != Thread.currentThread())
return true;
}
// 队列中没有等待节点
return false;
}
加锁解锁过程
写到这里,对 ReentrantLock 所实现的逻辑有了一个大概的了解,但是可以发现,上面只有对加锁,解锁进行了操作,但是我们一直提及的节点,队列在上面并没有体现,那么我们加锁失败是怎么操作的呢,传说中的入队出队又是一个什么情况呢,接下来就对 ReentrantLock 的 lock() , unloc() 操作对整个加锁解锁逻辑串联进行梳理。
这里我们看一下非公平锁的操作吧。
- 入列操作
入列操作.png
- java.util.concurrent.locks.ReentrantLock.lock
public void lock() {
// 这里的 acquire 方法由 AQS 实现
sync.acquire(1);
}
- java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
/** !tryAcquire(arg) 代表获取锁的状态(修改stat,设置线程),获取成功返回true取反则跳出 accquire;
获取失败返回false取反则进行后续 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 操作*/
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter
// 获取锁失败会进行addWaiter操作;将当前线程对应的节点添加到队列中,指向tail
private Node addWaiter(Node mode) {
Node node = new Node(mode);
// 自旋将当前获取锁失败的线程对应的节点添加到队列中,并指向tail。
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
// 返回的是当前请求线程对应的节点
return node;
}
} else {
// 初始化同步队列的操作,初始化结束后 head 和 tail 指向的是同一个节点
initializeSyncQueue();
}
}
}
private final void initializeSyncQueue() {
Node h;
if (HEAD.compareAndSet(this, null, (h = new Node())))
tail = h;
}
// 初始化独占模式的节点
/** java.util.concurrent.locks.AbstractQueuedSynchronizer.Node
.Node(java.util.concurrent.locks.AbstractQueuedSynchronizer.Node) */
Node(Node nextWaiter) {
this.nextWaiter = nextWaiter;
THREAD.set(this, Thread.currentThread());
}
- java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
// 这里的 node 为当前线程在队列中对应的节点,arg为获取锁时的标记状态(1)
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false; // 线程中断标记
try {
for (;;) {
final Node p = node.predecessor();
/** 如果node的前一个节点为 head 则尝试获取锁,
这里是下面中断的线程被唤醒后,重新进入循环,会执行这一段代码
这里并没有使用CAS来设置头结点,因为 tryAcquire 里面的CAS操作,
只能有一个线程进入到if里面的代码块*/
if (p == head && tryAcquire(arg)) {
// 加锁成功,将当前节点设置为头节点,代表当前节点出队。
setHead(node);
p.next = null; // help GC
return interrupted;
}
// 判断是否可以挂起(通过当前节点的前一个节点的waitStatus属性判断是否可以被唤醒)
// 如果该线程被唤醒,则继续循环执行上面的加锁操作,唤醒后将该节点剔除队列
if (shouldParkAfterFailedAcquire(p, node))
// 执行挂起操作
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
// 以上操作抛出异常的时候,撤销本次请求
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
- java.util.concurrent.locks.AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire
// 这里传入的 node 为当前访问线程对应的节点,
// 确保前节点状态是可以被唤醒
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果前一个节点是等待状态,则返回true执行挂起操作
return true;
if (ws > 0) {
/** 头节点的 waitStatus 值是0,这里是从当前节点(tail)往前遍历,
取最近的一个等待状态的节点 or head节点*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/** waitStatus必须为0或PROPAGATE。CAS设置属性之后重新进入判断*/
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
- java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt
// 对当前线程执行挂起操作
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
- java.util.concurrent.locks.AbstractQueuedSynchronizer#cancelAcquire
// 在执行挂起操作发生异常时,取消正在获取资源的请求
// 这里的node是当前请求线程对应的节点
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
// 从当前节点(也就是尾节点)向前遍历,找到最近的一个等待的节点退出循环
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 这里的 predNext 是距离尾节点最近的等待节点的下一个节点(不一定是当前节点)
Node predNext = pred.next;
// 标记当前节点为取消状态,其余节点则可以跳过该节点
node.waitStatus = Node.CANCELLED;
// 如果当前节点是尾节点,则移除自己
if (node == tail && compareAndSetTail(node, pred)) {
pred.compareAndSetNext(predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
pred.thread != null) {
// 这里是将前一个等待节点 next 设置尾当前节点的 next,将当前 node 剔除;
// 假如中间有撤销节点的话,这样操作也会将其过滤掉;
Node next = node.next;
if (next != null && next.waitStatus <= 0)
pred.compareAndSetNext(predNext, next);
} else {
// 当前节点是head节点 or 当前节点 waitStatus 为 PROPAGATE 时进入当前逻辑
// 唤醒当前节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
- 出列操作
同步队列(CLH)遵循FIFO,首节点是获取同步状态的节点,首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点。设置首节点是通过获取同步状态成功的线程来完成的(获取同步状态是通过CAS来完成),只能有一个线程能够获取到同步状态,因此设置头节点的操作并不需要CAS来保证,只需要将首节点设置为其原首节点的后继节点并断开原首节点的next(等待GC回收)应用即可。
出列操作.png
- java.util.concurrent.locks.AbstractQueuedSynchronizer#release
// tryRelease 修改实现类的state修改和节点线程解绑,成功返回true,从头节点开始出队
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
Node s = node.next;
// 这里是判断下一个节点是否撤销等待,
// 如果撤销的话,找下一个等待节点(从tail开始往前找最远的一个等待节点)
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
// 对其执行唤醒操作
if (s != null)
LockSupport.unpark(s.thread);
}
总结:AQS将获取锁,释放锁的操作交由子类去实现,这里是由ReentrantLock的公平锁,非公平锁来实现;其本身实现了共享模式和独占模式的入列出列操作,这里的代码看的是独占模式的入队出队操作。获取锁失败则将当前线程的对应的节点添加到tail,添加时要保证它的前一节点是可以被唤醒的,添加成功后将当前线程挂起;唤醒操作是对head的后继节点进行唤醒,唤醒后会重复执行入队操作中的头节点获取锁的逻辑,获取成功,即可跳出循环。