java语言之二- Concurrent Lock源码解读
Lock所处的位置
见上一节《java并发知识体系简要指南》知识体系图
image.png
在unsafe中提供了两个关键机制:
- CAS,CPU提供的原子指令
- park/unpark, 调用这个native函数会将线程调度和停止调度
由于安全机制,unsafe不能被直接调用,但是可以被虚拟机代码调用,而LockSupport则封装了这两个机制,可以供程序员调用,而为了实现各种场景的ECF(异常控制流),Concurrence的作者 设计了AbstractQueueSychronizer这个中间类,继承这个中间类,就可以轻松实现各种场景的ECF语义。
而这个中间类的代码是设计巧妙的,比较难懂的,最好的方法是懂得原理的情况下,使用各种ECF调试。
理解AbstractQueueSychronizer的关键:
- 数据结构,使用的最简单的双向链表
- 操作系统底层,就是上面介绍的LockSupport封装
- 控制逻辑,主要是通过state和Node中的waitState控制的
各种异常流
ReetrantLock
需要理解:
- 什么时候线程被加锁什么时候解锁
对于线程不能用串行化的思想来考虑问题,否则就不容易理解了,lock()内部,需要了解的就是什么时候加锁解锁,关键代码如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 添加节点在addwaiter
selfInterrupt();
}
acquireQueued的关键代码
for (;;) { // 恢复后代码又在这里运行,这里是种递归的运行方式, node就是每一个新节点,这里有点玄妙, release后唤醒的必然是head的后继节点, 这时会在这里更新头节点,或者说进行节点 的删除操作,然后lock的代码退出,进入业务逻辑代码
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { # 2. 有可能需要重新加锁
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) # 1. park后代码该线程运行就在这里停止了
interrupted = true;
}
如上,当线程调用lock的时候,该节点会被加入到queue中,如果lock已经被占用,则调用tryAcquire不成功,会调用parkAndCheckInterrupt,操作系统会将该线程挂起。
当前线程unlock的时候, 会首先将后继节点unpark
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // release操作的时候会释放后继节点,让后继节点运行起来
return true;
}
return false;
}
后继节点unpark后,会从上述“# 1. park后代码该线程运行就在这里停止了”标记处恢复运行,跳出死循环,跳出Lock()函数,执行线程本身的逻辑代码
- 基类AbstractQueueSychronizer中的queue的运转
如上图,当有两个线程都同时调用lock()的时候,ReetrantLock的队列如上图,头结点的thread是null,这是已经在执行的当前线程。
如上代码解释,当前线程执行unlock后,会执行unparkSuccessor, 其逻辑是当前节点的waitstate == -1,就找到后继节点,如上后继节点中thread = thread1,就执行unpark(thread1),恢复thread1的运行,queue如下:
image.png
如此往复执行,直到Node3的节点执行完毕,此时队列中就只剩下头结点了。
ReadWriteReetrantLock
关键代码:
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r); # 在这里加入到队列
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) # 1. 执行了park
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, long propagate) {
Node h = head; // Record old head for check below
setHead(node); #删除头节点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
例如说一堆线程运行,加read锁或者加write锁,执行lock后,queue如下:
image.png
如上,thread为Exclusive表示是写锁,是独占的,thread是Shared表示是读锁,共享的。运行过程如下:
- 写锁运行时,后续的线程都被阻塞了。
- 当写锁完成,发现waitstate=-1,解锁后续线程,后续线程是shared的,它会一直解锁线程,直到所有连续的shared线程都解锁。
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
这段代码的关键是,Node2、Node3 unpark后执行跳出lock时会增加state计数,执行完成是执行unlock则会减少state计数,只有当state计数减少为0的时候,才会引发Node4的解锁
- Node4是独占锁,和1执行相同,如此往复。
semaphore
假定semaphore被初始化为2,有5个线程启动加semaphore
image.png
注意看这里的关键点state,state就是表示有几个可解锁的park。
关键代码
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
当state 不为0的时候,获取锁总是能成功,此时有2个线程获取锁成功,其它线程获取不到锁会加入到queue中。
正在运行某个线程运行完毕,则会将state加1,并且删除queue头结点,唤醒后续结点,如此往复,一直保持有2个线程可以运行
由于只有2个线程在运行,因此state最大只能还原为2.
Cond
Cond需要有一把锁,用于保护condition对wait queue转移到queue。
image.png
如图,当有两个线程调用了wait()后,均被阻塞,在wait queue中有两个结点。
当调用了signal后,讲产生结点转移
Node1从wait queue转移到queue中,当锁unlock后,会去掉头结点,并去头结点的waitstate=-1,将会唤醒后续结点node1.
如此往复,直至所有的节点执行完毕
CountDownLanch
这是数据运行最简单的一个队列了,假定初始化为3
image.png
如图,主线程调用await(),被park()加入到queue中。每次有线程调用countDown()时,会讲state减去1,并且检查state是否为0,如果为0,则会讲头结点删除,解锁后续结点,即将主线程解锁。