2020-06-14_关于AQS条件队列与同步队列分析

2020-06-14  本文已影响0人  kikop

关于AQS条件队列与同步队列分析

1 概述

本文主要演示一下Condition中同步队列和条件队列是如何交互的。

1.1 交互流程

image.png

图 1 条件同步队列(图片来源网上)

2源码分析(ConditionObject)

2.1 条件队列等待await

1.将当前线程加入Condition锁队列。特别说明的是,这里不同于AQS的队列,这里进入的是Condition的FIFO队列。

2.释放锁。这里可以看到将锁释放了,否则别的线程就无法拿到锁而发生死锁。

3.自旋(while)挂起,直到被唤醒(signal把他重新放回到AQS的等待队列)或者超时或者CACELLED等。

4.获取锁(acquireQueued)。并将自己从Condition的FIFO队列中释放,表明自己不再需要锁(我已经拿到锁了)。

2.1.1 await源码分析

public final void await() throws InterruptedException {

            if (Thread.interrupted())

                throw new InterruptedException();

            Node node = addConditionWaiter();  //创建条件节点并加到条件队列

            int savedState = fullyRelease(node); // 释放锁,返回旧值

            int interruptMode = 0;

            while (!isOnSyncQueue(node)) { // 不在同步队列里等待signal

                LockSupport.park(this); // 阻塞

                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 清除中断标志位

                    break;

            }

// 被唤醒了,将节点移动到同步队列里面,自旋并阻塞等待信号unpark

            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

                interruptMode = REINTERRUPT;

            if (node.nextWaiter != null) // clean up if cancelled

                unlinkCancelledWaiters();

            if (interruptMode != 0)

                reportInterruptAfterWait(interruptMode);

        }


    public ReentrantLock() {

        sync = new NonfairSync();

}

2.2 条件队列通知signal

2.2.1 signal源码分析

        private void doSignal(Node first) {

do {

                if ( (firstWaiter = first.nextWaiter) == null)

                    lastWaiter = null;

//因为 first 马上要被移到阻塞队列了,和条件队列的链接关系在这里断掉

                first.nextWaiter = null;

            } while (!transferForSignal(first) && // 加到同步队列,然后unpark

                     (first = firstWaiter) != null);

}

 final boolean transferForSignal(Node node) {

        /*

         * If cannot change waitStatus, the node has been cancelled.

         */

        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))

            return false;

        /*

         * Splice onto queue and try to set waitStatus of predecessor to

         * indicate that thread is (probably) waiting. If cancelled or

         * attempt to set waitStatus fails, wake up to resync (in which

         * case the waitStatus can be transiently and harmlessly wrong).

         */

        Node p = enq(node);

        int ws = p.waitStatus;

//ws > 0 说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程。

        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))

            LockSupport.unpark(node.thread); // 唤醒node对应的线程

        return true; // 通知结束,

}

从等待队列的队首开始,尝试对队首节点执行唤醒操作;如果节点CANCELLED,就尝试唤醒下一个节点;如果再CANCELLED则继续迭代。

对每个节点执行唤醒操作时,将节点加入同步队列,再等待同步队列被唤醒(什么时候,unlock时)。

2.3 细节分析

2.3.1 waitStatus几种值

CANCELED:1

0

SIGNAL:-1

CONDITION:-2

PROPAGATE:-3

2.3.2在同步队列的判断条件

参数node waitStatus==-2或者前置节点null,不在,否则下一步

Next != null,在,否则下一步

从同步队列尾部查找node

2.3.3条件队列转入同步队列

同步队列的初始化节点:initNode(默认waitStatus:0)

取出条件队列中的第一个等待节点(waitStatus:-2),cas操作将waitStatus修改为:0,然后如同步队列。

取同步队列的前置节点。

如果waitStatus>0(实际是CANCELED),或者 CAS 失败,会进到这里唤醒线程,直接唤醒当前node节点(否则等待lock.unlock操作)

2.3.4时序(重点)

这里:await LockSupport.park(this);t2:doSignal时,还是park在此

直到t2:lock.unlock()才能被唤醒,在同步队列中自旋。

3代码示例(ConditionObject)

3.1 等待线程


package com.kikop.demo.MyAqsConditionQueue;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.ReentrantLock;

