Java并发

Java并发开篇--ReentrantLock公平锁的可重入性

2020-04-04  本文已影响0人  慕北人

Java并发编程--ReentrantLock可重入性探索

我们直接先看其公平锁情况下的可重入性到底是怎么回事,由于我们讨论的是公平锁的情况,而相关的代码在ReentrantLock的内部类FairSync中。

1. lock()

public void lock() {
    sync.lock();
}  

由于是公平锁,所以我们需要重FairSync中查看lock方法:

final void lock() {
    acquire(1);
}  

而这里的acquire方法继承自AbstractQueuedSynchronizer

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}  

首先我们先提前说一下tryAcquire返回值是一个boolean,为true说明当前线程成功获取了ReentrantLock的锁,并且ReentrantLock锁是一个独占锁,而这个if条件,如果成功获取了锁,那么acquire方法就直接返回了。

AQS已经为该方法做了方法的实现,在FairSync中我们只要实现tryAcquire方法即可:

protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {  // 说明当前锁还未被持有
            // hasQueuedPredecessors返回false的情况为:当前线程在等待队列的头部或者等待队列为空
            // 这就说明了:只有等待队列的头结点可以获取锁  
            // Q:什么情况下当前线程已经在头结点了,但是还没有获取锁?
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {   // 这里说明当前线程就是独占的线程
            int nextc = c + acquires;       // 持有锁的线程获取锁的次数,这也表明了可重入性
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);           // 原来可重入性是这么个意思
            return true;         // 如果current再次使用该所对象加锁,那么会直接返回true,可重入就是这么个意思
        }
        return false;
    }

如注释中所说,当前线程能够成功获得锁有两种情况,分别代表首次加锁和重入锁

从这一个成功加锁的过程我们可以产生一些大胆的推断:

  1. 独占该锁的线程位于等待队列的头部
  2. state属性表示独占的线程加锁的次数,在之后解锁的时候可能也要这么多次数的unlock才可以释放锁
  3. 可重入性的保证就是一句current == getExclusiveOwnerThread()

注意:有没有发现,对于第一个加锁的线程,它会加锁成功,但是这个第一次加锁的线程,没有被封装成一个Node节点放到队列中;所以说,持有锁的线程是队列的头结点这句话有问题:因为持有锁的线程根本不在队列中,何来头结点一说。在下文分析加锁失败的情况会证明这一论断。

关于hasQueuedPredecessors方法:

public final boolean hasQueuedPredecessors() {
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}  

首先对于第一次加锁的线程,此时由于h == t == null所以返回false,而对于后面的返回false的情况大都是h != t但是s != null、s.thread == Thread.currentThread()返回的false,也就是只有头结点的后继节点调用该方法时,才会返回false表示可以尝试加锁。同时这也是可重入锁中公平锁的来源,对于之前已经在队列中的节点,那么新来的节点想要加锁,该方法会返回true说明队列中在你之前还有人在等待,得前面没人等待了你才能返回false,才能尝试去加锁,保证了先来后到的公平性

注意:对于第二个尝试加锁的线程,由于此时前面有一个人持有锁,所以它在调用lock-acquire-tryAcquire方法时由于判断state!=0且当前线程不是独占锁的线程直接判断了加锁失败,从而被添加到了队列中,这条路线中不涉及到hasQueuedPredecessors方法的调用(显然此时它是满足h==t这个判断的)

加锁失败情况

还记得acquire方法吗:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}  

如果tryAcquire方法返回false的话,说明加锁失败,同时通过上面的代码我们知道,如果加锁失败的话,当前线程没有被执行各种处理,所以我们在分析acquireQueued方法的时候没有任何后顾之忧,它的代码没有收到tryAcquire的影响:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {   // 注意,这里的意思是头结点的后继节点tryAcquire成功,也就是获取了锁
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;             // 这里说明了,如果头结点的后继节点成功获得了锁,直接返回false
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())          // parkAndCheckInterrupt方法最后一句return Thread.currentThread().interrupted()
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}  

对于这个入队操作,有几点需要说明:

  1. 只有头结点的next节点才会主动调用tryAcquire方法取申请获取锁
    1. 当头结点的next节点成功的得到了锁之后,通过setHead方法会将自己设为头结点
    2. 移除原来的头结点之后return false

好了,接下来我们就来说一说为啥上面说持有锁的线程不在队列中,如果说上文对于首次加锁的线程没有加入队列产生怀疑的话,那么这里的setHead方法会使你幡然醒悟:

private void setHead(Node node) {     
    head = node;
    node.thread = null;
    node.prev = null;
}  

