Android开发程序员

Java小白系列(十二):抽象队列同步器(AbstractQue

2021-02-17  本文已影响0人  青叶小小

一、前言

抽象队列同步器,是一个抽象类,它采用了模板方法的设计模式《Java设计模式二:模板模式》。同时,它也内置了 CLH 队列《小白十:CLH》,让加入的线程自旋,当该线程(节点)的前驱节点拿到锁时,该线程就进入阻塞,等待前驱节点的唤醒,而线程的唤醒方式则是通过线程中断的方式来实现的,关于线程中断,大家可以详看:《小白十一:线程中断》
既然 AQS 是抽象类,所以,本文只分析核心流程,遇到需要子类实现的方法时,我们先跳过,到时候再结合具体的子类我们进行整体分析。

二、独占与共享

我们在学习 CLH 的时候,就讲到了一个节点可能是独占模式,也可能是共享模式,但对于 AQS 来说,但凡没有获取到锁的线程,都要被加入到 AQS 的 CLH 队列中,因此,AQS 并不是特别关心获取锁的线程是哪种模式,我们在使用模板设计模式时,也需要有这种思想。

虽然 AQS 不关心,但它给出了一组可选的方法需要子类来重载:

override.png

这段注释说明中,我们可以了解到几点:

因此,AQS 提供了几个可选的方法需要实现:

private static final class LockSync extends AbstractQueuedSynchronizer {
    protected LockSync() {
        super();
    }

    @Override
    protected boolean isHeldExclusively() {
        return super.isHeldExclusively();
    }
    
    @Override
    protected boolean tryAcquire(int arg) {
        return super.tryAcquire(arg);
    }

    @Override
    protected boolean tryRelease(int arg) {
        return super.tryRelease(arg);
    }

    @Override
    protected int tryAcquireShared(int arg) {
        return super.tryAcquireShared(arg);
    }

    @Override
    protected boolean tryReleaseShared(int arg) {
        return super.tryReleaseShared(arg);
    }
}

我们来分析一下这几个方法的含义:

根据上面这些,我们之后会分析到的(提前剧透):

三、AQS 核心流程

我们的 AQS 内部采用 CLH 队列来管理所有需要获取锁而获取失败,需要加入到队列中,本地自旋来等待锁的释放。

因此,流程的起点是:

这也是符合『锁』的流程:加锁 or 解锁!

我们来看个独占锁的请求锁和释放锁的流程:

   // 获取锁流程
   Acquire:
       while (!tryAcquire(arg)) {
          // enqueue thread if it is not already queued;
          // possibly block current thread;
       }
   // 释放锁流程
   Release:
       if (tryRelease(arg))  {
          // unblock the first queued thread;
      }

(共享模式与上面的类似,只是调用的方法不同;且因为是共享模式,当一个锁释放时,会引起一堆线程的竞争)

3.1、再谈 CLH 的公平与非公平性

不知道大家是否还记得我在《小白十》中分析 CLH 时,通过文字的描述方式和大家说过了,CLH 本身是公平的(严格的FIFO),但是,之所以存在不公平的竞争,原因在于:新来的线程是先入队再请求锁,还是先请求锁且失败后再入队列。JDK 源码中注释也说了这一点:

公平与非公平.png

原注释也说了:

因为在入队之前可以先尝试一下获取锁,因此,新来的线程就有机会与队列中被阻塞的首元素竞争。当然,我们也可以在 tryAcquire 或 tryAcquireShared 方法中来禁止这种不公平的竞争行为。一般大多数公平同步器会根据 AQS 中的 hasQueuedPredecessors 这个方法的返回来返回结果。

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

该方法会在之后详细展开分析,现简单分析返回结果;返回 true 表明队列中有正在排队或正在竞争的线程;返回 false 表明队列可能为空(或者当前竞争的线程就是自己)。

所以,tryAcquire 实现公平性,仅依赖 hasQueuedPredecessors 的返回结果即可,简单来说,代码可如下实现:

protected boolean tryAcquire(int arg) {
    return !hasQueuedPredecessors();
}

即:新来的线程想要获取锁时,先获取 hasQueuedPredecessors 的结果,如果 hasQueuedPredecessors 返回 true,表明队列有排队的元素,那么 tryAcquire 就返回 false ,表明获取锁失败!

3.2、获取锁流程

3.2.1、AQS.acquire

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer implements java.io.Serializable {

    /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
}

该方法是独占模式调用的:

3.2.2、AQS.addWaiter(重点:入队)

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer implements java.io.Serializable {
    
    /**
     * Creates and enqueues node for current thread and given mode.
     */
    private Node addWaiter(Node mode) {
        // 封装为一个独占模式的 Node
        Node node = new Node(Thread.currentThread(), mode);
        // 获取 tail 尾节点
        Node pred = tail;
        // tail != null,pred 就是 tail 节点
        if (pred != null) {
            // 新节点.prev = pred (tail节点 )
            node.prev = pred;
            // CAS,直接设置 tail = node
            if (compareAndSetTail(pred, node)) {
                // pred (tail节点).next = node
                pred.next = node;
                return node;
            }
        }
        
        // tail = null,表明队列还未初始化
        // 需要先初始化 head & tail
        enq(node);
        
        // 返回新的节点
        return node;
    }
}

3.2.3、AQS.enq

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer implements java.io.Serializable {

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
}

该方法主要是用来初始化 CLH 队列头结点,然后再插入新的节点。

我们来分析下:

3.2.4、AQS.acquireQueued(重点:自旋 -> park阻塞 -> 拿到锁返回)

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer implements java.io.Serializable {
    
    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            
            // 重点:自旋
            for (;;) {
                // 获取 node 的前驱节点
                final Node p = node.predecessor();
                
                // 如果前驱节点是头节点,且获取到锁,才会退出该循环
                // 因此,这里我们也要注意一点:
                // 即便前驱节点是头节点,此时该线程也只是尝试竞争获取锁,而
                // 在非公平模式下,还是有可能失败使其继续自旋!
                if (p == head && tryAcquire(arg)) {
                    // 将该节点设为头节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                
                // 该线程也不能一直自旋消耗CPU是吧?所以,自旋到了一定时候,就要进入阻塞状态,
                // 等待被唤醒,如果下面的两个条件成立,则表示线程进入阻塞状态,待之后前驱节点
                // 释放锁后,通过中断来唤醒该线程
                if (shouldParkAfterFailedAcquire(p, node) && // 判断前驱节点的状态是否为 Node.SIGNAL
                    // 若 shouldParkAfterFailedAcquire = true 才会走到该方法
                    // 该方法就是 park 线程进入阻塞状态,一旦park成功,就被挂起,
                    // 挂起中的线程是无法响应线程中断的,因此在 unpark 后检查一下
                    // 线程是否曾经被中断过,返回线程中断的状态
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),
            // 那么取消结点在队列中的等待
            if (failed)
                cancelAcquire(node);
        }
    }
}

parkAndCheckInterrupt 线程挂起后就不会再继续执行后续代码!直到被前驱节点释放锁时 unpark 才行!并且,在挂起过程中,线程中断也无法响应!

3.2.5、AQS.shouldParkAfterFailedAcquire

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer implements java.io.Serializable {
    
    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * 前驱节点已经被设置为该状态:即该节点线程释放锁时会通知下一个节点
             */
            return true;
        if (ws > 0) {
            /*
             * 该线程取消了
             */
            do {
                // 不断向前查找节点,直到节点的状态 <= 0
                // 这行代码分解为两行:
                // pred = pred.prev;  // 获取前驱节点;
                // node.prev = pred;  // 新节点的prev指向新的前驱节点
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            // 新的前驱节点的next = 新的节点 
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
}

重点是 compareAndSetWaitStatus,一旦前驱节点的状态被设置为 SIGNAL,那么 node 的下一次自旋就会进入阻塞状态,因为,下一次循环进入该方法时,前驱节点的 waitStatus 状态为 SIGNAL。

3.2.6、AQS.parkAndCheckInterrupt(重点:挂起)

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer implements java.io.Serializable {
        
    private final boolean parkAndCheckInterrupt() {
        // 调用 park 使线程进入 waiting 状态
        LockSupport.park(this); // -----------------挂起!unpark后才会继续执行下一行代码
        // 返回线程是否被中断过,并清除中断状态!
        return Thread.interrupted();
    }
}

该方法很简单,直接让线程进入阻塞状态,然后,返回是否曾经被中断过。

至此,整个获取锁的流程就分析完了,我们来小结下:

  1. tryAcquire尝试获取锁;
  2. 获取失败,addWaiter入队;
  3. acquireQueued 自旋、检查or设置前驱节点 waitStatus = SIGNAL;
  4. 然后 park 挂起,挂起中不响应中断;
  5. 等待前驱节点释放时来 unpark (下面要分析的);
  6. unpark 后的下一次循环开始进入锁的竞争(因为 unpark 后,其前驱是 head 节点);
  7. 如果失败,再次挂起,重复 5,6步骤;
  8. 如果成功,则把自己设置为 head 节点,返回是否曾经被中断过;

我们可以看到 acquire 是整个获取锁的入口,一旦没有成功,该方法也会被阻塞,直接该调用者(即线程)拿到锁,或者超时、取消等其它原因才会继续执行完;整个过程其实非常的简单,只是有些细节,大家需要注意,凡是我标注的重点,大家都应该仔细看看其中的注释,细细品味各种可能性。

3.3、释放锁流程

我们分析完了获取锁的流程,并且也提到了一点点 unpark ,所以,我们的释放锁的流程会非常的容易理解,流程也非常的短小。获取锁的入口是『acquire』,而释放锁的入口则是『release』;对于共享模式,释放锁的模式则是『releaseShared』。

3.3.1、AQS.release

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer implements java.io.Serializable {
        
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
}

该方法很简单,调用子类『tryRelease』,具体由子类实现,我们直接跳过,只研究成功的情况。当 tryRelease 成功释放锁,则拿到当前 head 节点,如果 head.waitStatus != 0 ,则去 unpark;那 unpark 谁呢?当然是 unpark 下一个节点嘛。

3.3.2、unparkSuccessor

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer implements java.io.Serializable {
        
    private void unparkSuccessor(Node node) {
        // 先尝试将自己的 waitStatus 置为 0
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        // 获取下一个节点,并尝试去唤醒它!
        Node s = node.next;
        
        // 如果下一个节点不存在,或者已被 cancelled
        if (s == null || s.waitStatus > 0) {
            s = null; // 置该节点为null
            
            // 从后向前遍历,直到 t = null or t == node 时结束
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        
        if (s != null)
            // unpark!取消线程的挂起
            LockSupport.unpark(s.thread);
    }
}

整个解锁流程就分析完了,对,就这么简单!

四、小结

AQS 咋一看,方法很多,但我们只关注主流程,其实就几个函数,也非常简单;希望大家通过学习并掌握主流程之后,再去了解其它方法,就不会有任何压力了。AQS 分析完了,接下来,我将会带领大家开始分析我们常用的一些子类,如:ReentrantLock、ReadWriteLock等。

上一篇下一篇

猜你喜欢

热点阅读