/**

 * @author kikop

 * @version 1.0

 * @project Name: javainaction

 * @file Name: MyConditionAwaitRunnableTask

 * @desc 功能描述

 * @date 2020/6/13

 * @time 22:41

 * @by IDE: IntelliJ IDEA

 */

public class MyConditionAwaitRunnableTask implements Runnable {

    private ReentrantLock reentrantLock;

    private Condition condition;

    public MyConditionAwaitRunnableTask(ReentrantLock reentrantLock, Condition condition) {

        this.reentrantLock = reentrantLock;

        this.condition = condition;

    }

    @Override

    public void run() {

        try {

            reentrantLock.lock(); // 当前t2节点加到同步队列

            System.out.println(Thread.currentThread().getName() + "lock,拿到锁了");

            System.out.println(Thread.currentThread().getName() + "等待信号");

            try {

                condition.await();

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

            System.out.println(Thread.currentThread().getName() + "拿到信号");

        } finally {

            reentrantLock.unlock();

            System.out.println(Thread.currentThread().getName() + "unlock结束");

        }

    }

}

3.2 唤醒线程

package com.kikop.demo.MyAqsConditionQueue;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.ReentrantLock;

/**

 * @author kikop

 * @version 1.0

 * @project Name: javainaction

 * @file Name: MyConditionAwaitRunnableTask

 * @desc 功能描述

 * @date 2020/6/13

 * @time 22:41

 * @by IDE: IntelliJ IDEA

 */

public class MyConditionSignalRunnableTask implements Runnable {

    private ReentrantLock reentrantLock;

    private Condition condition;

    public MyConditionSignalRunnableTask(ReentrantLock reentrantLock, Condition condition) {

        this.reentrantLock = reentrantLock;

        this.condition = condition;

    }

    @Override

    public void run() {

        try {

            reentrantLock.lock(); // 当前t2节点加到同步队列

            System.out.println(Thread.currentThread().getName() + "lock,拿到锁了");

            try {

                Thread.sleep(20*1000);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

            System.out.println(Thread.currentThread().getName() + "发出信号");

            condition.signal(); // 通知条件队列中第一个节点,此时条件队列-->加到同步队列,并仍然阻塞

        } finally {

            reentrantLock.unlock(); // 从同步队列中头节点开始找,并unpark

            System.out.println(Thread.currentThread().getName() + "unlock结束");

        }

    }

}

3.3 测试

package com.kikop.demo.MyAqsConditionQueue;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.ReentrantLock;

/**

 * @author kikop

 * @version 1.0

 * @project Name: javainaction

 * @file Name: AqsConditionQueueTest

 * @desc 功能描述

 * @date 2020/6/13

 * @time 22:06

 * @by IDE: IntelliJ IDEA

 */

public class AqsConditionQueueTest {

    public static void test() {

        ReentrantLock reentrantLock = new ReentrantLock();

        Condition condition = reentrantLock.newCondition();

        Thread t1 = new Thread(new MyConditionAwaitRunnableTask(reentrantLock, condition), "thread1");

        Thread t2 = new Thread(new MyConditionSignalRunnableTask(reentrantLock, condition), "thread2");

        t1.start();

        t2.start();

    }

    public static void main(String[] args) {

        test();

    }

}

thread1lock,拿到锁了step1

thread1等待信号 step2

thread2lock,拿到锁了 step3

thread2发出信号 step4

thread2unlock结束(等待中) step5

thread1拿到信号 step6

thread1unlock结束 step7

3.4图解步骤

image.png

图 2 t1由于不在同步队列找中,Await一直park趴在这儿(unlock也不会执行)

image.png

图 3 t2发出信号

image.png

图 4 t2 unlock

image.png

图 5 进入同步队列中自旋

image.png

图 6移动同步队列的头结点指针,当前node作为头结点,前置节点位置null,让gc回收(首次即为始化的initNode

image.png

图 7 t1执行业务逻辑

image.png

图 8 t1释放当前节点,unpark后置等待节点,整个流程结束

4 参考

1. 不怕难之ReentrantLock及其扩展

https://www.jianshu.com/p/a43a059b9c0a

上一篇下一篇

猜你喜欢

热点阅读