JVM

AbstractQueuedSynchronizer

2020-06-07  本文已影响0人  专职掏大粪

AbstractQueuedSynchronizer
AbstractQueuedSynchronizer队列同步器,简称AQS,它是java并发用来构建或者其他同步组件基础框架

AQS本身是没有实现任何同步接口的,它仅仅只是定义了同步状态获取和释放的方法来供自定义的同步组件的使用
AQS主要是怎么使用的呢?
在java的同步组件中,AQS的子类一般是同步组件的静态内部类

public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {
    ... ...

AQS的实现依赖内部的同步队列(FIFO双向队列)来完成同步状态的管理,假如当前线程获取同步状态失败,AQS会将该线程以及等待状态等信息构造成一个Node,并将其加入同步队列,同时阻塞当前线程。当同步状态释放时,唤醒队列的首节点

static final class Node {
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;
}
  static final Node EXCLUSIVE = null;

        当前线程被取消;
        static final int CANCELLED =  1;
       当前节点的后继节点需要运行;
        static final int SIGNAL    = -1;
        当前节点在等待condition
        static final int CONDITION = -2;
        当前场景下后续的acquireShared可以执行
        static final int PROPAGATE = -3;

prev:前驱节点;
next:后继节点;
thread:进入队列的当前线程;
nextWaiter:存储condition队列中的后继节点。

Node是sync队列和condition队列构建的基础,AQS拥有三个成员变量:

    private transient volatile Node head;

    private transient volatile Node tail;

    private volatile int state;

对于的获取,请求形成节点将其挂在队列尾部,至于资源的转移,是从头到尾进行,队列的基本结构就出来了:

image.png

同步队列插入/删除节点

    Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }

需要传递当前线程认为的尾节点当前节点,设置成功后,当前节点尾节点建立关联。

image.png

注:设置首节点是由获取同步状态成功的线程来完成,因为每次只会有一个线程能够成功的获取到同步状态,所以,设置首节点并不需要CAS来保证。
AQS源码解析
AQS提供以下接口以供实现自定义同步器:

AQS提供两种方式来操作同步状态,独占式与共享式,
独占式同步状态获取 - acquire实现

1.调用tryAcquire方法尝试获取同步状态
2.如果获取不到同步状态,将当前线程构造成节点Node并加入同步队列
3.再次尝试获取,如果还是没有获取到那么将当前线程从线程调度器上摘下,进入等待状态。
下面我们具体来看一下节点的构造以及加入同步队列部分的代码实现
addWaiter实现

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

使用当前thread构造Node;
尝试在队尾插入节点,如果尾节点已经存在,就做以下操作

快速在队尾插入节点失败则进入enq(Node node)方法

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

enq的逻辑可以确保Node可以有顺序的添加到同步队列中,具体的加入队列的逻辑如下:

初始化同步队列:如果尾节点为空,分配一个头结点,并将尾节点指向头结点;
节点入队,通过CAS将节点设置为尾节点,以此在队尾做节点插入。
可以看出,整个enq方法通过“死循环”来保证节点的正确插入。

进入同步队列之后接下来就是同步状态的获取了,或者说是访问控制acquireQueued。对于同步队列中的线程,在同一时刻只能由队列首节点获取同步状态,其他的线程进入等待,直到符合条件才能继续进行。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

获取当前节点前驱节点
如果当前节点前驱节点头节点,并且可以获取同步状态,设置当前节点头结点,该节点占有锁
不满足条件的线程进入等待状态。
在整个方法中,当前线程一直都在“死循环”中尝试获取同步状态

image.png
从代码的逻辑也可以看出,其实在节点与节点之间在循环检查的过程中是不会相互通信的,仅仅只是判断自己当前的前驱是不是头结点,这样设计使得节点的释放符合FIFO,同时也避免了过早通知。
  public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

尝试释放状态,tryRelease保证将状态重置回去,同样采用CAS来保证操作的原子性;
释放成功后,调用unparkSuccessor唤醒当前节点后继节点线程
unparkSuccessor实现

    private void unparkSuccessor(Node node) {
       
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

取出当前节点的next节点,将该节点线程唤醒,被唤醒的线程获取同步状态。这里主要通过LockSupportunpark方法唤醒线程

共享式同步状态获取

共享式获取与独占式获取最主要的区别就是在同一时刻能否有多个线程可以同时获取到同步状态。这两种不同的方式在获取资源区别如下图所示

image.png
共享式访问资源时,其他共享式访问都是被允许的;
独占式访问资源时,在同一时刻只能有一个访问,其他的访问都被阻塞
AQS提供acquireShared方法来支持共享式获取同步状态
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

调用tryAcquireShared(int arg)方法尝试获取同步状态:
tryAcquireShared方法返回值 > 0时,表示能够获取到同步状态;
获取失败调用doAcquireShared(int arg)方法进入同步队列。

以CountDownLatch的tryAcquireShared实现来看,state为0时,返回1,获取不到同步状态

protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

获取当前节点的前驱节点;
如果当前节点的前驱节点是头结点,并且获取到的共享同步状态 > 0,设置当前节点的为头结点,获取同步状态成功;
不满足条件的线程自旋等待。
与独占式获取同步状态一样,共享式获取也是需要释放同步状态的,AQS提供releaseShared(int arg)方法可以释放同步状态。

共享式同步状态释放 - releaseShared实现
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

调用tryReleaseShared方法释放状态
调用doReleaseShared方法唤醒后继节点

https://www.jianshu.com/p/df0d7d6571de
https://segmentfault.com/a/1190000008471362#item-2

上一篇 下一篇

猜你喜欢

热点阅读