JVM

AbstractQueuedSynchronizer实现分析

2019-01-08  本文已影响17人  黑小鹰

1.什么是AbstractQueuedSynchronizer?
2.同步队列中的节点(Node)
3.独占式同步状态获取与释放
4.并发问题
5.挂起等待线程
6.一个例子

什么是AbstractQueuedSynchronizer?

AbstractQueuedSynchronizer即是同步器(简称AQS),同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

AQS的核心思想是基于volatile int state这样的一个属性同时配合Unsafe工具对其原子性的操作来实现对当前锁的状态进行修改。当state的值为0的时候,标识改Lock不被任何线程所占有。

同步队列中的节点(Node)

同步队列中的节点(Node)用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点,节点的属性类型与名称以及描述如下表所示


节点是构成同步队列(等待队列)的基础,同步器拥有首节点(head)和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入该队列的尾部,同步队列的基本结构如下图所示


同步器队列基本结构

同步器包含了两个节点类型的引用,一个指向头节点,而另一个指向尾节点。试想一下,当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,转而被构造成为节点并加入到同步队列中,而这个加入队列的过程必须要保证线程安全,因此同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect,Node update),它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。

同步器将节点加入到同步队列的过程


节点加入到同步队列.jpg

同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点,该过程


首节点的设置

独占式同步状态获取与释放

当多个线程同时去竞争锁的时候发生了什么?

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

多个线程同时进来,他们会首先会通过CAS去修改state的状态,如果修改成功,那么竞争成功,因此这个时候多个线程只有一个CAS成功,其他两个线程失败,也就是tryAcquire返回false。

接下来,addWaiter会把将当前线程关联的EXCLUSIVE类型的节点入队列:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

如果队尾节点不为null,则说明队列中已经有线程在等待了,那么直接入队尾。对于我们举的例子,这边的逻辑应该是走enq,也就是开始队尾是null,其实这个时候整个队列都是null的。

private Node enq(Node node) {
        for (;;) {
            Node oldTail = tail;
            if (oldTail != null) {
                node.setPrevRelaxed(oldTail);
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return oldTail;
                }
            } else {
                initializeSyncQueue();
            }
        }
    }

如果Thread2和Thread3同时进入了enq,同时t == null,则同时进入initializeSyncQueue()方法;initializeSyncQueue方法通过HEAD.compareAndSet(this, null, (h = new Node()创建同步器tail,第二次同步器尾节点不为空则都进入if代码块,进行CAS加入队尾,这个时候只有一个线程能够成功,然后其他继续进入循环,因此这个时候又是只有一个线程成功,我们假设是Thread2成功,哈哈,Thread2开心的返回了,Thread3失落的再进行下一次的循环,最终入队列成功,返回自己。

并发问题

基于上面两段代码,他们是如何实现不进行加锁,当有多个线程,或者说很多很多的线程同时执行的时候,怎么能保证最终他们都能够乖乖的入队列而不会出现并发问题的呢?

这也是这部分代码的经典之处,多线程竞争,热点、单点在队列尾部,多个线程都通过【CAS+死循环】这个free-lock黄金搭档来对队列进行修改,每次能够保证只有一个成功,如果失败下次重试,如果是N个线程,那么每个线程最多loop N次,最终都能够成功。

挂起等待线程

上面只是addWaiter的实现部分,那么节点入队列之后会继续发生什么呢?那就要看看acquireQueued是怎么实现的了,为保证文章整洁,代码我就不贴了,同志们自行查阅,我们还是以上面的例子来看看,Thread2和Thread3已经被放入队列了,进入acquireQueued之后:

对于Thread2来说,它的prev指向HEAD,因此会首先再尝试获取锁一次,如果失败,则会将HEAD的waitStatus值为SIGNAL,下次循环的时候再去尝试获取锁,如果还是失败,且这个时候prev节点的waitStatus已经是SIGNAL,则这个时候线程会被通过LockSupport挂起。

对于Thread3来说,它的prev指向Thread2,因此直接看看Thread2对应的节点的waitStatus是否为SIGNAL,如果不是则将它设置为SIGNAL,再给自己一次去看看自己有没有资格获取锁,如果Thread2还是挡在前面,且它的waitStatus是SIGNAL,则将自己挂起。

如果Thread1死死的握住锁不放,那么Thread2和Thread3现在的状态就是挂起状态啦,而且HEAD,以及Thread的waitStatus都是SIGNAL,尽管他们在整个过程中曾经数次去尝试获取锁,但是都失败了,失败了不能死循环呀,所以就被挂起了。当前状态如下:


一个例子

在上述对同步器AbstractQueuedSynchronizer进行了实现层面的分析之后,我们通过一个例子来加深对同步器的理解:

设计一个同步工具,该工具在同一时刻,只能有两个线程能够并行访问,超过限制的其他线程进入阻塞状态。
对于这个需求,可以利用同步器完成一个这样的设定,定义一个初始状态,为2,一个线程进行获取那么减1,一个线程释放那么加1,状态正确的范围在[0,1,2]三个之间,当在0时,代表再有新的线程对资源进行获取时只能进入阻塞状态(注意在任何时候进行状态变更的时候均需要以CAS作为原子性保障)。由于资源的数量多于1个,同时可以有两个线程占有资源,因此需要实现tryAcquireShared和tryReleaseShared方法

public class TwinsLock implements Lock {
    private final Sync sync = new Sync(2);

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -7889272986162341211L;

        Sync(int count) {
            if (count <= 0) {
                throw new IllegalArgumentException("count must large than zero.");
            }
            setState(count);
        }

        public int tryAcquireShared(int reduceCount) {
            for (;;) {
                int current = getState();
                int newCount = current - reduceCount;
                if (newCount < 0 || compareAndSetState(current, newCount)) {
                    return newCount;
                }
            }
        }

        public boolean tryReleaseShared(int returnCount) {
            for (;;) {
                int current = getState();
                int newCount = current + returnCount;
                if (compareAndSetState(current, newCount)) {
                    return true;
                }
            }
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }
    }

    public void lock() {
        sync.acquireShared(1);
    }

    public void unlock() {
        sync.releaseShared(1);
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean tryLock() {
        return sync.tryAcquireShared(1) >= 0;
    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

测试用例

public class TwinsLockTest {

    public void test() {
        final Lock lock = new TwinsLock();
        class Worker extends Thread {
            public void run() {
                while (true) {
                    lock.lock();
                    try {
                        SleepUtils.second(1);
                        System.out.println(Thread.currentThread().getName());
                        SleepUtils.second(1);
                    } finally {
                        lock.unlock();
                    }
                }
            }
        }
        // 启动10个线程
        for (int i = 0; i < 10; i++) {
            Worker w = new Worker();
            w.setDaemon(true);
            w.start();
        }
        // 每隔1秒换行
        for (int i = 0; i < 10; i++) {
            SleepUtils.second(1);
            System.out.println();
        }
    }
    
    public static void main(String[] args) {
        new TwinsLockTest().test();
    }
}

参考文献:
[1] AbstractQueuedSynchronizer的介绍和原理分析
[2] 扒一扒ReentrantLock以及AQS实现原理 原
[3] 分析ReentrantLock的实现原理
[4] 《Java并发编程艺术》

上一篇下一篇

猜你喜欢

热点阅读