撸一撸ReentrantLock的源码
<b>synchronized:</b>把代码块声明为 synchronized,有两个重要后果,通常是指该代码具有 原子性(atomicity)和 可见性(visibility)。原子性意味着一个线程一次只能执行由一个指定监控对象(lock)保护的代码,从而防止多个线程在更新共享状态时相互冲突。可见性表示每次都从内存中拿去最新的值,以及代码块结束之前将最新的值更新到内存
<b>ReentrantLock:</b>相对于synchronized添加了类似锁投票、定时锁等候和可中断锁等候的一些特性。此外,它还提供了在激烈争用情况下更佳的性能。(换句话说,当许多线程都想访问共享资源时,JVM 可以花更少的时候来调度线程,把更多时间用在执行线程上。)
之前也就是拿ReentrantLock来用用或者主要还是用synchronized比较多,还是想撸撸ReentrantLock的源代码:
<b>uml 类图梳理:</b>
<pre>
</pre>
<b>Sync类:</b>
<pre>
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/*
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
/
abstract void lock(); //抽象方法被公平锁和非公平锁分别实现
/*
* Performs non-fair tryLock. tryAcquire is
* implemented in subclasses, but both need nonfair
* try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) { //尝试获取非公平锁
final Thread current = Thread.currentThread();
int c = getState(); //获取基类AbstractQueuedSynchronizer的 volatile state属性
if (c == 0) { //表示空闲状态
if (compareAndSetState(0, acquires)) { // 使用了sun.misc下面的UnSafe类的compareAndSwapInt方法,跟AtomicInteger等原子性类实现一样
setExclusiveOwnerThread(current); //设置锁被占用的线程为当前线程
return true;
}
}
else if (current == getExclusiveOwnerThread()) { //如果锁已经被当前线程使用,重用锁
int nextc = c + acquires; //在原有的锁上面使state的值变大,所以如果释放锁的话需要使state直到变成0,单纯减1没有释放锁
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) { //尝试释放锁
int c = getState() - releases; //使得state值变小
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //锁空闲
free = true; //可用
setExclusiveOwnerThread(null); //清空占用该锁的线程
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() { //其实上面的判断可用改成直接调用isHeldExclusively
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() { //获取锁的线程属主
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() { //获取锁被入次数,因为线程可以重入锁
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() { //判断锁是否被占用
return getState() != 0;
}
/**
* Reconstitutes this lock instance from a stream.
* @param s the stream
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
</pre>
<b>UnSafe.compareAndSwapInt补充:</b>CAS是项乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试。CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。
<b>NonfairSync类:</b>
<pre>
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() { //获取锁
if (compareAndSetState(0, 1)) //这是跟公平锁的主要区别,一上来就试探锁是否空闲,可以插队
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); //具体实现看如下
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
AbstractQueuedSynchronizer类的acquire实现:
public final void acquire(int arg) {
if (!tryAcquire(arg) && //尝试获取锁
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //如果获取锁失败则丢到锁等待的队列中
selfInterrupt();
}
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node); //其实这个主要是处理当pred == null 时处理两步 1、新建head 且 tail = head 2、tail = node 里面是个循环操作,不为空时只处理第二步,我看了一下enq这个方法有好几处在用所以作者把它包装起来
return node;
}
<b>双向链表图:</b>
<b>acquireQueued源码:</b>
/*
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { //循环:如果获取到锁则结束,如果前面还有节点则等待前面节点获取锁结束以后释放信号
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && 判断后续线程是否应该被阻塞,这里的p的后续线程其实就是当前线程
parkAndCheckInterrupt()) //里面 LockSupport.park(this);阻塞当前线程,以及检测是否被中断
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) //CANCELLED[1]:当前线程已被取消
SIGNAL[-1]:后继节点将被或者已经被阻塞,所以当前节点在释放或者取消时,需要unpark它的后继节点。
CONDITION[-2]:当前线程(处在Condition休眠状态)在等待Condition唤醒
PROPAGATE[-3]:(共享锁)其它线程获取到“共享锁”
/
* This node has already set status asking a release
* to signal it, so it can safely park.
/
return true;
if (ws > 0) {
/
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
</pre>
<b>FairSync类源码:</b>
<pre>
/*
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() { //相对于NonfairSync的lock少了一来就compareAndSetState(0, 1)空闲就上锁
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && //相对于NonfairSync的tryAcquire就是多了一个先判断等待队列中是否有其它等待的线程
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
</pre>
<b>NonfairSync 和 FairSync比较分析:</b>非公平锁来了先判断是否空闲,空闲就占用锁,公平锁先查看等待队列中是否有其它线程等待,按序获取锁不能插队。可想而知非公平锁效率更高,但是公平锁也有其特定的使用场景。new ReentrantLock() 默认是指定的非公平锁也相当于new ReentrantLock(false) 表示调用NonfairSync 的非公平锁,new ReentrantLock(true) 表示调用 FairSync的公平锁
<b>ReentrantLock如何响应锁中断?</b>
先说说线程的打扰机制,每个线程都有一个 打扰 标志。
这里分两种情况:
-
线程在sleep或wait,join, 此时如果别的线程调用此线程的 interrupt()方法,此线程会被唤醒并被要求处理InterruptedException
-
此线程在运行中, 则不会收到提醒。但是 此线程的 “打扰标志”会被设置, 可以通过isInterrupted()查看并 作出处理
lockInterruptibly()和上面的第一种情况是一样的, 线程在请求lock并被阻塞时,如果被interrupt,则此线程会被唤醒并抛出InterruptedException
上个例子:
<pre>
public class Test {
public static void test() throws Exception {
final Lock lock = new ReentrantLock();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
lock.lockInterruptibly();
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " interrupted.");
}
}
});
t1.start();
t1.interrupt();
Thread.sleep(10000000);
}public static void main(String[] args) throws Exception {
Test.test();
}
}
输出结果为:
Thread-0 interrupted.
</pre>
<pre>
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
基类代码:
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
doAcquireInterruptibly的实现跟acquireQueued实现有些类似。
doAcquireInterruptibly以 throw new InterruptedException()来中断
acquireQueued 即使检测到了Thread.interruptted为true一样会继续尝试获取锁,失败则继续等待只是在最后获取锁成功之后在把当前线程置为 interrupted = true状态。
</pre>