并发

AQS之CyclicBarrier源码分析(1)

2019-12-21  本文已影响0人  loveFXX

AQS的同步组件CyclicBarrier,内部实现调用了Condition条件等待和释放的方法。所以需要首先介绍下Condition的使用及原理

Condition

示例代码
A线程打印A,B线程打印B,C线程打印C。循环打印ABCABC...

public class ConditionTest {

    final ReentrantLock lock = new ReentrantLock(  );
    final Condition conditionA = lock.newCondition();
    final Condition conditionB = lock.newCondition();
    final Condition conditionC = lock.newCondition();

    String falg="";

    public static void main(String[] args) {
        ConditionTest ct = new ConditionTest();

        for (int i = 0; i <10 ; i++) {
            ct.new A().start();
            ct.new C().start();
            ct.new B().start();
        }

    }

    class  A extends Thread{
        @Override
        public void run() {

            lock.lock();
                try {
                    while ("A".equals( falg )||"B".equals( falg )) {
                        conditionA.await();
                    }
                    falg = "A";
                    System.out.print( "A" );

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if ("A".equals( falg )) {
                    conditionB.signal();
                }
                lock.unlock();
            }

    }
    class  B extends Thread{
        @Override
        public void run() {
                lock.lock();
                try {
                    while ("".equals( falg )||"B".equals( falg )||"C".equals( falg )) {
                        conditionB.await();
                    }
                    falg = "B";
                    System.out.print( "B" );
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if ("B".equals( falg )) {
                    conditionC.signal();
                }
                lock.unlock();
            }

    }

    class  C extends Thread{
        @Override
        public void run() {
            lock.lock();
            try {
                while ("".equals( falg )||"C".equals( falg )||"A".equals( falg )) {
                    conditionC.await();
                }
                falg = "C";
                System.out.println( "C" );
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if ("C".equals( falg )) {
                conditionA.signal();
            }
            lock.unlock();
        }

    }
}

代码中调用了conditionA.await()和conditionA.signal()方法、ReentrantLock.lock()和ReentrantLock.unlock()方法

await()

使当前线程等待,直到被调用signal唤醒等待的线程或Thread#interrupt方法被打断
AbstractQueuedSynchronizer.ConditionObject#await()
实现可中断的条件等待


image.png

AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter
当调用await(),首先添加等待者到等待队列


image.png
如果尾结点等待状态不等于Node.CONDITION则调用unlinkCancelledWaiters方法去掉取消的等待者
image.png
之后就把当前线程,Node.CONDITION封装成node节点加入队列
AbstractQueuedSynchronizer#fullyRelease

调用fullyRelease方法返回当前状态值


image.png
进入while循环,当前节点的node.waitStatus是CONDITION则一直park等待被唤醒
AbstractQueuedSynchronizer#isOnSyncQueue
image.png
当被唤醒,不是被interrupted打断。返回0
image.png

signal

移除等待时间最长的线程。
AbstractQueuedSynchronizer.ConditionObject#signal


image.png

AbstractQueuedSynchronizer.ConditionObject#doSignal
移除和转换节点直到命中没被取消或是null的。对等待队列重新设置


image.png
AbstractQueuedSynchronizer#transferForSignal
对当前移除的节点进行CAS操作,此时是CONDITION状态。调用enq(node)方法,把这个节点加入到AQS队列中。当前线程节点CAS操作设置SIGNAL状态失败,会唤醒当前线程。在await方法中,isOnSyncQueue将返回true,退出阻塞。
image.png
上一篇下一篇

猜你喜欢

热点阅读