java收藏

AQS - ReenTrantLock “加锁”源码分析

2021-12-20  本文已影响0人  Burlong

加锁操作(公平、独占锁情形下)

假设有T1、T2、T3三个线程按 T1 → T2 → T3 的先后顺序来抢锁:

// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
    // T1只会在第一个判断之后返回
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}

// java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
    // 1、确保当前节点是CHL队列中的第一个线程
    // 2、cas操作将state改为1
    // 3、将exclusiveOwnerThread指向当前线程,标记当前持有锁的线程
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer#hasQueuedPredecessors
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
        // tryAcquire: T2在第一个判断后发现拿不到锁(tryAcquire返回false)
        // addWaiter: 则调用addWaiter尝试入队
        // acquireQueued: 入队完成后,再次去尝试拿锁,如果拿到锁则出队,拿不到则park(阻塞)。此处注意acquireQueued返回值:如果是true代表被中断过,那么需要调用selfInterrupt中断当前线程??
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 由于是这个同步器第一次入队,因此pred(tail)肯定为空,那么T2会直接到这,去创建一个队列并入队
    enq(node);
    return node;
}

// 很经典的一个线程安全的双向链表构建范式!
// 特点:只有一处返回,即队列构建完成(构建队列的动作也可能在中间被其他线程给完成)并入队成功才返回head节点
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // 先构建一个虚拟节点作为head节点,cas设置head节点
            if (compareAndSetHead(new Node()))
                // 如果成功了,将head和tail同时指向这个虚拟节点
                tail = head;
        } else {
            // 说明队列已经构建完成,则cas尝试让当前节点入队(尾插)
            // 也就是将上一个tail作为当前节点的prev节点
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                // 入队成功后,将上一个tail的下个节点指向当前节点,完成入队
                t.next = node;
                return t;
            }
        }
    }
}

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 获取前驱节点
            final Node p = node.predecessor();
            // 如果前驱节点就是head并且获取锁成功(tryAcquire方法前面已分析),则执行当前节点出队操作
            if (p == head && tryAcquire(arg)) {
                // 下面操作旨在将当前节点置为一个dummy node供head节点引用
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 拿不到锁,则
            // 1、shouldParkAfterFailedAcquire:判断前驱节点waitStatus是否为-1,否则先改前驱节点的waitStatus为-1,在第二次进入这段逻辑判断则会返回true,进入parkAndCheckInterrupt
            // 2、parkAndCheckInterrupt:将当前线程挂起
            // (由于LockSupport.park(Thread)方法可被中断唤醒,因此如果当前线程被中断唤醒了,那么当前方法最终返回值则为true)
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 这段比较复杂,待分析
        if (failed)
            cancelAcquire(node);
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
    // T3此处与T2唯一的不同点在于入队操作时不需要再创建队列,只需创建节点尝试入队
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
    }

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        // 由于T2已创建了队列,则tail不为空,那么尝试入队
        // 可以发现下面这块代码和java.util.concurrent.locks.AbstractQueuedSynchronizer#enq的for循环中else分支基本一模一样,也就是尾插入队操作
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

// T3除了在入队操作上不需再创建队列之外,其他的代码逻辑跟T2完全相同
上一篇 下一篇

猜你喜欢

热点阅读