看到了没:node.thread = null把头结点的线程置为空了!!!所以,对于独占到锁的线程来说,它此时已经不再队列中了!!所以说,只有头结点时持有锁的节点这句话不准确!!

那么问题来了,这个头结点时怎么初始化的呢?

头结点的初始化

答案就在调用acquireQueue方法时的addWaiter方法:

private Node addWaiter(Node mode) {  
    // 思路:新键当前线程的Node节点;如果tail被初始化过,则直接添加到尾部;否则执行enq操作先初始化tail再入队
    // 根据注释:mode传递的值要么是Node.SHARED要么是Node.EXCLUSIVE  

    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}  

这里的代码思路很简单,就是构建Node节点,然后插入到尾部,对于这个if判断语句,为false的情况会执行enq方法,在enq方法里面会有头结点的初始化:

private Node enq(final Node node) {
    // 思路:获取尾部,将参数中的node节点直接加入尾部;然后CAS更新tail引用 

    for (;;) {

        // 获取尾部
        Node t = tail;

        // 如果tail为空,说明是第一次入队操作,通过CAS初始化节点  
        // 这里初始化只是调用了默认构造器,目的是为了第二次for循环时tail不为null
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            // 因为tail是AQS在并发环境下的共享资源,所以修改tail变量要使用CAS操作
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}  

有没有发现,这个头结点的thread没有传递是参数,是一个null,这也证明了我们之前说的是正确的。

所以我们就知道了,在第一次有线程加锁的时候,它会成功得到锁,那么当第二个线程需要等待这个锁的时候,调用addWaiter的时候会初始化链表的头结点。

阻塞线程的挂起

可能有的人会问,每一个阻塞的节点都有一个无限for循环自旋,那么线程没有被挂起的话岂不是很浪费cpu资源?挂起的操作就在for循环的后面

if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())          // parkAndCheckInterrupt方法最后一句return Thread.currentThread().interrupted()
                interrupted = true;  

其中shouldParkAfterFailedAcquire(p, node)方法用来判断是否需要挂起获取锁失败的线程:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)

        // 判断是否可以挂起的思想是:如果该节点的pred节点的waitStatus已经被设置为了SIGNAL  
        // 那么说明该节点已经料理好后事了,可以在某个时刻被唤醒,所以可以安全的挂起
        return true;
    if (ws > 0) {    // 说明pred已经被cancelled了,直接移除
     
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {

        // 说明pred的waitStatue是0或者PROPAGATE,此时设置为SINGAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}  

看到了吗,只有参数中的pred,也就是需要被挂起的节点node的前一个节点的waitStatus为SINGLE的时候,才会返回true。而我们调用addWaiter方法创建Node节点的时候,waitStatue都是默认值0,所以在该方法的后面else语句中有一个CAS操作将其设置为SINGLE,这样的话,在acquireQueued方法的for自旋中,需要被挂起的线程经历两个for循环就可以使得shouldParkAfterFailedAcquire方法返回true

之后,真正为node执行挂起的操作位于parkAndCheckInterrupt方法

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}  

这样的话,没有获取到锁的线程就真的被挂起了。

2. unlock()

public void unlock() {
    sync.release(1);
}  

release方法同acquire方法一样,都是在AQS中又方法实现的:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}  

AQS的实现类只需要重写tryRelease方法即可。

protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }  

tryRelease方法很简单,就是将ReentrantLock的state值减去一,然后如果此时state为0说明独占的线程已经完全释放了锁,此时可以解除绑定,否则返回false。

而真正实现释放锁后唤醒其他线程的方法位于release中的

if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);  

我们上面在分析线程的挂起的时候说到了要想挂起,那么node的前一个节点的waitStatus必须为SINGLE,而SINGLE在Node这个类中的值为-1.是小于0的,所以一定会执行到unparkSuccessor方法

private void unparkSuccessor(Node node) {   // 通过名称,该方法是唤醒后继节点
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;  
      
    // 根据注释和代码:这里是处理node参数的next节点为null或者已经取消的情况
    // 此时从尾部开始遍历,找没有被canclled的节点唤醒  
    // PS:为何不从node开始往后找,而是从尾部开始往前找?
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }

    // 代表参数node的next节点不为空,则唤醒其中的线程
    if (s != null)
        LockSupport.unpark(s.thread);
}  

这里的LockSupport.unpark方法就是唤起其他线程的地方,且一次只会唤醒一个线程,大部分情况就是头结点的后继节点。

上一篇 下一篇

猜你喜欢

热点阅读