JDK源码分析之ReentrantLock
2019-07-15 本文已影响0人
一岁一枯荣啊
ReentrantLock可重入锁基于AQS类实现。AQS提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的框架。ReentrantLock基于AbstractQueuedSynchronizer实现了Sync类
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {}
ReentrantLock分为公平锁和非公平锁,都继承Sync类进行实现,NonfairSync是非公平锁。FairSync 是公平锁。
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync { }
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {}
ReentrantLock根据构造方法确定使用公平或者非公平锁
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
非公平锁NonfairSync的加锁和释放
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
//尝试加锁
final void lock() {
//通过CAS将变量state从0更新为1
if (compareAndSetState(0, 1))
//变量更新成功将独占锁的主线程设置为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
//变量更新失败,调用acquire再次尝试更新,此处回调用到nonfairTryAcquire()
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
state变量更新失败,调用nonfairTryAcquire再次尝试更新
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
//获取当前线程
final Thread current = Thread.currentThread();
//取得当前state
int c = getState();
//判断是不是未加锁状态
if (c == 0) {
//尝试CAS获取修改state并将当前线程设置为独占主线程
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//没有设置成功,就判断当前线程是不是独占锁的主线程,如果是则在state的基础上+1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//如果也不是当前线程就返会false
return false;
}
Acquire失败后先进行addWaiter方法,然后进行acquireQueued
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
我们先看addWaiter(Node.EXCLUSIVE), arg)
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
//将当前线程封装为node结构,因为FIFO队列存储的是node数据结构
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//获取当前队列尾部节点 ,如果不为空,则CAS进行入队列,将当前node更新为尾节点
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果尾节点为空,则进入enq()初始化队列;
enq(node);
return node;
}
我们紧跟着看些enq方法干了什么
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
//自旋
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//CAS方式初始化一个node作为队列的头节点,同时这个节点也是尾节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
//自旋第二次就把当前node作为尾节点,并返回。
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
这个时候我们通过addWaiter返回了一个刚放入队列的node,acquireQueued方法会再次更新
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//如果node的前驱节点是head就进行尝试acquire,成功则把当前节点设置为head。将当前节点中的thread设置为null、将当前节点的prev设置为null。
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//否则将线程挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
ReentrantLock的unlock方法。
//ReentrantLock.Sync
protected final boolean tryRelease(int releases) {
//定义state-1之后的值
int c = getState() - releases;
//如果当前线程不是独占的主线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
//定义free表示是否已释放
boolean free = false;
//如果state==0则表示可以释放。OwnerThread设置为空
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
//更新state
setState(c);
return free;
}
//AbstractQueuedSynchronizer
public final boolean release(int arg) {
//释放成功后,获取队列头节点进行唤醒操作
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
以上就是非公平锁的源码实现
公平锁的源码实现
公平锁和非公平锁的区别就在于,获取资源的时候没有用当前的线程去操作,而是只从队列头部取节点进行尝试改变state.
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
//没有用当前线程尝试改变state
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
公平的方法
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
//如果头节点没有next 返回false
//如果头节点有next 并且不是当前线程返回true
//如果头节点有next 并且是当前线程返回true
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}