Java多线程

J.U.C之AQS:源码解析:共享式同步

2019-05-01  本文已影响0人  贪睡的企鹅

获取共享式同步状态

入口函数

     /**
     * 共享式获取同步状态,如果当前线程获取同步状态成功则直接返回,
     * 如果获取失败则线程阻塞,并插入同步队列进行.等待调用releaseShared
     * 释放同步状态时,重新尝试获取同步状态。成功则,同时会通知后置节点线程从阻塞中唤醒,
     * 获取同步状态并返回,失败则阻塞等待下次release
     */
    public final void acquireShared(int arg) {
        /**
         *子类实现tryAcquireShared能否获取的共享式同步状态
         *如果返回>=0则获取同步状态成功方法直接返回
         *如果返回< 0则获取同步状态失败进入if语句
         */
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

doAcquireShared

这里步骤4 表示了和独占最大的不同:独占同步的节点获得同步状态后,直接退出同步队列。而共享同步的节点会通知自己后置的同步节点线程(后置也是共享节点)从阻塞中唤醒去尝试获取同步状态,如果获取同步状态成功,则表示2个线程获得了一把锁(一次释放动作表示一把锁),同时执行。同时第2个线程的节点同样会通知后置同伴节点线程从阻塞中唤醒并尝试获取同步状态。

/**
     * 创建一个共享式节点node,添加到同步队列尾部.
     * 进入自旋,找到CLH头部后置第一个节点,尝试获取同步状态,成功则设置其为新head节点,
     * 并通知后置节点线程从阻塞中唤醒竞争同步状态.失败则阻塞.
     */
    private void doAcquireShared(int arg) {
        /** 创建一个共享式节点node,添加到同步队列尾部..**/
        final Node node = addWaiter(Node.SHARED);
        /** 执行是否发生异常 **/
        boolean failed = true;
        try {
            /** 标识是否被中断 **/
            boolean interrupted = false;
            /** 进入自旋 **/
            for (;;) {
                /** 1. 获得当前节点的先驱节点  **/
                final Node p = node.predecessor();
                if (p == head) {
                    /** 如果当前节点的先驱节点是头结点并且成功获取同步状态 **/
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        /** 将当前节点设置为head,同时只要同步队列中存在等待的节点,
                         * 且节点为共享节点则唤醒head节点后置节点阻塞去竞争同步状态. **/
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        /** 如果当前线程中断 **/
                        if (interrupted)
                            /** 中断当前线程 **/
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                /** 获取锁失败,在shouldParkAfterFailedAcquire中设置节点的等待状态,并线程阻塞(可响应线程被中断),
                 * 如果是中断响应设置interrupted = true;
                 * 重新进入自旋**/
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            /** 发生异常,将当前节点等待状态设置为取消**/
            if (failed)
                cancelAcquire(node);
        }
    }

setHeadAndPropagate

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);

        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

doReleaseShared

1 内部通过一个自旋,找到Head节点后置节点中线程从阻塞状态中唤醒竞争同步状态.

2 同时会检测释放节点的线程是否会获取同步状态,如果检查获取同步状态成功,则在次释放节点的下一个节点中的线程从阻塞中唤醒,重复迭代(是多线程可能释放的线程还没来的及获取同步状态。同步队列head节点没有改变退出的情况也是存在)因而此出逻辑作用是为性能考虑。

3 即使没有检测释放释放线程获取到同步状态返回,也不会太大影响,因为每个线程获取同步状态时还是会调用setHeadAndPropagate检查是否需要释放自己下个节点的线程的阻塞。

private void doReleaseShared() {
        //进入自旋
        for (;;) {
            //获取当前head节点,判断等待队列中是否存在等待节点,不存在进入步骤4直接退出.
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //如果当前节点的状态为-1(存在等待的后置节点),使用CAS设置其为0(使用CAS失败进入自旋重新设置)同时会唤醒head后置节点中线程从阻塞中释放.
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                //如果当前节点的状态为0(存在后置节点已经取消等待),使用CAS设置其为PROPAGATE(使用CAS失败进入自旋重新设置),
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            /**
            判断唤醒线程是否获取同步状态,获取同步状态同步队列head节点引用会发生改变,则释放被释放节点后置节点的线程中阻塞中唤醒竞争同步状态
            **/
            if (h == head)                   // loop if head changed
                break;
        }
    }

释放共享式同步状态

入口函数

releaseShared

public final boolean releaseShared(int arg) {
         /*
        *子类实现能否释放的共享式同步状态
        *如果返回true则表示释放同步状态准入条件成功进入if语句
        *如果返回false则表示释放同步状态失败返回false
        */
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

总结

独占式同步可以看作是一把共享锁,每次可以多个线程获的锁

总体流程如下

尝试锁失败 --> 进入等待队列排队  -->  阻塞当前线程 --> 当等待队列排到自己被唤醒 --> 尝试获取锁(可能被其他线程插队而导致获取锁失败,失败在次阻塞,等待下次排到自己)--> 成功通知等待队列前面共享节点线程从阻塞中唤醒  --> 执行自己的业务逻辑 --> 尝试释放锁--> 成功通知等待队列前面共享节点线程从阻塞中唤醒。

子类可以扩展如何获取锁,如何释放锁,但整体流程不变。

白话

我们举一个生活中例子来看独占式同步,比如我们去餐厅吃过饭,餐厅吃饭这个动作就是我们要执行业务,但要进入餐厅吃饭的前提要获取同步状态。对于独占来说只有一个位置,那么就独占式同步,那么进入餐厅这个动作(获取锁)只能保证同时一个人吃饭(一个线程),离开餐厅这个动作(释放锁)也只能同时保证一个进入(一个线程)。那么对于共享式同步,餐厅多个位置,那么就是共享式同步,因为同步多个人进入餐厅吃饭。当然能不能进入逻辑时餐厅这个AQS子类去定制的。

我们来看共享式例子,此时餐厅共有2个位置。

客人A进入餐厅吃饭,餐厅此时没有人,他可以顺利进入餐厅吃饭。

客人B进入餐厅吃饭,餐厅还有位置,他可以顺利进入餐厅吃饭。

客户C也进入餐厅吃饭,餐厅此时被A占着,尝试进入餐厅(子类实现逻辑判断),进入餐厅失败

餐厅给C指定一个排队编号,此时C想插队,在次尝试进入餐厅(子类实现逻辑判断),A,B还没有出来,进入餐厅失败阻塞等待

客户D也进入餐厅吃饭,餐厅此时被D占着,尝试进入餐厅(子类实现逻辑判断),进入餐厅失败

餐厅给D指定一个排队编号,此时D想插队,在次尝试进入餐厅(子类实现逻辑判断),A,B还没有出来,进入餐厅失败阻塞等待

A 吃饭完毕离开餐厅,尝试离开餐厅(子类实现逻辑判断),成功餐厅通知C(唤醒C线程),C尝试进入餐厅(可能还会被插队),成功,餐厅通知D(唤醒D线程),
D尝试进入餐厅,B还没有出来,进入餐厅失败阻塞等待

B 吃饭完毕离开餐厅,尝试离开餐厅(子类实现逻辑判断),成功餐厅通知D(唤醒D线程),D尝试进入餐厅,成功

上一篇下一篇

猜你喜欢

热点阅读