AQS

Java并发——Condition接口

2017-08-08  本文已影响0人  Q南南南Q

任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。

Object的监视器方法与Condition接口的对比

1 Condition方法描述

Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创建出来的,换句话说,Condition是依赖Lock对象的。

Condition的(部分)方法以及描述

2 Condition的实现分析

ConditionObject是同步器AbstractQueuedSynchronizer的内部类,因为Condition的操作需要获取相关联的锁,所以作为同步器的内部类也较为合理。每个Condition对象都包含着一个队列(以下称为等待队列),该队列是Condition对象实现等待/通知功能的关键。

等待队列

等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。

事实上,节点的定义复用了同步器中节点的定义,也就是说,同步队列和等待队列中节点类型都是同步器的静态内部类AbstractQueuedSynchronizer.Node。

一个Condition包含一个等待队列,Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列。Condition拥有首尾节点的引用,而新增节点只需要将原有的尾节点nextWaiter指向它,并且更新尾节点即可。上述节点引用更新的过程并没有使用CAS保证,原因在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。

等待队列的基本结构

在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock(更确切地说是同步器)拥有一个同步队列和多个等待队列,如下图所示:

同步队列与等待队列

等待

调用Condition的await()方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。

如果从队列(同步队列和等待队列)的角度看await()方法,当调用await()方法时,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 同步队列的首节点并不会直接加入等待队列,而是通过addConditionWaiter()
    // 方法把当前线程构造成一个新的节点并将其加入等待队列中
    Node node = addConditionWaiter();
    // 释放同步状态,也就是释放锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 检测节点是否在同步队列中
    // 若节点没有在同步队列中则一直阻塞
    while (!isOnSyncQueue(node)) {
        // 阻塞线程
        LockSupport.park(this);
        // 检查阻塞过程中是否发生中断,若有中断则退出while循环
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 调用signal()方法将当前线程从等待队列放到同步队列中
    // 线程被唤醒,退出while循环
    
    // acquireQueued()方法尝试获取同步状态
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

如果从队列的角度去看,当前线程加入Condition的等待队列,该过程如图所示:

当前线程加入等待队列

通知

调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。

public final void signal() {
    // 如果当前线程不是获取锁的线程,则抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

调用该方法的前置条件是当前线程必须获取了锁,可以看到signal()方法进行了isHeldExclusively()检查,也就是当前线程必须是获取了锁的线程。接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程。过程如图所示:

节点从等待队列移动到同步队列

被唤醒后的线程,将从await()方法中的while循环中退出(isOnSyncQueue(Node node)方法返回true,节点已经在同步队列中),进而调用同步器的acquireQueued()方法加入到获取同步状态的竞争中。

成功获取同步状态(或者说锁)之后,被唤醒的线程将从先前调用的await()方法返回,此时该线程已经成功地获取了锁。

3 Object的wait()、notify()、notifyAll()实现分析

Java中每个对象都有一个monitor对象,底层实现对应ObjectMonitor。而synchronized会生成monitor_enter以及monitor_exit指令来获取和释放monitor。

ObjectMonitor有两个队列:_WaitSet(等待队列)和_EntrySet(同步队列)。处于wait状态的线程将会加入_WaitSet中,处于等待状态的线程将会加入_EntrySet。

wait()方法

  1. 将当前线程封装成ObjectWaiter对象node,并通过ObjectMonitor::AddWaiter方法将node添加到_WaitSet列表中
  2. 通过ObjectMonitor::exit方法释放当前的ObjectMonitor对象,这样其它竞争线程就可以获取该ObjectMonitor对象
  3. 最终底层的park()方法会挂起线程

notify()方法

  1. 通过ObjectMonitor::DequeueWaiter方法,获取_WaitSet列表中的第一个ObjectWaiter节点
  2. 根据不同的策略,将取出来的ObjectWaiter节点,加入到_EntryList或则通过Atomic::cmpxchg_ptr指令进行自旋操作cxq
上一篇下一篇

猜你喜欢

热点阅读