Condition

2018-05-14  本文已影响0人  囧囧有神2号

生产者消费者实例:

public class ProductorCustom<T> {

    private final ReentrantLock lock = new ReentrantLock();
    private Condition putButFull = lock.newCondition();
    private Condition tackButEmpty = lock.newCondition();
    private int head, tail, count;
    private final T[] items;

    public ProductorCustom() {
        this(10);
    }

    public ProductorCustom(int maxSize) {
        items = (T[]) new Object[maxSize];
    }

    public void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while(count == items.length) {
                putButFull.await();
            }
            items[tail] = item;
            if (++tail == items.length) {
                tail = 0;
            }
            ++count;
            tackButEmpty.signal();
        }
        finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                tackButEmpty.await();
            }
            T item = items[head];
            if (++head == items.length) {
                head = 0;
            }
            --count;
            putButFull.signal();
            return item;
        }
        finally {
            lock.unlock();
        }
    }
}

Condition是Object的wait/notify的替代,更灵活;Condition接口定义的方法,await对应于Object.wait,signal对应于Object.notify,signalAll对应于Object.notifyAll。
ReentrantLock 的newCondition():

    public Condition newCondition() {
        return sync.newCondition();
    }

最终调用的是AQS中的内部类ConditionObject,它实现了Condition接口。

概述

AQS维护了一个锁队列;ConditionObject同样维护了一个条件队列,该队列里的Node等待着signal信号;两个队列间的关系是这样的:

  1. 比如节点A是锁队列的head,acquire成功,B等待在A后;
  2. A线程执行await,会把A移到条件队列中,release唤醒B
  3. B执行signal,会将A放回到锁队列,但并没有被唤醒;

大致上就是一个线程正在运行,但是因为某些条件未满足需要等待,我们就把他放到条件队列中等待,其他线程就有机会执行,当条件满足就从条件队列中取出一个或全部等待的线程,把他放回锁队列中等待获取执行资格;
接下来看看实现细节:

ConditionObject

AQS里的内部类,实现了Condition,ReentrantLock 调用的就是它;
所以才说AQS是JUC包之基;

public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** 条件队列的头节点. */
        private transient Node firstWaiter;
        /**条件队列的尾节点. */
        private transient Node lastWaiter;
await
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
//创建当前线程的Node节点,放入条件队列
            Node node = addConditionWaiter();
//锁队列中的当前线程release,唤醒后面的线程
            int savedState = fullyRelease(node);
            int interruptMode = 0;
//isOnSyncQueue检测节点是否在锁队列;
//这里的逻辑是signal会将node放回到锁队列,如果node不在锁队列,
//说明条件没满足即没有收到signal,需要继续等待
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
//唤醒后,执行acquireQueued,尝试获取锁即改变同步状态,失败就在锁队列中等待
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters(); //清理waitStatus为CONDITION的节点
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

如上面注释,一个原本锁队列中正在运行的HEAD节点,执行await,会在条件队列中创建它的Node节点,锁队列中的它执行release操作,之后就LockSupport.park挂起等待signal唤醒

signal
        public final void signal() {
//返回true代表当前线程正式获取锁的线程
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
//这里利用循环直到将firstWaiter放回锁队列为止
            } while (!transferForSignal(first) &&  
                     (first = firstWaiter) != null);
        }

    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node); //将节点加入到锁队列
        int ws = p.waitStatus;
//如果该结点的状态为cancel 或者修改waitStatus失败,则直接唤醒。
//这里一般ws=0,所以会执行更改节点状态,改为SIGNAL,更改成功放回锁队列
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

signal主要逻辑就是将条件队列的第一个节点firstWaiter加入到锁队列。
超时方法分析在我的CyclicBarrier文章里

上一篇 下一篇

猜你喜欢

热点阅读