不怕难之ReentrantLock及其扩展

2018-06-14  本文已影响0人  天外流星for

一、问题导读

1.  ReentrantLock公平锁和非公平锁有什么区别

2.  obj.wait() 和 condition.await() 有什么区别

3.  Condition中同步队列和条件队列是如何交互的

二、ReentrantLock实现原理

ReentrantLock 在内部用了内部类 Sync 来管理锁,所以真正的获取锁和释放锁是由 Sync 的实现类来控制的。

Sync 有两个实现,分别为 NonfairSync(非公平锁)和 FairSync(公平锁),默认为NonfairSync(非公平锁)

public ReentrantLock(boolean fair) {

    sync = fair ? new FairSync() : new NonfairSync();

}

线程抢锁之公平锁

static final class FairSync extends Sync {

    final void lock() {

        acquire(1);  // 请求获取锁

    }

  // 因为有可能直接就成功了呢,也就不需要进队列排队了,

  // 对于公平锁的语义就是:本来就没人持有锁,根本没必要进队列等待

    public final void acquire(int arg) {

        if (!tryAcquire(arg) &&

             acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 入同步队列

            selfInterrupt();

    }

    protected final boolean tryAcquire(int acquires) {

        final Thread current = Thread.currentThread();

        int c = getState();

        if (c == 0) { // 没有线程占用锁

            // 1. 和非公平锁相比,这里多了一个判断:是否有线程在等待           

        if (!hasQueuedPredecessors() &&

                compareAndSetState(0, acquires)) {

                setExclusiveOwnerThread(current);

                return true;

            }

        }

        else if (current == getExclusiveOwnerThread()) { // 重入过程

            int nextc = c + acquires;

            if (nextc < 0)

                throw new Error("Maximum lock count exceeded");

            setState(nextc);

            return true;

        }

        return false;

    }

}

线程抢锁之非公平锁

static final class NonfairSync extends Sync {

    final void lock() {

        // 2. 和公平锁相比,这里会直接先进行一次CAS,成功就返回了       

      if (compareAndSetState(0, 1))

            setExclusiveOwnerThread(Thread.currentThread());

        else           

      acquire(1);

    }

    // AbstractQueuedSynchronizer.acquire(int arg)   父类中方法

