Java

Condition源码分析

2020-10-15  本文已影响0人  笔记本一号
Java对象都有一组监视器方法:wait,notify,notifyAll,而synchronized本身就是利用虚拟机提供的对象监视器(ObjectMonitor对象)实现同步,这些方法与synchronized配合实现了等待/通知模式,Condition提供了类似synchronized监视器的方法,利用AQS条件队列与Lock配合实现了等待/通知模式,下图是摘自《Java并发编程的艺术》 Java并发编程的艺术

代码演示:

public class ConditionTest implements Runnable {
    private final static Lock lock = new ReentrantLock();
    private final static Condition CONDITION = lock.newCondition();
    public void test1() {
        lock.lock();
        try {
            if (Thread.currentThread().getName().equals("t1")) {
                Thread.sleep(100);
                System.out.println(Thread.currentThread().getName() + "拿到锁了");
                System.out.println(Thread.currentThread().getName() + "释放锁,等待通知");
                CONDITION.await();
                System.out.println(Thread.currentThread().getName() + "重新获得锁");
                System.out.println(Thread.currentThread().getName() + "执行结束");
            } else if (Thread.currentThread().getName().equals("t2")){
                Thread.sleep(100);
                System.out.println(Thread.currentThread().getName() + "拿到锁了");
                System.out.println(Thread.currentThread().getName() + "发送通知");
                CONDITION.signal();
                System.out.println(Thread.currentThread().getName() + "执行结束,释放锁");
            }
        } catch (Exception e) {
            System.out.println(e);
        } finally {
            lock.unlock();
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ConditionTest test = new ConditionTest();
        Thread t1 = new Thread(test, "t1");
        Thread t2 = new Thread(test, "t2");
        t1.start();
        Thread.sleep(300);
        t2.start();

    }
    @Override
    public void run() {
        test1();
    }
}
image.png

Condition作用原理

Condition内部是AQS的内部类利用条件队列实现阻塞和通知线程的效果。当一个线程在调用了await方法以后会被阻塞,调用signal方法唤醒。这种方式为线程提供了更加简单的等待/通知模式。一个Condition的实例必须与一个Lock绑定,因为Condition是作为AQS的内部类实现的,而Lock内部其实就是使用AQS实现的同步功能,因此Condition作用需要与Lock或者实现了AQS的类配合使用。

等待:
Condition是AQS的内部类。每个Condition对象都包含一个队列(等待队列)。等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、以当前线程构造成节点并将节点从尾部加入等待队。如果不是通过 其他线程调用Condition.signal()方法唤醒,而是对等待线程进行中断interrupt(),则会抛出InterruptedException异常信息。

image.png

通知:
调用Condition的signal()方法,将会唤醒在等待队列中首节点,在唤醒节点前,会将节点移到同步队列中,在调用signal()方法之前必须先判断是否获取到了锁。接着获取等待队列的首节点,将其移动到同步队列并且利用LockSupport唤醒节点中的线程

通知

Condition源码分析

await

ConditionObject 是AQS的内部类,实现了Condition接口
await()方法对中断敏感,线程中断标志位为true时调用await()就会抛出异常

//这个ConditionObject 是AQS的内部类
 public class ConditionObject implements Condition, java.io.Serializable {
 public final void await() throws InterruptedException {
//这里表明await()方法对中断敏感,线程中断为true时调用await()就会抛出异常
            if (Thread.interrupted())
                throw new InterruptedException();
//将当前线程封装成节点并且设置为CONDITION加入到Condition队列中去,这里如果lastWaiter不为CONDITION状态,那么会把它踢出Condition队列。
            Node node = addConditionWaiter();
//释放node节点线程的锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
//判断节点是否在同步队列中,在则使用LockSupport.park将其挂起
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

将当前线程封装成节点并且设置为CONDITION加入到Condition队列中去,这里如果lastWaiter不为CONDITION状态,那么会把它踢出Condition队列

   private Node addConditionWaiter() {
            Node t = lastWaiter;
         // 遍历队列,将状态不为CONDITION的节点剔除出队列
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
//将当前线程封装成节点并且设置为CONDITION加入到Condition队列中去
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
//尾节点为空则将节点表明队列为空,将新节点设置为头节点
            if (t == null)
                firstWaiter = node;
            else
//尾节点不为空则将节点表明队列不为空,将新节点设置为尾节点的后续节点
                t.nextWaiter = node;
//将新节点设置为尾节点
            lastWaiter = node;
            return node;
        }

遍历队列,将状态不为CONDITION的节点剔除出队列

private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            //遍历队列
            while (t != null) {
                //遍历队列
                Node next = t.nextWaiter;
                //将状态不为CONDITION的节点清除
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

使用release确保线程释放

  final int fullyRelease(Node node) {
        boolean failed = true;
        try {
//获取到锁代表getState()大于0
            int savedState = getState();
//这里会确保线程释放
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

release利用tryRelease先进行释放锁,tryRelease是ReentrantLock继承AQS实现的方法,可以确保线程是获取到锁的,并且进行释放锁,unparkSuccessor主要是利用LockSupport.unpark(s.thread)唤醒线程,这里我之前在AQS的源码分析讲过,不懂的可以去看看

  public final boolean release(int arg) {
        //释放锁,这个方法是ReentrantLock继承AQS实现的方法
        if (tryRelease(arg)) {
            Node h = head;
//如果节点状态不是CANCELLED,也就是线程没有被取消,也就是不为0的,就进行唤醒
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

这个方法主要是确保了当前线程持有的锁,不是则抛出异常,确保线程一定是获取到锁的线程,并且进行相应的释放锁

   protected final boolean tryRelease(int releases) {
//将线程的state计时器减-1,state为0代表没有线程持有锁,大于0则代表有线程持有锁了
            int c = getState() - releases;
//判断是否是当前线程持有的锁,不是则抛出异常,确保线程一定是获取到锁的线程
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
//将记录持有线程的变量置为空
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

isOnSyncQueue主要用于判断node是否在同步队列中

final boolean isOnSyncQueue(Node node) {
   //判断节点的状态,如果状态是CONDITION,说明节点肯定不在同步队列中,同时哪怕同步队列是刚刚初始化的,也会有一个冗余的头节点存在,
//所以节点的前驱节点如果为null,那么节点也肯定不在同步队列中,返回fasle
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
   //节点的后继节点不为null,说明节点肯定在队列中,返回true,
//这里很重要的一点要明白,prev和next都是针对同步队列的节点
    if (node.next != null)
        return true;
    //调用findNodeFromTail,查找node是否在同步队列中
    return findNodeFromTail(node);
}


private boolean findNodeFromTail(Node node) {
    //取得同步队列的队尾元素
    Node t = tail;
    //无限循环,从队尾元素一直往前找,找到相等的节点就说明节点在队列中,
//node为null了,说明前面已经没有节点可以找了,那就返回false
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

我们看到Condition挂起线程的手段是和AQS一样的,使用的依旧是 LockSupport的park方法,那么我们可以猜到signal使用的肯定是LockSupport的unpark方法

signal

signal的作用就是要将await()中Condition队列中第一个Node唤醒(signalAll唤醒全部Node)唤醒

isHeldExclusively是需要子类继承的,在lock中判断当前线程是否是获得锁的线程,证明了signal调用是需要在获取锁的情况下,这里是先会判断整个condition队列是否为空,不为空则获取Condition队列中第一个Node进行唤醒

  public final void signal() {
//isHeldExclusively是需要子类继承的,在lock中判断当前线程是否是获得锁的线程,是则返回true,如何当前线程不是获取锁的线程则抛出异常
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
//获取Condition队列中第一个Node
            Node first = firstWaiter;
//判断Condition队列是否为空
            if (first != null)
                doSignal(first);
        }

这是ReentrantLock中的实现方法,判断当前线程是否是获得锁的线程,是则返回true

   protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

这个方法主要逻辑在transferForSignal

   private void doSignal(Node first) {
            do {
//头节点为空则,队列为空
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
//将头结点从等待队列中移除
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

transferForSignal把头节点扔到同步队列中,然后使用LockSupport.unpark唤醒节点线程,enq(node)在AQS源码分析中讲过过来了,这里省略

   final boolean transferForSignal(Node node) {  
//通过CAS将状态为CONDITION节点的状态修改为0
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
//将该节点移入到同步队列中去,p是node的前缀节点
        Node p = enq(node);
        int ws = p.waitStatus;
//以下情况进行唤醒节点
//1、node的前缀节点状态为0或者节点状态不为零
//2、node的前缀节点状态修改为SIGNAL状态失败
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
上一篇 下一篇

猜你喜欢

热点阅读