深入解析AbstractQueuedSynchronizer源码

2018-08-06  本文已影响0人  hcy0411

简介和对比Object监视器方法对比

任何一组对象都用一组监视器方法,主要就是wait和notify方法,这些方法与synchronized关键字一起使用,实现等待/通知模式。juc也实现了类似Object的监视器方法,就是Condition,与lock可以配合使用实现等待/通知模式。功能还是有些差异,下面是Object监视器方法与Condition监视器方法功能对比。

对比项 Object Condition
实现条件 获取Synchronized锁 获取Lock锁
等待队列个数 只有一个,即一个Synchronized锁下,只有一个等待队列 多个,Lock可以new多个Condition
线程释放锁进入等待状态,是否支持中断 不支持 支持
线程释放锁进入等待状态,是否支持等待到具体某个时间 不支持 支持

以上是Object监视器方法与Condition监视器方法的主要区别,可以看出Condition监视器方法功能更多一点。

简单实现

分别用Object监视器方法和Condition监视器方法实现一个简单的阻塞队列

Object监视器方法

    private Object a = new Object();

    private Object b = new Object();

    private int count = 5;

    private List<String> list = new ArrayList<>(5);

    public void add(String str) {

        synchronized (a) {
            if (list.size() == count) {
                try {
                    a.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        synchronized (this) {
            list.add(str);
        }
        synchronized (b) {
            try {
                b.notifyAll();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public String get() {

        synchronized (b) {
            if (list.size() == 0) {
                try {
                    b.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        String str = "";
        synchronized (this) {
            str = list.get(0);
            list.remove(0);
        }
        synchronized (a) {
            try {
                a.notifyAll();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return str;
    }

Condition监视器方法

    private ReentrantLock lock = new ReentrantLock();

    private Condition a = lock.newCondition();

    private Condition b = lock.newCondition();

    private List<String> list = new ArrayList<>(5);

    private int count = 5;

    public void add(String str) {

        try {
            lock.lock();
            if (list.size() == count) {
                a.await();
            }
            list.add(str);
            b.signalAll();
        } catch (Exception e) {

        } finally {
            lock.unlock();
        }
    }

    public String get() {

        try {
            lock.lock();
            if (list.size() == 0) {
                b.await();
            }
            String str = list.get(0);
            list.remove(0);
            a.signalAll();
            return str;
        } catch (Exception e) {

        } finally {
            lock.unlock();
        }
        return null;
    }

从以上代码中可以看出Object监视器方法只有一个等待队列,因此需要定义两组Object监视器方法来实现生产者-消费者pv操作,而Condition支持多个等待队列,因此不用定义多组,直接用一个锁就行。

源码分析

本文主要分析aqs中的实现,即AbstractQueuedSynchronizer.ConditionObject
具体的一些方法即描述如下:

方法 描述
signal 唤醒一个等待在Condition上的线程
signalAll 唤醒所有等待在Condition上的线程
awaitUninterruptibly 线程进入等待队列等待被唤醒,不响应中断
await() throws InterruptedException 线程进入等待队列等待被唤醒,响应中断,抛出中断异常
awaitNanos(long nanosTimeout) throws InterruptedException 线程进入等待队列等待被唤醒,最多等待nanosTimeout纳秒,响应中断,抛出中断异常
awaitUntil(Date deadline) throws InterruptedException 线程进入等待队列等待被唤醒,最多等待到该具体时间,响应中断,抛出中断异常

简单介绍

深入解析方法实现前先简单介绍实现原理AbstractQueuedSynchronizer共有两个等待队列,Sync Queue和Condition Queue,Lock的加锁和wait,signal之间就是两个等待队列转换的过程,同Object监视器方法一样,Condition运行wait和signal方法时同样要先获取锁才能执行。
wait方法:把当前获的锁线程封装成Node节点放到Condition Queue中,然后释放锁资源。
signal方法:把Condition Queue首节点拿到Sync Queue中,完成之后释放锁资源,然后唤醒Sync Queue中的Node节点。

signal 和 signalAll

        public final void signal() {
             //健康检查,由Lock自己实现,比如ReentrantLock,就是检查当前线程是否是拥有锁的线程
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            //唤醒等待队列中的第一个节点
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }

doSignal 和doSignalAll

        private void doSignal(Node first) {
            do {
                // 这里的操作主要是把first节点从Condition Queue中摘出来,如果队列空了,设置lastWaiter为null
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) && // 唤醒该节点,即从Condition Queue转移到Sync Queue中
                     (first = firstWaiter) != null);
        }

        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            // 循环唤醒所有节点
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

transferForSignal

    final boolean transferForSignal(Node node) {
        /*
         * cas改变CONDITION到初始状态,如果没成功,唤醒失败
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // enq方法是将node节点插入到Sync Queue中
        Node p = enq(node);
        int ws = p.waitStatus;
        // 如果节点被取消即(ws > 0 )或者设置需要被唤醒状态SIGNAL失败,就唤醒线程
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

以上是Condition的signal相关方法,唤醒操作,总结一点就是把Condition Queue中节点拿到Sync Queue中,设置成被需要被唤醒状态,等待被唤醒。

中断标志

简单介绍下中断标志,因为Condition的wait支持中断,所以中断很重要

        /**
        * 这两种的区别是是否是在signal后中断的,THROW_IE是唤醒前中断,需要抛出中断异常,REINTERRUPT是被唤醒后中断
        **/
        /** Mode meaning to reinterrupt on exit from wait 意思是wait退出前自我中断,不需要抛出中断异常 */
        private static final int REINTERRUPT =  1;
        /** Mode meaning to throw InterruptedException on exit from wait  退出前需要抛出中断异常*/
        private static final int THROW_IE    = -1;

awaitUninterruptibly

该方法不支持中断异常,实现比较简单,可以参照wait方法理解

        public final void awaitUninterruptibly() {
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean interrupted = false;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if (Thread.interrupted())
                    interrupted = true;
            }
            if (acquireQueued(node, savedState) || interrupted)
                selfInterrupt();
        }

wait()throws InterruptedException

wait方法支持中断

        public final void await() throws InterruptedException {
        // 判断下中断状态
        if (Thread.interrupted())
            throw new InterruptedException();
        //封装node节点到Condition Queue中
        Node node = addConditionWaiter();
        // 释放锁资源,唤醒其他等待锁资源线程
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // 等待是否在Sync Queue中,即是否被其他线程signal唤醒,如果没有一直进行阻塞,指导被唤醒进入Sync Queue队列中
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            // 判断下中断类型,如果有中断且已经被signal唤醒,即节点在Sync Queue中,是THROW_IE,否则是REINTERRUPT,或者是0即没有中断,代码不深入理解,可以自己去看
            // 这里被中断唤醒,而不是signal之后被唤醒,会在Condition Queue中和Sync Queue中都存在
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // acquireQueued是循环的独占方式获取锁资源,即已经被唤醒放到Sync Queue中,需要独占的方式再获取锁资源,返回值是是否被中断
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        // 这里判断是否是被中断唤醒的,如果是中断唤醒的,需要把Condition Queue中已被取消的节点清理掉,防止垃圾存在
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            // 最后就是如果有中断,处理下中断,如果是THROW_IE中断模型,抛出中断异常
            reportInterruptAfterWait(interruptMode);

checkInterruptWhileWaiting 和 transferAfterCancelledWait

       private int checkInterruptWhileWaiting(Node node) {
            // 这里就是判断下线程是否被中断,如果中断,判断中断类型
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }
 // 这个函数的主要是判断节点被取消是否在signal唤醒之前,
final boolean transferAfterCancelledWait(Node node) {
       // 如果节点的状态是CONDITION,说明还没被signal唤醒,依然在Condition Queue中,然后返回插入到Sync Queue中返回true,这里说明了在signal唤醒之前被中断,两个等待队列都会存在节点
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        // 否则说明已经被signal唤醒,返回false
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

awaitNanos(long nanosTimeout)throws InterruptedException

该方法和wait方法基本差不多,就多了个超时等待

        public final long awaitNanos(long nanosTimeout)
                throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        final long deadline = System.nanoTime() + nanosTimeout;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            // 这里如果超时直接取消,然后退出等待
            if (nanosTimeout <= 0L) {
                transferAfterCancelledWait(node);
                break;
            }
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return deadline - System.nanoTime(); //返回值表明是否是超时还是被唤醒

剩下的两个与时间相关的方法 public final boolean awaitUntil(Date deadline)和public final boolean await(long time, TimeUnit unit)方法和 awaitNanos(long nanosTimeout)方法基本一样,不做深入理解。

总结

以上是全部的Condition理解,一句话,Condition就是锁状态的一种中间状态转换队列,实现了等待/通知模式

上一篇下一篇

猜你喜欢

热点阅读