      public final void acquire(int arg) {

        if (!tryAcquire(arg) &&

            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

            selfInterrupt();

    }

    protected final boolean tryAcquire(int acquires) {

        return nonfairTryAcquire(acquires);

    }

}

boolean nonfairTryAcquire(int acquires) {

    final Thread current = Thread.currentThread();

    int c = getState();

    if (c == 0) {

        if (compareAndSetState(0, acquires)) {  // 这里直接抢锁

            setExclusiveOwnerThread(current);

            return true;

        }

    }

    else if (current == getExclusiveOwnerThread()) {

        int nextc = c + acquires;

        if (nextc < 0) // overflow           

            throw new Error("Maximum lock count exceeded");

        setState(nextc);

        return true;

    }

    return false;

}

总结:公平锁和非公平锁只有两处不同

非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了。

非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面。

公平锁和非公平锁就这两点区别,如果这两次 CAS 都不成功,那么后面非公平锁和公平锁是一样的,都要进入到阻塞队列等待唤醒。

相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。


同步队列入队流程

线程解锁

// 唤醒的代码还是比较简单的,跟加锁类似

public void unlock() {

    sync.release(1);

}

public final boolean release(int arg) {

    // 往后看吧 

    if (tryRelease(arg)) {

        Node h = head;

        if (h != null && h.waitStatus != 0)

            unparkSuccessor(h); // 通知后置节点

        return true;

    }

    return false;

}

// 回到ReentrantLock看tryRelease方法

protected final boolean tryRelease(int releases) {

    int c = getState() - releases;

    if (Thread.currentThread() != getExclusiveOwnerThread())

        throw new IllegalMonitorStateException();

    // 是否完全释放锁   

      boolean free = false;

    // 其实就是重入的问题,如果c==0,也就是说没有嵌套锁了,可以释放了,否则还不能释放掉   

if (c == 0) {

        free = true;

        setExclusiveOwnerThread(null);

    }

    setState(c);

    return free;

}

// 唤醒后继节点// 从上面调用处知道,参数node是head头结点

private void unparkSuccessor(Node node) {

   int ws = node.waitStatus;

    // 如果head节点当前waitStatus<0, 将其修改为0 

    if (ws < 0)

        compareAndSetWaitStatus(node, ws, 0);

   // 下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待(waitStatus==1)   

    // 从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的   

Node s = node.next;

    if (s == null || s.waitStatus > 0) {

        s = null;

        // 从后往前找,仔细看代码,不必担心中间有节点取消(waitStatus==1)的情况       

        for (Node t = tail; t != null && t != node; t = t.prev)

            if (t.waitStatus <= 0)

                s = t;

    }

    if (s != null)

        // 唤醒线程       

     LockSupport.unpark(s.thread);

}

三、Condition实现原理

同步队列与条件队列交互流程

阻塞await实现原理

// 首先,这个方法是可被中断的,这个方法会阻塞,直到调用 signal 方法(指 signal() 和 signalAll(),下同),或被中断

public final void await() throws InterruptedException {

    if (Thread.interrupted())

        throw new InterruptedException();

    // 添加到 condition 的条件队列中   

      Node node = addConditionWaiter();

    // 释放锁,返回值是释放锁之前的 state 值   

   int savedState = fullyRelease(node);

    int interruptMode = 0;

    // 这里退出循环有两种情况,之后再仔细分析   

   // 1. isOnSyncQueue(node) 返回 true,即当前 node 已经转移到阻塞队列了   

   // 2. checkInterruptWhileWaiting(node) != 0 会到 break,然后退出循环,代表的是线程中断   

  while (!isOnSyncQueue(node)) {

        LockSupport.park(this);

        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

            break;

    }

    // 被唤醒后,将进入阻塞队列,等待获取锁   

    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

        interruptMode = REINTERRUPT;

    if (node.nextWaiter != null) // clean up if cancelled       

         unlinkCancelledWaiters();  // 清除队列中已经取消的节点

    if (interruptMode != 0)

        reportInterruptAfterWait(interruptMode);

}

整个await的过程如下:  

1.将当前线程加入Condition锁队列。特别说明的是,这里不同于AQS的队列,这里进入的是Condition的FIFO队列。

2.释放锁。这里可以看到将锁释放了,否则别的线程就无法拿到锁而发生死锁。

3.自旋(while)挂起,直到被唤醒(signal把他重新放回到AQS的等待队列)或者超时或者CACELLED等。

4.获取锁(acquireQueued)。并将自己从Condition的FIFO队列中释放,表明自己不再需要锁(我已经拿到锁了)。


唤醒signal实现原理

// 唤醒等待了最久的线程// 其实就是,将这个线程对应的 node 从条件队列转移到同步队列

   public final void signal() {

    // 调用 signal 方法的线程必须持有当前的独占锁   

    if (!isHeldExclusively())

        throw new IllegalMonitorStateException();

    Node first = firstWaiter;

    if (first != null)

        doSignal(first);  //唤醒操作

}

// 从条件队列队头往后遍历,找出第一个需要转移的 node// 因为前面我们说过,有些线程会取消排队,但是还在队列中

private void doSignal(Node first) {

    do {

  if ( (firstWaiter = first.nextWaiter) == null)

            lastWaiter = null;

        // 因为 first 马上要被移到阻塞队列了,和条件队列的链接关系在这里断掉       

         first.nextWaiter = null;

    } while (!transferForSignal(first) &&

            (first = firstWaiter) != null);

final boolean transferForSignal(Node node) {

    // CAS 如果失败,说明此 node 的 waitStatus 已不是 Node.CONDITION,说明节点已经取消,   

     // 既然已经取消,也就不需要转移了,方法返回,转移后面一个节点    // 否则,将 waitStatus 置为 0   

if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))

