公平锁和非公平锁

2019-03-14  本文已影响0人  柴崎越

AQS内部维护着一个FIFO队列,该队列就是CLH同步队列。
CLH同步队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next)

 static final class Node {
        /** Marker to indicate a node is waiting in shared mode共享 */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode 独占*/
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled 
因为超时或者中断,节点会被设置为取消状态,
被取消的节点时不会参与到竞争中的,
他会一直保持取消状态不会转变为其他状态;
     */
*/
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking
后继节点的线程处于等待状态,
而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
 */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition
节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,改节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
*/
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
表示下一次共享式同步状态获取将会无条件地传播下去
         */
        static final int PROPAGATE = -3;

   /** 等待状态 */
        volatile int waitStatus;

      /**前驱结点*/
        volatile Node prev;

    /**后继结点*/
        volatile Node next;

        /** 获取同步状态的线程 */
        volatile Thread thread;

图片.png

入队操作

  private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
图片.png

end()方法,反复尝试向后添加

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

在上面代码中,两个方法都是通过一个CAS方法compareAndSetTail(Node expect, Node update)来设置尾节点,该方法可以确保节点是线程安全添加的。在enq(Node node)方法中,AQS通过“死循环”的方式来保证节点可以正确添加,只有成功添加后,当前线程才会从该方法返回,否则会一直执行下去。
AQS的结构

/**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

    /**
     * The synchronization state.
     */
    private volatile int state;
    // 头结点,你直接把它当做 当前持有锁的线程 可能是最好理解的
    private transient volatile Node head;
    // 阻塞的尾节点,每个新的节点进来,都插入到最后,也就形成了一个隐视的链表
    private transient volatile Node tail;
    // 这个是最重要的,不过也是最简单的,代表当前锁的状态,0代表没有被占用,大于0代表有线程持有当前锁
    // 之所以说大于0,而不是等于1,是因为锁可以重入嘛,每次重入都加上1
    private volatile int state;
    // 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入
    // reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁
    // if (currentThread == getExclusiveOwnerThread()) {state++}
    private transient Thread exclusiveOwnerThread; //继承自AbstractOwnableSynchronizer

公平锁和非公平锁

 /**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();默认使用非公平锁
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */


    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        根据true或者false选择使用公平锁和非公平锁
    }
图片.png

相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态

Condition

public class BoundedBuffer {
    //在使用Condition时,必须要持有相应的锁
    final Lock lock = new ReentrantLock();
    // condition 依赖于 lock 来产生
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();
    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    // 生产
    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();  // 队列已满,等待,直到 not full 才能继续生产
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal(); // 生产成功,队列已经 not empty 了,发个通知出去
        } finally {
            lock.unlock();
        }
    }

    // 消费
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await(); // 队列为空,等待,直到队列 not empty,才能继续消费
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal(); // 被我消费掉一个,队列 not full 了,发个通知出去
            return x;
        } finally {
            lock.unlock();
        }
    }

}

1、我们可以看到,在使用 condition 时,必须先持有相应的锁。这个和 Object 类中的方法有相似的语义,需要先持有某个对象的监视器锁才可以执行 wait(), notify() 或 notifyAll() 方法。
2、ArrayBlockingQueue 采用这种方式实现了生产者-消费者,所以请只把这个例子当做学习例子,实际生产中可以直接使用 ArrayBlockingQueue
condition 是依赖于 ReentrantLock 的,不管是调用 await 进入等待还是 signal 唤醒,都必须获取到锁才能进行操作。

final ConditionObject newCondition() {
            return new ConditionObject();
        }

ConditionObject


图片.png
 public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

条件队列介绍


图片.png
//可以被中断的方法
  public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //将Node添加到条件队列中去
            Node node = addConditionWaiter();
            //释放锁,返回值时释放锁之前的state值
            //因为在释放之前是持有值的,所以在这里要释放掉
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //退出强情况 1,当转移到阻塞队列中去2,线程中断时 
          while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
 /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        //将当前的线程对应的结点加入队尾
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //node 在初始化的时候,指定为Node.CONDITION
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
清除已经取消等待的结点
private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

完全释放独占锁

