Lock非公平锁源码解读
2021-04-15 本文已影响0人
断忆残缘
ReentrantLock锁源码解析
每当谈起锁,我一定要说一下大神Doug Lea
,可以说是java并发编程鼻祖,juc包的作者,把每一行代码的性能做到了极致,曾经吊打jvm的synchronized
锁,直接导致了jvm项目组对synchronized
锁进行了优化。
先上调试demo代码,并发编程没有场景,很难理解源码。
static Lock lock = new ReentrantLock();
public static void main(String[] args) {
//t1
new Thread(() -> {
try {
lock.lock();
// 睡眠一天
TimeUnit.SECONDS.sleep(60*60*24);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "T1").start();
//t2
new Thread(() -> {
lock.lock();
log.info("获取到锁了");
lock.unlock();
}, "T2").start();
}
我特意画了一张非公平锁等待队列的图,方便大家理解非公平锁等待队列,因为这是难点,其他部分不是很难。
lock非公平锁的等待队列.png推荐学习方法:可以边调试demo,边看我的源码注释,因为Doug Lea循环逻辑很绕,不调试很难看懂。
NonfairSync非公平锁#lock方法
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
/**
* state锁状态
* 0表示没有线程持有的状态
* 1表示有线程持有
*
* cas操作是原子的,所以当多个线程来竞争的时候只有一个线程能获取到锁
* cas解释,判断当前state状态是否为0,如果是0就改为1,反之失败
*
* T1线程获取到锁
*/
if (compareAndSetState(0, 1))
/**
* 将持有锁的线程,记录到锁中。
*
* 反思点:为啥要记录持有锁的线程,个人认为是为了锁重入,后面代码会讲到。
*/
setExclusiveOwnerThread(Thread.currentThread());
else
/**
* 核心方法,主要一下几个功能
* 1. 尝试获取锁
* 2. 创建等待队列
*
*/
acquire(1);
}
从上面代码,我们能看出Doug Lea
写的代码真的是很精练,没有很很深的功力,个人觉得写不出这样的代码。
public final void acquire(int arg) {
/**
* 尝试获取锁
*
* 反思点:在前面已经尝试获取过一次锁了,为啥这边要第二次尝试获取锁
* 1.在高性能的cpu时代,每一条命令执行都是很快的,锁的持有和释放都是瞬间的,有可能立马就获取到锁了
* 2.主要原因还是减少park,减少用户态和内核态的切换,提高性能
*/
if (!tryAcquire(arg) &&
/**
* 创建/加入等待队列
*/
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
// 非公平锁尝试获取锁
return nonfairTryAcquire(acquires);
}
/**
* 返回结果true获取到锁,反之就没有获取到锁
*/
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取当前锁的状态
int c = getState();
// 如果为0,表示锁未被任何线程持有
if (c == 0) {
// cas操作尝试获取锁
if (compareAndSetState(0, acquires)) {
// 获取到锁后,设置线程
setExclusiveOwnerThread(current);
return true;
}
}
/**
*如果持有锁的线程为当前线程,锁重入(如果不记录那个线程持有,是不是没法做锁重入)
*
*/
else if (current == getExclusiveOwnerThread()) {
/**
* 因为同一个线程,所以以下操作都不需要原子性,因为只有一个线程在操作
*
* 锁的重入就是给state+1,表示同一个线程又重新进入一次,当放弃持有会state01
* 当state=0,表示线程释放锁
*/
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
/**
* 创建一个新的等待节点
*/
private Node addWaiter(Node mode) {
// 创建一个新的节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 判断等待队列是否有尾节点,如果有把当前节点设置为尾节点
Node pred = tail;
// 如果尾节点不为空
if (pred != null) {
// 设置当前的节点的pre为前尾节点
node.prev = pred;
// cas设置当前节点为尾节点
if (compareAndSetTail(pred, node)) {
// 设置前为节点的next为当前节点
pred.next = node;
return node;
}
}
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
/**
* 将节点插入队列,进行必要化的初始化
*
*/
private Node enq(final Node node) {
for (;;) {
// 获取尾节点
Node t = tail;
if (t == null) { // Must initialize
// 用空节点初始化尾节点
if (compareAndSetHead(new Node()))
// 头节点也指向同一个新节点
tail = head;
} else {
// 当前线程节点prev指向尾节点
node.prev = t;
// cas操作,将当前线程节点设置为尾节点
if (compareAndSetTail(t, node)) {
// 前尾节点的next节点设置为当前线程节点
t.next = node;
return t;
}
}
}
}
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
/**
* 加入等待队列,并进行park,直到被唤醒,重新获取到锁。
* 如果不行,继续park
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取node节点的prev节点
final Node p = node.predecessor();
/**
* 判断是否为头节点
* 如果是,在尝试获取一次锁
*/
if (p == head && tryAcquire(arg)) {
// 设置头节点
setHead(node);
// 清空上一个节点对当前节点的引用(help GC)
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
// 进入park,等待被唤醒。
// 唤醒之后,此方法返回false,不会进入下面的(interrupted = true)
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Sets head of queue to be node, thus dequeuing. Called only by
* acquire methods. Also nulls out unused fields for sake of GC
* and to suppress unnecessary signals and traversals.
*
* @param node the node
*/
private void setHead(Node node) {
/**
* 将当前节点变为空的头节点
*
* 反思点,为啥要有空头节点
* 个人观点:防止重复初始化等待队列,带来的性能损耗。
* 因为让没有等待线程时,队列中依然有一个空节点
*/
head = node;
node.thread = null;
node.prev = null;
}
抱怨一下:Doug Lea 为啥能把代码写的如此精炼,为啥???
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
/**
* 设置标识,用来当前执行完,是否唤醒下一个节点(难度较高)
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取上一个节点状态
int ws = pred.waitStatus;
// 如果是SIGNAL状态,直接true
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
/*
* 如果前一个节点状态是取消节点就跳过,递归找上一个非取消节点
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 设置status为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
释放锁
/**
* 释放锁
*/
public void unlock() {
// 释放一次线程持有
sync.release(1);
}
public final boolean release(int arg) {
// 尝试释放锁
if (tryRelease(arg)) {
// 获取头节点
Node h = head;
// 判断头节点状态
if (h != null && h.waitStatus != 0)
// 唤醒队列中的第一个等待线程
unparkSuccessor(h);
return true;
}
return false;
}
/**
* 尝试释放锁
*
* 反思点:尝试释放锁为啥没有任何cas操作
* 因为只有当前线程持有这把锁,别的线程无法持有,所以所有的操作都是原子性的
*/
protected final boolean tryRelease(int releases) {
// 释放一次锁
int c = getState() - releases;
// 判断当前线程是否为持有线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 当前线程是否释放锁标识
boolean free = false;
// c==0表示释放锁
if (c == 0) {
free = true;
// 锁中的线程清空
setExclusiveOwnerThread(null);
}
// 设置state标识
setState(c);
return free;
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 查看节点等待状态
int ws = node.waitStatus;
// 设置头节点(空节点)状态为0,不需要唤醒别的节点
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 获取头节点的下一个真实节点
Node s = node.next;
// 找到下一个非空或者非取消节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒park线程
if (s != null)
LockSupport.unpark(s.thread);
}
到此大家应该已经学习到lock非公平锁的核心内容了吧
思考问题:
- lock非公平锁,在park之前,最多进行几次尝试获取锁?
- 通过非公平锁学习,自己学习一下公平锁,回答公平锁与非公平锁那个效率更高,为什么?
大家可以在评论区讨论答案,也可以检查一下,自己掌握的程度。