ReentranLock源码浅析
本文为作者个人理解难免有误,欢迎指正
ReentranLock 的基本构成
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer{...}
static final class NonfairSync extends Sync{...}
static final class FairSync extends Sync{...}
常用的lock方法具体实现是在FairSync和NonfairSync。Nonfaire体现在lock时尝试直接获取锁,如果获取失败,则使用和公平锁一样通过AbstractQueuedSynchronizer的acquire去加锁。
加锁
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
对于锁的获取实质上是改变AbstractQueuedSynchronizer中state属性的值。
改变的过程大致是将state属性在的java内存中的偏移映射为本地内存地址,再通过Atomic::cmpxchg
改值。这些都是native方法,在java层面上可以不关心。简单点讲,本例中我们只需要记住所谓的加锁就是将state值从0改为1。
AbstractQueuedSynchronizer是核心!!!!
到这里,我们需要先看下AbstractQueuedSynchronizer的基本结构。
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* The synchronization state.
*/
private volatile int state;
Node组成双向链表,在源码中的称呼为CLH队列。
+------+ prev +-----+ +-----+
| head | <---- | | <---- | tail|
| | ----> | | ----> | |
+------+ next +-----+ +-----+
Node的结构
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
因为是个双向链表,所以自然有前驱和后继。thread表示尝试加锁的线程。waitStatus的值从-3到1,默认值0(下文用DEFAULT表示)
在本例中我们聚焦SIGNAL。它表示被标记为SIGNAL的节点的后继结点(的线程)需要被唤起!
acquire方法的实现
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
首先看下tryAcquire方法.以FairSync为例,对于NonfairSync,对比代码发现就少了!hasQueuedPredecessors()这个逻辑
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
先抛开hasQueuedPredecessors(),从代码来看
如果没获得锁,就做加锁操作,已经获得锁,就对当前state 加acquires(本例中为1,即+1)
为了略微全面一些,我们以公平锁为例.
没有人持有锁时,head和tail都为null,所以也不会有Queued Predecessors,tryAcquire结果为true,acquire直接返回,加锁成功结束。
相同线程执行lock,即锁重入时,对state做递增操作(+1)并state赋值。
如果是不同线程的话,就需要继续执行acquireQueued
先看addWaiter方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
创建了一个Node,Node是加锁线程的代,我们取名N1。由于head和tail都为null,进入enq方法。首先创建一个Node(取名N0)初始化head和tail,初始化后如下图,
+------+
| head |
| tail |
+------+
继续循环,走入else后,N1是tail,N0是head
+------+ +------+
| N0 | <---- | N1 |
| head | ----> | tail |
+------+ +------+
加入成功后就会执行
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
从上图的结构看,node的前驱就是head,但是tryAcquire是不成功的,因为已经被别的线程加锁了,所以走shouldParkAfterFailedAcquire()
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
刚开始,我们创建的Node和默认新增的都是没有设置waitStatus的,所以N0(pred)和N1(node)都默认值0。
经过这段代码,实质上更改waitStatus的值
+------+ +------+
| N0 | <---- | N1 |
|SIGNAL| ----> |DEFAULT|
+------+ +------+
此时acquireQueued继续循环,shouldParkAfterFailedAcquire走入第一个if返回true,执行parkAndCheckInterrupt,线程就会park在LockSupport.park(this);
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
如果别的线程继续加锁
+------+ +------+ +------+ +------+
| N0 | <---- | N1 | <---- | Nn-1 | <---- | Nn |
|SIGNAL| ----> |SIGNAL| ----> |SIGNAL| ----> |DEFAULT|
+------+ +------+ +------+ +------+
锁释放
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
首先对在tryRelease中清除当前持有锁的线程信息,更改state的值。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
unparkSuccessor主要是将当前节点的值归0,然后唤醒下一个节点。
+------+ +------+
| N0 | <---- | N1 |
|DEFAULT| ----> |DEFAULT|
+------+ +------+
当下一节点的被唤醒时,原来part在LockSupport.park(this)上的线程就开始继续执行acquireQueued中的for循环了。重点看一下
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
N1的前驱还是head,并且因为锁以释放,在公平锁的情况下,能尝试加锁成功。那么原本的N0节点就被移除了,N1就变成了HEAD,同时也是tail。Java