再谈ReentrantLock
经过对AQS坚持不懈地研究,我们明白了,AQS定义了一种排队方式,解决那些暂时未获取到锁的线程该何去何从的问题,但是并没有告诉我们该如何获取、释放资源,而是留给了自定义同步器去实现。那本小节我们去研究下ReentrantLock是怎么获取和释放同步资源的,ReentrantLock又分为偏向锁(默认)和非偏向锁。
ReentrantLock有三大概念:可重入、公平锁&非公平锁、Condition等待/通知机制
锁的可重入是怎么实现的
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
setState是AQS里的方法:
protected final void setState(int newState) {
state = newState;
}
private volatile int state;
-- 这个变量用来记录线程获取锁的状态
ReentrantLock在获取锁的时候,如果判断出来当前线程就是锁的拥有者(current == getExclusiveOwnerThread()),就直接给这个状态state加上acquires值,
其实就是1,所以每次重入ReentrantLock锁时+1,在释放锁时也是每次-1,直到state=0才表示锁完全释放掉。
公平锁&非公平锁
首先,我们要知道ReentrantLock默认是非公平锁:
public ReentrantLock() {
sync = new NonfairSync();
}
当然也可以在创建锁的时候指定是公平的还是非公平的:
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
image.png
NonfairSync还有个大兄弟FairSync
我们带着疑问往下看,什么是公平的什么是非公平的:
公平锁加锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
这段代码主要分为两块:1)if (c==0);2)else if (current == getExclusiveOwnerThread())
- if (c==0)
尚没有锁获取到资源,这有可能是因为当前线程是头一个来抢锁的,也有可能是因为排在AQS首位的线程刚刚释放掉资源,保险起见还是先看看AQS里面有没有线程在等待:
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires))
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
什么情况下说明AQS里有前驱节点呢?
head和tail不是同一个节点,且head没有next或虽然有next但next不是当前线程。
(说明下,head的next节点就是等待队列的首个线程,head其实就是一个队列开始的标记,没有实际意义。)
判断出来没有线程在等待获取锁,尝试用CAS去获取资源:
compareAndSetState(0, acquires)
看看上面的加锁,acquires会传1
如果CAS成功,当前线程就获取到独占锁:setExclusiveOwnerThread(current)
- else if (current == getExclusiveOwnerThread())
当前线程本来就是资源的独占锁,说明之前已经获取到锁了,ok,直接对state累加:
int nextc = c + acquires;
非公平锁加锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
也不管AQS里有没有人在排队,上去就是干,上去就compareAndSetState(0, 1),万一干成功了呢?
万一成功了,就锁住资源呗:setExclusiveOwnerThread(Thread.currentThread())。
万一没成功呢?秒怂,try下试试,acquire(1),跟踪下,这个方法在AQS里面定义,
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这不就前面独占式获取同步状态的方式吗?尝试获取,失败就加入队列等着。
AQS的tryAcquire有这么几种实现:
image.png
显然我们要的是非公平锁的实现
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
(又跳回ReentrantLock类了)
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
跟上面公平锁的tryAcquire()方法对比下,仅仅是在if (c==0)后面少了个!hasQueuedPredecessors(),意思是这会儿共享资源的加锁状态是空的,我不管是因为没人来抢,还是因为head刚释放掉,我上来就用CAS去抢锁。
为什么要这么设计呢?少了个hasQueuedPredecessors()就不用再判断是不是又前驱节点了,效率上会有提升。当然我是我的想法,存在即合理,既然设计出来两种加锁方式,说明不会有绝对优劣。
解锁
public void unlock() {
sync.release(1);
}
解锁就一个,没有区分公平非公平
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
如果tryRelease释放掉了锁,就通知后继节点(unparkSuccessor)。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
如果减完1还剩0,说明资源已经完全释放了,那就清空对资源的占有(setExclusiveOwnerThread(null))。
如果减完1还>0,说明还没释放干净(重入锁lock了3次,释放了2次,就还剩1次),更新state值就好了,不释放资源。
图例
-
Lock - No contention (thread 0 acquire lock)
image.png -
Thread-1 来获取锁失败
image.png -
进入acquire -> addWaiter 流程
image.png -
acquireQueued ,前置虚节点 value 设置为SIGNAL
image.png -
Thread-2 , Thread-3 如此类推
image.png -
Thread-0 释放锁
image.png -
Thread-1 获得锁
image.png -
在第7步时假设有个thread-4 来竞争则谁抢到算谁的,此时thread-4抢到了锁
image.png -
再看Reentrant
image.png -
Interruput 处理
image.png -
FaireSync
image.png