(七)Java并发编程之ReentrantLock

2019-11-27  本文已影响0人  陪安东尼的漫长岁月

ReentrantLock 是上文提到的 AQS 其中的一个实现类,是一个可重入的互斥锁,和 synchronized 有相同的基本行为和语义,但是具有扩展功能。它由上一次成功锁定并且尚未解锁的线程拥有。

ReentrantLock 源码初探 (JDK11)

    // 创建公平锁
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    // 创建公平锁(true) or 非公平锁(false)   
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    /** AQS 的实现类, 重写 AQS 中的 protected 方法
    有两个实现类  1、NonfairSync 非公平锁,2、FairSync公平锁 */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;
        
        /** 尝试获取非公平锁 */
        @ReservedStackAccess 
        final boolean nonfairTryAcquire(int acquires) { // 
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) { // 判断state状态是否为0,不为0直接加锁
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current); //独占状态锁持有者指向当前线程
                    return true;
                }
            }   // state状态不为0 但是锁被当前线程持有 则state+1
            else if (current == getExclusiveOwnerThread()) { 
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;   //加锁失败
        }
        /** 释放锁,公平锁和非公平锁的释放操作是相同的 */
        @ReservedStackAccess
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
        /** 判断持有独占锁的线程是否是当前线程 */
        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    
        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }
        // 获取锁重入次数
        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }

        final boolean isLocked() {
            return getState() != 0;
        }

        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }
    /** 非公平锁 */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        // 重写AQS的方法逻辑,由AQS的 acquire(int arg) 方法调用
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires); // 这个方法由Sync实现
        }
    }
    /** 公平锁 */ 
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
        @ReservedStackAccess
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            // 分配锁资源逻辑(这里只有分配成功的操作,对于分配失败的逻辑,在AQS中实现)
            if (c == 0) {
                // 队列中没有等待节点,则对当前线程分配锁资源
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 重入逻辑 (对当前state的值 +1 )
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
// 判断队列中是否有等待节点    
public final boolean hasQueuedPredecessors() {
        Node h, s;
        if ((h = head) != null) {
            // 当 head 的下一个节点为 null 或者 是等待被剔除状态的时候
            if ((s = h.next) == null || s.waitStatus > 0) {
                s = null; // traverse in case of concurrent cancellation
               // 从 tail 节点往前遍历,获取最接近 head 的等待节点
                for (Node p = tail; p != h && p != null; p = p.prev) {
                    if (p.waitStatus <= 0) // 过滤掉队列中撤销等待的节点
                        s = p;
                }
            }
            // 该等待节点可以被唤醒(当前访问线程不是该等待节点的线程)
            if (s != null && s.thread != Thread.currentThread())
                return true;
        }
        // 队列中没有等待节点
        return false;
    }

加锁解锁过程

写到这里,对 ReentrantLock 所实现的逻辑有了一个大概的了解,但是可以发现,上面只有对加锁,解锁进行了操作,但是我们一直提及的节点,队列在上面并没有体现,那么我们加锁失败是怎么操作的呢,传说中的入队出队又是一个什么情况呢,接下来就对 ReentrantLock 的 lock() , unloc() 操作对整个加锁解锁逻辑串联进行梳理。
这里我们看一下非公平锁的操作吧。

  1. 入列操作
入列操作.png
    public void lock() {
        // 这里的 acquire 方法由 AQS 实现
        sync.acquire(1);
    }
    public final void acquire(int arg) {
        /** !tryAcquire(arg) 代表获取锁的状态(修改stat,设置线程),获取成功返回true取反则跳出 accquire;
            获取失败返回false取反则进行后续 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 操作*/
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    // 获取锁失败会进行addWaiter操作;将当前线程对应的节点添加到队列中,指向tail
    private Node addWaiter(Node mode) {
        Node node = new Node(mode);
        // 自旋将当前获取锁失败的线程对应的节点添加到队列中,并指向tail。
        for (;;) {
            Node oldTail = tail;
            if (oldTail != null) {
                node.setPrevRelaxed(oldTail); 
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    // 返回的是当前请求线程对应的节点
                    return node;
                }
            } else {
                // 初始化同步队列的操作,初始化结束后 head 和 tail 指向的是同一个节点
                initializeSyncQueue();
            }
        }
    }
    private final void initializeSyncQueue() {
        Node h;
        if (HEAD.compareAndSet(this, null, (h = new Node())))
            tail = h;
    }
   // 初始化独占模式的节点
   /** java.util.concurrent.locks.AbstractQueuedSynchronizer.Node
    .Node(java.util.concurrent.locks.AbstractQueuedSynchronizer.Node) */
   Node(Node nextWaiter) {
       this.nextWaiter = nextWaiter;
       THREAD.set(this, Thread.currentThread());
  }
    // 这里的 node 为当前线程在队列中对应的节点,arg为获取锁时的标记状态(1)
    final boolean acquireQueued(final Node node, int arg) {
        boolean interrupted = false;  // 线程中断标记
        try {
            for (;;) {
                final Node p = node.predecessor();
                /** 如果node的前一个节点为 head 则尝试获取锁,
                    这里是下面中断的线程被唤醒后,重新进入循环,会执行这一段代码
                    这里并没有使用CAS来设置头结点,因为 tryAcquire 里面的CAS操作,
                    只能有一个线程进入到if里面的代码块*/
                if (p == head && tryAcquire(arg)) {
                    // 加锁成功,将当前节点设置为头节点,代表当前节点出队。
                    setHead(node);  
                    p.next = null; // help GC
                    return interrupted;
                }
                // 判断是否可以挂起(通过当前节点的前一个节点的waitStatus属性判断是否可以被唤醒)
                // 如果该线程被唤醒,则继续循环执行上面的加锁操作,唤醒后将该节点剔除队列
                if (shouldParkAfterFailedAcquire(p, node))
                    // 执行挂起操作
                    interrupted |= parkAndCheckInterrupt();
            }
        } catch (Throwable t) {
            // 以上操作抛出异常的时候,撤销本次请求
            cancelAcquire(node);
            if (interrupted)
                selfInterrupt();
            throw t;
        }
    }
    // 这里传入的 node 为当前访问线程对应的节点,
    // 确保前节点状态是可以被唤醒
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            // 如果前一个节点是等待状态,则返回true执行挂起操作
            return true;
        if (ws > 0) {
            /** 头节点的 waitStatus 值是0,这里是从当前节点(tail)往前遍历,
                取最近的一个等待状态的节点 or head节点*/
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
          /** waitStatus必须为0或PROPAGATE。CAS设置属性之后重新进入判断*/
           pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
        }
        return false;
    }
    // 对当前线程执行挂起操作
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
// 在执行挂起操作发生异常时,取消正在获取资源的请求 
// 这里的node是当前请求线程对应的节点
private void cancelAcquire(Node node) {
        if (node == null)
            return;
        node.thread = null;

        // 从当前节点(也就是尾节点)向前遍历,找到最近的一个等待的节点退出循环
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        // 这里的 predNext 是距离尾节点最近的等待节点的下一个节点(不一定是当前节点)
        Node predNext = pred.next;
        // 标记当前节点为取消状态,其余节点则可以跳过该节点
        node.waitStatus = Node.CANCELLED;

        // 如果当前节点是尾节点,则移除自己
        if (node == tail && compareAndSetTail(node, pred)) {
            pred.compareAndSetNext(predNext, null);
        } else {
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
                pred.thread != null) {
                // 这里是将前一个等待节点 next 设置尾当前节点的 next,将当前 node 剔除;
                // 假如中间有撤销节点的话,这样操作也会将其过滤掉;
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    pred.compareAndSetNext(predNext, next);
            } else {
                // 当前节点是head节点 or 当前节点 waitStatus 为 PROPAGATE 时进入当前逻辑
                // 唤醒当前节点的后继节点
                unparkSuccessor(node);
            }
            node.next = node; // help GC
        }
    }
  1. 出列操作
    同步队列(CLH)遵循FIFO,首节点是获取同步状态的节点,首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点。设置首节点是通过获取同步状态成功的线程来完成的(获取同步状态是通过CAS来完成),只能有一个线程能够获取到同步状态,因此设置头节点的操作并不需要CAS来保证,只需要将首节点设置为其原首节点的后继节点并断开原首节点的next(等待GC回收)应用即可。
出列操作.png
    // tryRelease 修改实现类的state修改和节点线程解绑,成功返回true,从头节点开始出队
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            node.compareAndSetWaitStatus(ws, 0);
        Node s = node.next;
        // 这里是判断下一个节点是否撤销等待,
        // 如果撤销的话,找下一个等待节点(从tail开始往前找最远的一个等待节点)
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
        // 对其执行唤醒操作
        if (s != null)
            LockSupport.unpark(s.thread);
    }

总结:AQS将获取锁,释放锁的操作交由子类去实现,这里是由ReentrantLock的公平锁,非公平锁来实现;其本身实现了共享模式和独占模式的入列出列操作,这里的代码看的是独占模式的入队出队操作。获取锁失败则将当前线程的对应的节点添加到tail,添加时要保证它的前一节点是可以被唤醒的,添加成功后将当前线程挂起;唤醒操作是对head的后继节点进行唤醒,唤醒后会重复执行入队操作中的头节点获取锁的逻辑,获取成功,即可跳出循环。

上一篇下一篇

猜你喜欢

热点阅读