不怕难之ReentrantLock及其扩展
一、问题导读
1. ReentrantLock公平锁和非公平锁有什么区别
2. obj.wait() 和 condition.await() 有什么区别
3. Condition中同步队列和条件队列是如何交互的
二、ReentrantLock实现原理
ReentrantLock 在内部用了内部类 Sync 来管理锁,所以真正的获取锁和释放锁是由 Sync 的实现类来控制的。
Sync 有两个实现,分别为 NonfairSync(非公平锁)和 FairSync(公平锁),默认为NonfairSync(非公平锁)
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
线程抢锁之公平锁
static final class FairSync extends Sync {
final void lock() {
acquire(1); // 请求获取锁
}
// 因为有可能直接就成功了呢,也就不需要进队列排队了,
// 对于公平锁的语义就是:本来就没人持有锁,根本没必要进队列等待
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 入同步队列
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 没有线程占用锁
// 1. 和非公平锁相比,这里多了一个判断:是否有线程在等待
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 重入过程
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
线程抢锁之非公平锁
static final class NonfairSync extends Sync {
final void lock() {
// 2. 和公平锁相比,这里会直接先进行一次CAS,成功就返回了
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// AbstractQueuedSynchronizer.acquire(int arg) 父类中方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) { // 这里直接抢锁
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
总结:公平锁和非公平锁只有两处不同
非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了。
非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面。
公平锁和非公平锁就这两点区别,如果这两次 CAS 都不成功,那么后面非公平锁和公平锁是一样的,都要进入到阻塞队列等待唤醒。
相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。
同步队列入队流程
线程解锁
// 唤醒的代码还是比较简单的,跟加锁类似
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
// 往后看吧
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 通知后置节点
return true;
}
return false;
}
// 回到ReentrantLock看tryRelease方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 是否完全释放锁
boolean free = false;
// 其实就是重入的问题,如果c==0,也就是说没有嵌套锁了,可以释放了,否则还不能释放掉
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// 唤醒后继节点// 从上面调用处知道,参数node是head头结点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 如果head节点当前waitStatus<0, 将其修改为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待(waitStatus==1)
// 从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 从后往前找,仔细看代码,不必担心中间有节点取消(waitStatus==1)的情况
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒线程
LockSupport.unpark(s.thread);
}
三、Condition实现原理
同步队列与条件队列交互流程
阻塞await实现原理
// 首先,这个方法是可被中断的,这个方法会阻塞,直到调用 signal 方法(指 signal() 和 signalAll(),下同),或被中断
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 添加到 condition 的条件队列中
Node node = addConditionWaiter();
// 释放锁,返回值是释放锁之前的 state 值
int savedState = fullyRelease(node);
int interruptMode = 0;
// 这里退出循环有两种情况,之后再仔细分析
// 1. isOnSyncQueue(node) 返回 true,即当前 node 已经转移到阻塞队列了
// 2. checkInterruptWhileWaiting(node) != 0 会到 break,然后退出循环,代表的是线程中断
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 被唤醒后,将进入阻塞队列,等待获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters(); // 清除队列中已经取消的节点
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
整个await的过程如下:
1.将当前线程加入Condition锁队列。特别说明的是,这里不同于AQS的队列,这里进入的是Condition的FIFO队列。
2.释放锁。这里可以看到将锁释放了,否则别的线程就无法拿到锁而发生死锁。
3.自旋(while)挂起,直到被唤醒(signal把他重新放回到AQS的等待队列)或者超时或者CACELLED等。
4.获取锁(acquireQueued)。并将自己从Condition的FIFO队列中释放,表明自己不再需要锁(我已经拿到锁了)。
唤醒signal实现原理
// 唤醒等待了最久的线程// 其实就是,将这个线程对应的 node 从条件队列转移到同步队列
public final void signal() {
// 调用 signal 方法的线程必须持有当前的独占锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first); //唤醒操作
}
// 从条件队列队头往后遍历,找出第一个需要转移的 node// 因为前面我们说过,有些线程会取消排队,但是还在队列中
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 因为 first 马上要被移到阻塞队列了,和条件队列的链接关系在这里断掉
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
final boolean transferForSignal(Node node) {
// CAS 如果失败,说明此 node 的 waitStatus 已不是 Node.CONDITION,说明节点已经取消,
// 既然已经取消,也就不需要转移了,方法返回,转移后面一个节点 // 否则,将 waitStatus 置为 0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//enq(node): 自旋进入阻塞队列的队尾 // 注意,这里的返回值 p 是 node 在阻塞队列的前驱节点
Node p = enq(node);
int ws = p.waitStatus;
// ws > 0 说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程。
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 如果前驱节点取消或者 CAS 失败,会进到这里唤醒线程,之后的操作看下一节 LockSupport.unpark(node.thread);
return true;
}
整个signal过程如下
从等待队列的队首开始,尝试对队首节点执行唤醒操作;如果节点CANCELLED,就尝试唤醒下一个节点;如果再CANCELLED则继续迭代。
对每个节点执行唤醒操作时,将节点加入同步队列,再等待同步队列被唤醒。
案例剖析
public class TestCondition {
public static void main(String[] args) {
final ReentrantLock reentrantLock =new ReentrantLock();
final Condition condition = reentrantLock.newCondition();
new Thread(()-> {
reentrantLock.lock();
System.out.println(Thread.currentThread().getName() +"拿到锁了");
System.out.println(Thread.currentThread().getName() +"等待信号");
try {
condition.await();
}catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() +"拿到信号");
reentrantLock.unlock();
}, "线程1").start();
new Thread(()-> {
reentrantLock.lock();
System.out.println(Thread.currentThread().getName() +"拿到锁了");
try {
Thread.sleep(3000);
}catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() +"发出信号");
condition.signal();
reentrantLock.unlock();
System.out.println(Thread.currentThread().getName() +"unlock结束");
}, "线程2").start();
}
}
同步队列和条件队列交互过程
AQS的同步队列,Condition的条件队列开始交互:
1. 线程1调用reentrantLock.lock时,尝试获取锁。如果成功,则返回,从AQS的队列中移除线程;否则阻塞,保持在AQS的等待队列中。
2. 线程1调用await方法被调用时,对应操作是被加入到Condition的等待队列中,等待signal信号;同时释放锁。
3. 锁被释放后,会唤醒AQS队列中的头结点,所以线程2会获取到锁。
4. 线程2调用signal方法,这个时候Condition的等待队列中只有线程1一个节点,于是它被取出来,并被加入到AQS的等待队列中。注意,这个时候,线程1 并没有被唤醒,只是被加入AQS等待队列。
5. signal方法执行完毕,线程2调用unLock()方法,释放锁。这个时候因为AQS中只有线程1,于是,线程1被唤醒,线程1恢复执行。
所以:
发送signal信号只是将Condition队列中的线程加到AQS的等待队列中。只有到发送signal信号的线程调用reentrantLock.unlock()释放锁后,这些线程才会被唤醒。
可以看到,整个协作过程是靠结点在AQS的等待队列和Condition的等待队列中来回移动实现的,Condition作为一个条件类,很好的自己维护了一个等待信号的队列,并在适时的时候将结点加入到AQS的等待队列中来实现的唤醒操作。
总结:
ConditionObject由AQS提供,它实现了类似wiat、notify和notifyAll类似的功能。Condition必须与一个独占锁绑定使用,在await或signal之前必须现持有独占锁。Condition队列是一个单向链表,他是公平的,按照先进先出的顺序从队列中被“唤醒”,所谓唤醒指的是完成Condition对象上的等待,被移到Sync锁等待队列中,有参与竞争锁的资格(Sync队列有公平&非公平两种模式,注意区别)。