Java 并发编程之JUC从 ReentrantLock 开始读
我们都知道 Java 并发编程是个重点,而并发编程除了理论知识外,实际使用一般都通过 JUC(Java 并发工具包)来完成。而 JUC 除了线程池外用的最多的就是 JUC 锁,而 JUC 锁用的最多的是 ReentrantLock,所以 ReentrantLock 几乎是工作面试饶不过去的一道坎。
理解了 ReentrantLock 对理解 JUC 其他类很有帮助,思想和实现都很类似,而且难度并不低,可以学到很多细节。
所以说彻底搞懂 ReentrantLock 还是很有必要的。
开始
其实实现一个锁基本过程都是类似的,没有竞争的话不加锁,有竞争的时候让竞争者等待(自旋或线程挂起),锁释放时去竞争锁资源。synchronized、ReentrantLock、分布式锁都是这个思路,区别在于这几步的细节不一样,导致达到的性能、使用的场景也不一样。
synchronized 没有竞争的时候通过直接修改对象锁计数器加锁,ReentrantLock 通过修改 state 变量,Redis 通过 setnx 一个 key。
有竞争时:synchronized 通过升级成自旋锁不断去判断占用者有没有释放锁,超过一定次数停止自旋放入对象锁池等待锁资源释放;而 ReentrantLock 会把线程封装成队列节点放入 AQS 队列,也是不断自旋去竞争锁资源,需要阻塞时挂起;Redis 分布式锁也是自旋去获取锁。
所以很多东西的思想都是一样的,区别在于实现的细节,往往是为了达到别的锁达不到的效果,或者为了适应特定场景,在细节处做了精心设计,这时候分析源码去了解这些细节就很有意思,也可以学到一些设计思想。
源码分析
分析主要分三个方面:ReentrantLock 整体结构、加锁过程具体实现、一些重点实现方法分析。
整体流程
从代码中可以看到 ReentrantLock 实现 Lock 接口,通过继承自 AQS 的内部抽象类 Sync 实现几乎所有的操作。
public class ReentrantLock implements Lock {
/**
* 核心同步器
*/
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract void lock();
/**
* 非公平锁尝试获取锁,和下面的公平锁获取逻辑一模一样,唯一区别是获取锁是少了 hasQueuedPredecessors() 判断条件
*/
final boolean nonfairTryAcquire(int acquires) {
}
}
}
Sync 类的两个实现类:NonfairSync(公平锁)、FairSync(非公平锁,默认),两个实现类内部重写了 lock() 和 tryAcquire(int acquires) 两个方法。除了这两个实现方法不同外,全部调用 AQS 提供方法,所以之后具体的实现逻辑就都交给 AQS 了。
lock() 区别: 因为非公平锁不需要判断有没有等待的线程,所以他的 lock() 直接尝试占用锁,失败才正常获取锁,公平锁不是抢占式不能直接尝试占用锁。
tryAcquire(int acquires) 区别: 公平锁在尝试获取锁的时候会判断有没有排队等待的线程(公平锁实现的关键,下面有分析),如果有不会去抢占,会根据队列顺序分配锁,实现公平性。非公平锁获取逻辑除了少了个是否有排队线程条件,其他一模一样。
static final class NonfairSync extends Sync {
/**
* 非公平锁加锁
*/
final void lock() {
// 直接尝试 CAS 修改锁状态(立即占有锁)
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 失败了走正常获取锁流程
acquire(1);
}
/**
* 非公平锁尝试获取锁:通过父类方法实现,和公平锁实现一模一样,唯一区别是少了个判断条件:!hasQueuedPredecessors()
*/
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
static final class FairSync extends Sync {
/**
* 公平锁加锁:直接正常获取锁
*/
final void lock() {
acquire(1);
}
/**
* 公平锁尝试获取锁
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 当 state 为 0 时,代表空闲
if (c == 0) {
// 如果没有排队等待的线程
if (!hasQueuedPredecessors() &&
// 且 state 更新为 1 了就代表占用成功
compareAndSetState(0, acquires)) {
// 设置当前线程为独占线程
setExclusiveOwnerThread(current);
// 返回 true 加锁成功
return true;
}
// 否则如果 AOS 当前占用线程等于当前线程
} else if (current == getExclusiveOwnerThread()) {
// state + 1,实现重入
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
加锁的具体实现
获取锁的过程调用父类 AQS 的模板方法,典型的模板模式,定义好加锁的主流程,子类只要重写子流程就好。
先尝试获取锁,如果失败说明被占用了,那就封装成 Node 节点加入 AQS 队列。
加入队列后自己就在队首,再尝试获取,不在队首或尝试获取失败后就挂起,然后通过自旋获取锁,并提供中断标记。
不断循环上面这个过程直到获取锁。
类似排队买票,先尝试直接去买票(tryAcquire),发现有人在排队(tryAcquire 返回 false),就加到队伍的队尾(addWaiter),加进入后如果队伍就自己一个人了再尝试直接买票,否则就玩手机(挂起)直到前面的人买完再买。
public final void acquire(int arg) {
// 先尝试获取锁(调用子类重写的尝试获取锁方法,上面讲了)
if (!tryAcquire(arg) &&
// 如果失败了,把当前线程封装成独占 Node 加入队列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 设置中断标记,提供中断功能
selfInterrupt();
}
/**
* 加入 AQS 队列,并通过自旋获取锁
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 不断循环去获取锁
for (;;) {
final Node p = node.predecessor();
// 如果当前节点的前驱节点是队列头指针,说明轮到当前线程获取锁了,调用 tryAcquire 尝试获取锁
if (p == head && tryAcquire(arg)) {
// 获取到锁了,把当前节点设置为头结点,返回
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
// 判断是否需要阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果失败了,取消获取锁
if (failed)
cancelAcquire(node);
}
}
/**
* 为当前线程和给定模式创建并排队节点
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 当 AQS 队列尾结点不为空时(即有等待的线程时)
if (pred != null) {
// 新节点前驱节点指向 AQS 队列尾结点
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
AQS 其他重要属性和方法
# 锁状态:不为 0 代表锁被占用,值等于重入次数
private volatile int state;
# 队列头指针(通过指针的移动来遍历数据)
private transient volatile Node head;
# 队列尾指针
private transient volatile Node tail;
# state 变量内存偏移量:对象在内存中会占用一段内存,对象的字段按一定的顺序放在这段内存中,这个方法可以计算出 state 变量在 AQS 对象相对于起始地址的偏移量
private static final long stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
/**
* 队列节点
*/
static final class Node {
// 入队线程
volatile Thread thread;
// 等待状态
volatile int waitStatus;
}
/**
* 判断是否需要阻塞,以及寻找安全点挂起
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 如果当前节点的前驱节点的等待状态是 SIGNAL(等待唤醒),可以安全 park
if (ws == Node.SIGNAL)
return true;
// 如果是取消状态(只有取消状态大于 0),把前面所有取消状态的节点都从链表中删除
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 前一个Node的状态小于等于0,则把其状态设置为等待唤醒
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
公平锁实现关键:hasQueuedPredecessors()
公平锁和非公平锁尝试获取锁的逻辑几乎一样,唯一区别是多了这个方法判断。
这个方法作用是判断当前线程前面是否有排队等待的线程。没有当前线程才能加锁成功,以此实现公平性。
true 代表有,有要满足队列不为空,且队首不是当前线程;
false 代表没有,没有可能是因为队列为空,也可能是因为队首就是当前线程。
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// 判断队列是否为空,为空头结点才会和尾结点相等,为空直接返回 false
return h != t &&
// 如果队列不为空,并且队列队首的线程是当前线程(队首是等待时间最久的),返回 false
// 这里判断 (s = h.next) == null 的原因是有可能其他线程正在初始化队列,还没给头结点的 next 赋值
// 这种情况说明有其他线程先入队列,直接返回 true
((s = h.next) == null || s.thread != Thread.currentThread());
}
作者:吟游诗人巴德
链接:https://juejin.cn/post/7095271699339280392
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。