回到 wait 方法,节点入队了以后,会调用 int savedState = fullyRelease(node); 方法释放锁,注意,这里是完全释放独占锁(fully release),因为 ReentrantLock 是可以重入的
考虑一下这里的 savedState。如果在 condition1.await() 之前,假设线程先执行了 2 次 lock() 操作,那么 state 为 2,我们理解为该线程持有 2 把锁,这里 await() 方法必须将 state 设置为 0,然后再进入挂起状态,这样其他线程才能持有锁。当它被唤醒的时候,它需要重新持有 2 把锁,才能继续下去。

 final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();//获取释放锁之前的值
            if (release(savedState)) {//调用release释放
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
//判断结点是否进入了阻塞队列
 final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
//从阻塞队列的队尾向前遍历,如果找到,就返回true
//回到前面的循环,isOnSyncQueue(node) 返回 false 的话,那么进到 LockSupport.park(this); 这里线程挂起
        return findNodeFromTail(node);
    }

signal唤醒

   public final void signal() {
            if (!isHeldExclusively())//调用此方式时,是否持有锁
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;//指向条件队列的第一个
            if (first != null)
                doSignal(first);
        }
private void doSignal(Node first) {
     //唤醒完第一个,向后移动一个位置,如果第一个唤醒不成功,直接移动         
    do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
 final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        //进入阻塞队列,并自旋等待
        Node p = enq(node);
        //p表示node的前驱结点
        int ws = p.waitStatus;
        // ws > 0 说明 node 在阻塞队列中的前驱节
点取消了等待锁,直接唤醒 node 对应的线程。
       // 如果 ws <= 0, 那么 compareAndSetWaitStatus 将会被调用,节点入队后,需要把前驱节点的状态设为 Node.SIGNAL(-1)
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
 // 如果前驱节点取消或者 CAS 失败,会进到这里唤醒线程,之后的操作看下一节
            LockSupport.unpark(node.thread);
        return true;
    }

唤醒之后获取锁

 while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }

先解释下 interruptMode。interruptMode 可以取值为 REINTERRUPT(1),THROW_IE(-1),0

REINTERRUPT: 代表 await 返回的时候,需要重新设置中断状态
THROW_IE: 代表 await 返回的时候,需要抛出 InterruptedException 异常
0 :说明在 await 期间,没有发生中断

有以下三种情况会让 LockSupport.park(this); 这句返回继续往下执行:

常规路径。signal -> 转移节点到阻塞队列 -> 获取了锁(unpark)
线程中断。在 park 的时候,另外一个线程对这个线程进行了中断
signal 的时候我们说过,转移以后的前驱节点取消了,或者对前驱节点的CAS操作失败了
假唤醒。这个也是存在的,和 Object.wait() 类似,都有这个问题
// 1. 如果在 signal 之前已经中断,返回 THROW_IE
// 2. 如果是 signal 之后中断,返回 REINTERRUPT
// 3. 没有发生中断,返回 0
  private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }
 final boolean transferAfterCancelledWait(Node node) {
// 用 CAS 将节点状态设置为 0 
    // 如果这步 CAS 成功,说明是 signal 方法之前发生的中断,因为如果 signal 先发生的话,signal 中会将 waitStatus 设置为 0       
 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 将节点放入阻塞队列
        // 这里我们看到,即使中断了,依然会转移到阻塞队列           
 enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
 到这里是因为 CAS 失败,肯定是因为 signal 方法已经将 waitStatus 设置为了 0
    // signal 方法会将节点转移到阻塞队列,但是可能还没完成,这边自旋等待其完成
    // 当然,这种事情还是比较少的吧:signal 调用之后,没完成转移之前,发生了中断
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

获取独占锁

//当前线程获取 了锁
//如果返回 true,说明被中断了,而且 interruptMode != THROW_IE,说明在 signal 之前就发生中断了,这里将 interruptMode 设置为 REINTERRUPT,用于待会重新中断
 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;

如果有节点取消,也会调用 unlinkCancelledWaiters 这个方法

 private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
}        

到这里,我们终于可以好好说下这个 interruptMode 干嘛用了。

0:什么都不做,没有被中断过;
THROW_IE:await 方法抛出 InterruptedException 异常,因为它代表在 await() 期间发生了中断;
REINTERRUPT:重新中断当前线程,因为它代表 await() 期间没有被中断,而是 signal() 以后发生的中断
上一篇下一篇

猜你喜欢

热点阅读