AQS懵逼探索之路

2021-06-22  本文已影响0人  DH大黄

想到了之前一直想看又不能静下心去细看的AQS,于是乎今天上班摸鱼的时候又去看了一遍
自己看的时候做的笔记都放在了下面的代码注释里面,方便以后自己去回顾,也分享给同样有需要的小伙伴们(代码有点多,希望能够耐心看完)
不得不感叹,Doug Lea大佬真的是太强了!
另外,如果有写的不对的地方,希望大佬们能够帮忙指正

/**
 * 获取锁及获取不到锁的处理逻辑
 */
// ReentrantLock
public void lock() {
    sync.lock();
}

final void lock() {
    acquire(1);
}
// tryAcquire为协作类即ReentrantLock实现的(不在此次范围)

// AQS
// 尝试获取锁
public final void acquire(int arg) {
    // tryAcquire 尝试获取锁 acquireQueued 入队 addWaiter 新增一个节点
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 中断当前线程
        selfInterrupt();
}

private Node addWaiter(Node mode) {
    // 新建一个节点
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // pred = 尾节点
    Node pred = tail;
    // 尾节点不为空
    if (pred != null) {
        // 入队并将当前节点作为尾节点
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 入队
    enq(node);
    return node;
}

// 入队
private Node enq(final Node node) {
    for (;;) {
        // t = 尾节点
        Node t = tail;
        if (t == null) { // Must initialize
            // 新建一个节点,并将其设置为头节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            // 将当前节点作为尾节点
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

// 加入队列
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 循环
        for (;;) {
            // 获取当前节点的前置节点
            final Node p = node.predecessor();
            // 假如为头节点,则尝试获取一下锁
            if (p == head && tryAcquire(arg)) {
                // 将当前节点设置为头节点,并清空当前节点的信息(线程,前置节点置为null)
                setHead(node);
                p.next = null; // help GC
                failed = false;
                // 获取锁成功后,就不需要中断了
                return interrupted;
            }
            // 获取锁失败,判断是否需要挂起当前线程,
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 挂起线程
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 前置节点的waitStatus
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 前置节点待唤醒,需要挂起当前节点
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        // 前置节点取消,再往前找,一直找到不为取消的位置(删除那些已经取消的节点)
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // 回去将前置节点的waitStatus修改为SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

/**
 * 获取锁及获取不到锁的处理逻辑
 */
 public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// tryRelease为协作类即ReentrantLock实现的(不在此次范围)

// AQS
// 释放锁
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // 有头节点且waitStatus不为0
        if (h != null && h.waitStatus != 0)
            // 唤醒线程
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    /*
        * If status is negative (i.e., possibly needing signal) try
        * to clear in anticipation of signalling.  It is OK if this
        * fails or if status is changed by waiting thread.
        */
    int ws = node.waitStatus;
    // ws<0 有未取消,在等待唤醒的线程(将头节点waitStatus设置为0)
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
        * Thread to unpark is held in successor, which is normally
        * just the next node.  But if cancelled or apparently null,
        * traverse backwards from tail to find the actual
        * non-cancelled successor.
        */
    // s = head.next
    Node s = node.next;
    // 假如头节点的下一个节点=s为空或已经取消,将s置为空,并从后往前找
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从后往前找,直到找到距离头节点最近的待唤醒的节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 如果s不为空,则唤醒s对应的线程
        LockSupport.unpark(s.thread);
}


/**
 * 在队列中放弃等待
 */
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    Node pred = node.prev;
    // 删除cancel的节点
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    // compareAndSetTail(node, pred) 将前置节点设置为尾节点
    if (node == tail && compareAndSetTail(node, pred)) {
        // 删除当前节点
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                // 如果后继的节点需要被唤醒,则将当前节点的前置节点的next设置为当前节点的后置节点
                compareAndSetNext(pred, predNext, next);
        } else {
            // 唤醒后继节点(后续节点被唤醒后拿不到锁,又会被挂起)
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

/**
 * Node的waitStatus枚举
 */
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED =  1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL    = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
 * waitStatus value to indicate the next acquireShared should
 * unconditionally propagate
 */
static final int PROPAGATE = -3;

补充一下看代码的思路


AQS.png

学习对应AQS协作类的思路
主要关注三个内容
1.对State的修改
2.对应的队列(FIFO)
3.获取和释放锁的方式(分为公平与非公平两种情况)

上一篇下一篇

猜你喜欢

热点阅读