        return false;

    //enq(node): 自旋进入阻塞队列的队尾    // 注意,这里的返回值 p 是 node 在阻塞队列的前驱节点   

     Node p = enq(node);

    int ws = p.waitStatus;

    // ws > 0 说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程。

    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))

        // 如果前驱节点取消或者 CAS 失败,会进到这里唤醒线程,之后的操作看下一节        LockSupport.unpark(node.thread);

    return true;

}

整个signal过程如下

从等待队列的队首开始,尝试对队首节点执行唤醒操作;如果节点CANCELLED,就尝试唤醒下一个节点;如果再CANCELLED则继续迭代。

对每个节点执行唤醒操作时,将节点加入同步队列,再等待同步队列被唤醒。


案例剖析

public class TestCondition {

public static void main(String[] args) {

final ReentrantLock reentrantLock =new ReentrantLock();

        final Condition condition = reentrantLock.newCondition();

        new Thread(()-> {

               reentrantLock.lock();

                System.out.println(Thread.currentThread().getName() +"拿到锁了");

                System.out.println(Thread.currentThread().getName() +"等待信号");

                try {

                  condition.await();

                }catch (InterruptedException e) {

                 e.printStackTrace();

                }

                System.out.println(Thread.currentThread().getName() +"拿到信号");

                reentrantLock.unlock();

        }, "线程1").start();

        new Thread(()-> {

                 reentrantLock.lock();

                System.out.println(Thread.currentThread().getName() +"拿到锁了");

                try {

                  Thread.sleep(3000);

                }catch (InterruptedException e) {

                 e.printStackTrace();

                }

                System.out.println(Thread.currentThread().getName() +"发出信号");

                condition.signal();

                reentrantLock.unlock();

                System.out.println(Thread.currentThread().getName() +"unlock结束");

        }, "线程2").start();

    }

}

同步队列和条件队列交互过程

AQS的同步队列,Condition的条件队列开始交互:

1. 线程1调用reentrantLock.lock时,尝试获取锁。如果成功,则返回,从AQS的队列中移除线程;否则阻塞,保持在AQS的等待队列中。

2. 线程1调用await方法被调用时,对应操作是被加入到Condition的等待队列中,等待signal信号;同时释放锁。

3. 锁被释放后,会唤醒AQS队列中的头结点,所以线程2会获取到锁。

4. 线程2调用signal方法,这个时候Condition的等待队列中只有线程1一个节点,于是它被取出来,并被加入到AQS的等待队列中。注意,这个时候,线程1 并没有被唤醒,只是被加入AQS等待队列。

5. signal方法执行完毕,线程2调用unLock()方法,释放锁。这个时候因为AQS中只有线程1,于是,线程1被唤醒,线程1恢复执行。

所以:

发送signal信号只是将Condition队列中的线程加到AQS的等待队列中。只有到发送signal信号的线程调用reentrantLock.unlock()释放锁后,这些线程才会被唤醒。

可以看到,整个协作过程是靠结点在AQS的等待队列和Condition的等待队列中来回移动实现的,Condition作为一个条件类,很好的自己维护了一个等待信号的队列,并在适时的时候将结点加入到AQS的等待队列中来实现的唤醒操作。

总结:

ConditionObject由AQS提供,它实现了类似wiat、notify和notifyAll类似的功能。Condition必须与一个独占锁绑定使用,在await或signal之前必须现持有独占锁。Condition队列是一个单向链表,他是公平的,按照先进先出的顺序从队列中被“唤醒”,所谓唤醒指的是完成Condition对象上的等待,被移到Sync锁等待队列中,有参与竞争锁的资格(Sync队列有公平&非公平两种模式,注意区别)。

上一篇 下一篇

猜你喜欢

热点阅读