java程序员Android知识

撸一撸ReentrantLock的源码

2016-11-24  本文已影响157人  hello_coke

<b>synchronized:</b>把代码块声明为 synchronized,有两个重要后果,通常是指该代码具有 原子性(atomicity)可见性(visibility)。原子性意味着一个线程一次只能执行由一个指定监控对象(lock)保护的代码,从而防止多个线程在更新共享状态时相互冲突。可见性表示每次都从内存中拿去最新的值,以及代码块结束之前将最新的值更新到内存
<b>ReentrantLock:</b>相对于synchronized添加了类似锁投票、定时锁等候和可中断锁等候的一些特性。此外,它还提供了在激烈争用情况下更佳的性能。(换句话说,当许多线程都想访问共享资源时,JVM 可以花更少的时候来调度线程,把更多时间用在执行线程上。)
之前也就是拿ReentrantLock来用用或者主要还是用synchronized比较多,还是想撸撸ReentrantLock的源代码:
<b>uml 类图梳理:</b>
<pre>


</pre>
<b>Sync类:</b>
<pre>
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/*
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
/
abstract void lock(); //抽象方法被公平锁和非公平锁分别实现
/
*
* Performs non-fair tryLock. tryAcquire is
* implemented in subclasses, but both need nonfair
* try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) { //尝试获取非公平锁
final Thread current = Thread.currentThread();
int c = getState(); //获取基类AbstractQueuedSynchronizer的 volatile state属性
if (c == 0) { //表示空闲状态
if (compareAndSetState(0, acquires)) { // 使用了sun.misc下面的UnSafe类的compareAndSwapInt方法,跟AtomicInteger等原子性类实现一样
setExclusiveOwnerThread(current); //设置锁被占用的线程为当前线程
return true;
}
}
else if (current == getExclusiveOwnerThread()) { //如果锁已经被当前线程使用,重用锁
int nextc = c + acquires; //在原有的锁上面使state的值变大,所以如果释放锁的话需要使state直到变成0,单纯减1没有释放锁
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) { //尝试释放锁
int c = getState() - releases; //使得state值变小
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //锁空闲
free = true; //可用
setExclusiveOwnerThread(null); //清空占用该锁的线程
}
setState(c);
return free;
}
    protected final boolean isHeldExclusively() {  //其实上面的判断可用改成直接调用isHeldExclusively
        // While we must in general read state before owner,
        // we don't need to do so to check if current thread is owner
        return getExclusiveOwnerThread() == Thread.currentThread();
    }

    final ConditionObject newCondition() {
        return new ConditionObject();
    }

    // Methods relayed from outer class

    final Thread getOwner() {  //获取锁的线程属主
        return getState() == 0 ? null : getExclusiveOwnerThread();
    }

    final int getHoldCount() {  //获取锁被入次数,因为线程可以重入锁
        return isHeldExclusively() ? getState() : 0;
    }

    final boolean isLocked() {  //判断锁是否被占用
        return getState() != 0;
    }

    /**
     * Reconstitutes this lock instance from a stream.
     * @param s the stream
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        setState(0); // reset to unlocked state
    }
}

</pre>
<b>UnSafe.compareAndSwapInt补充:</b>CAS是项乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试。CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。

<b>NonfairSync类:</b>
<pre>
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {    //获取锁
        if (compareAndSetState(0, 1))  //这是跟公平锁的主要区别,一上来就试探锁是否空闲,可以插队
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1); //具体实现看如下
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

AbstractQueuedSynchronizer类的acquire实现:
public final void acquire(int arg) {
if (!tryAcquire(arg) && //尝试获取锁
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //如果获取锁失败则丢到锁等待的队列中
selfInterrupt();
}
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node); //其实这个主要是处理当pred == null 时处理两步 1、新建head 且 tail = head 2、tail = node 里面是个循环操作,不为空时只处理第二步,我看了一下enq这个方法有好几处在用所以作者把它包装起来
return node;
}
<b>双向链表图:</b>

链表.png
<b>acquireQueued源码:</b>
/
*
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { //循环:如果获取到锁则结束,如果前面还有节点则等待前面节点获取锁结束以后释放信号
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && 判断后续线程是否应该被阻塞,这里的p的后续线程其实就是当前线程
parkAndCheckInterrupt()) //里面 LockSupport.park(this);阻塞当前线程,以及检测是否被中断
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) //CANCELLED[1]:当前线程已被取消
SIGNAL[-1]:后继节点将被或者已经被阻塞,所以当前节点在释放或者取消时,需要unpark它的后继节点。
CONDITION[-2]:当前线程(处在Condition休眠状态)在等待Condition唤醒
PROPAGATE[-3]:(共享锁)其它线程获取到“共享锁”
/

* This node has already set status asking a release
* to signal it, so it can safely park.
/
return true;
if (ws > 0) {
/

* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/

* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
</pre>
<b>FairSync类源码:</b>
<pre>
/
*
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
    final void lock() {  //相对于NonfairSync的lock少了一来就compareAndSetState(0, 1)空闲就上锁
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&   //相对于NonfairSync的tryAcquire就是多了一个先判断等待队列中是否有其它等待的线程
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

</pre>

<b>NonfairSync 和 FairSync比较分析:</b>非公平锁来了先判断是否空闲,空闲就占用锁,公平锁先查看等待队列中是否有其它线程等待,按序获取锁不能插队。可想而知非公平锁效率更高,但是公平锁也有其特定的使用场景。new ReentrantLock() 默认是指定的非公平锁也相当于new ReentrantLock(false) 表示调用NonfairSync 的非公平锁,new ReentrantLock(true) 表示调用 FairSync的公平锁

<b>ReentrantLock如何响应锁中断?</b>
先说说线程的打扰机制,每个线程都有一个 打扰 标志。
这里分两种情况:

  1. 线程在sleep或wait,join, 此时如果别的线程调用此线程的 interrupt()方法,此线程会被唤醒并被要求处理InterruptedException

  2. 此线程在运行中, 则不会收到提醒。但是 此线程的 “打扰标志”会被设置, 可以通过isInterrupted()查看并 作出处理
    lockInterruptibly()和上面的第一种情况是一样的, 线程在请求lock并被阻塞时,如果被interrupt,则此线程会被唤醒并抛出InterruptedException
    上个例子:
    <pre>
    public class Test {
    public static void test() throws Exception {
    final Lock lock = new ReentrantLock();
    Thread t1 = new Thread(new Runnable() {
    @Override
    public void run() {
    try {
    Thread.sleep(2000);
    lock.lockInterruptibly();
    } catch (InterruptedException e) {
    System.out.println(Thread.currentThread().getName() + " interrupted.");
    }
    }
    });
    t1.start();
    t1.interrupt();
    Thread.sleep(10000000);
    }

    public static void main(String[] args) throws Exception {
    Test.test();
    }
    }
    输出结果为:
    Thread-0 interrupted.
    </pre>
    <pre>
    public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
    }
    基类代码:
    public final void acquireInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
    throw new InterruptedException();
    if (!tryAcquire(arg))
    doAcquireInterruptibly(arg);
    }
    doAcquireInterruptibly的实现跟acquireQueued实现有些类似。
    doAcquireInterruptibly以 throw new InterruptedException()来中断
    acquireQueued 即使检测到了Thread.interruptted为true一样会继续尝试获取锁,失败则继续等待只是在最后获取锁成功之后在把当前线程置为 interrupted = true状态。
    </pre>

上一篇下一篇

猜你喜欢

热点阅读