线程学习->01AQS锁

2018-02-04  本文已影响0人  冉桓彬
AQS锁的代表是AbstractQueuedSynchronizer, 为什么要创建一种与内置锁如此相似的新加锁机制呢? 在大多数情况下, 内置锁都能很好的工作, 但在功能上存在一些局限性, 例如, 无法中断一个正在等待获取锁的线程, 或者无法在请求获取一个锁时无限等待下去,

关于AQS锁, 打算从ArrayBlockingQueue的源码进行入手分析

// 内部持有的ReentrantLock同时又持有公平锁与非公平锁
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
     /** Main lock guarding all access */
    final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;
}

一、ArrayBlockingQueue为入口

1.1 ArrayBlockingQueue.put生产者
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 1.当前线程通过这里先尝试获取锁;
    lock.lockInterruptibly();
    try {
        // 2.获取锁成功, 判断队列元素是否已满, 如果满了;
        while (count == items.length)
            // 3.通过Condition.await挂起当前线程, 并释放锁;
            notFull.await();
        // 4.将元素入队;
        enqueue(e);
    } finally {
        // 5.释放锁;
        lock.unlock();
    }
}
1.2 ArrayBlockingQueue.take消费者
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 1.获取锁;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            // 2.队列为空, 获取锁的线程释放锁;
            notEmpty.await();
        // 3.元素出队操作;
        return dequeue();
    } finally {
        // 4.操作完成后线程释放锁;
        lock.unlock();
    }
}

针对put和take操作中每次线程的挂起与唤醒, AQS是如何管理这些线程的?

1. put涉及到的方法:
   (1) ReentrantLock. lockInterruptibly();     模块<二>
   (2) Condition.await();                      模块<三>
   (3) Condition.signal();                     模块<三>
   (4) ReentrantLock.unlock();                 模块<二>
2. take涉及到的方法:
   (1) ReentrantLock. lockInterruptibly();  
   (2) Condition.await();   
   (3) Condition.signal();
   (4) ReentrantLock.unlock(); 

二、ArrayBlockingQueue.put(ReentrantLock.lock与unlock)

1. put涉及到以下几个方法:
   (1) ReentrantLock.lockInterruptibly();
   (2) Condition.await();
   (3) Condition.signal();
   (4) ReentrantLock.unlock();
2.1 ReentrantLock.lockInterruptibly获取锁
// NonfairSync: 非公平锁;
// FairSync: 公平锁;
public ReentrantLock() {
    sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

public void lockInterruptibly() {
    // 获取锁, 所以其实ReentrantLock锁的获取与释放操作其实是交给了Sync
    sync.acquireInterruptibly(1);
}
2.1.1 NonfairSync.acquireInterruptibly
public final void acquireInterruptibly(int arg) throws InterruptedException {
    // 如果当前线程已经被中断, 抛出异常;
    if (Thread.interrupted())
        throw new InterruptedException();
    // 如果线程处于正常运行状态, 此时首先是尝试获取锁, 与公平锁的区别就在这里;
    if (!tryAcquire(arg))
        // 如果当前线程获取锁失败, 则会进入到这里被挂起, 直到其他线程显示唤醒;
        doAcquireInterruptibly(arg);
}

公平锁与非公共平锁的区别就在这里了, 非公平锁是首先通过tryAcquire尝试获取锁.

2.1.2 NonfairSync.tryAcquire尝试获取锁
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // c代表的是锁被同一个线程获取的次数, 可以看成是锁重入的问题;
    int c = getState();
    // 如果当前锁还没有被线程所持有;
    if (c == 0) {
        // 到这里其实线程并没有持有锁, 所以使用cas, 防止在这个过程中其他线程获取了锁;
        if (compareAndSetState(0, acquires)) {
            // 将current也就是当前线程赋值给exclusiveOwnerThread, 如果成功, 可以认为
            // exclusiveOwnerThread就是锁的持有线程;
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 执行到这里说明情况c != 0, c != 0其实是有两种情况:
    // 1. 锁被其他线程持有;
    // 2. 锁已经被当前线程持有;
    // 所以如果锁是被当前线程持有, 根据锁重入的特点, 直接让c++即可;
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // 执行到这里说明当前锁被其他线程持有;
    return false;
}
2.1.3 NonfairSync.doAcquireInterruptibly获取锁失败的线程将被挂起
// 这里有几个关键点需要注意一下:
// 1. 线程入队时与Node绑定, 被挂起的线程对应的Node的状态值;
// 2. Head节点的作用;

// 这段代码稍微复杂一些, AQS的一个核心就是在这里, 对于获取锁失败的线程的处理, 所以打算分
// 三段进行分析: addWaiter、addWaiter~ shouldParkAfterFailedAcquire、
// shouldParkAfterFailedAcquire;
private void doAcquireInterruptibly(int arg) throws InterruptedException {
    // 1. 这里先将获取锁失败的线程入队;
    // 2. 返回的这个node其实就是传进来的node;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            // 获取当前线程对应的node(为了简化, 称为node(thread)) 的前置节点, ;
            final Node p = node.predecessor();
            // 通过对addWaiter的分析可知, node(thread)会被插入到当前Node队列的尾端, 而此时
            // 如果node(thread).pre = node(head), 也就是说当前Node队列中只有node(thread)
            // 这一个有效节点, 所以再次考虑获取锁, 如果获取锁成功;
            if (p == head && tryAcquire(arg)) {
                // 如果获取锁成功, 则将node(thread)置为node(head), 其实到这里可以明白了
                // node(head)其实只是占位的作用;
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            // 1.如果p != head: 当前队列中有多个等待线程, 那么node(thread)进行挂起, 这个也是
            //   AQS锁的一个特点;
            // 2.如果p == head 且 tryAcquire = false也就是说node(head)再次获取锁失败;
            // 3. parkAndCheckInterrupt线程被挂起;
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

这段代码稍微复杂一些, AQS的一个核心就是在这里, 对于获取锁失败的线程的处理, 所以打算分三段进行分析:

2.1.3.1 NonfairSync.addWaiter获取锁失败的线程进行入队操作
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // pred != null表示当前Node队列已经有值了, 此时要做的就是将当前线程对应的Node插入
    // 到当前Node队列的尾端即可;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }  
    // 如果当前队列没有值, 此时一定要注意创建队列时head与tail节点;
    enq(node);
    return node;
}

private Node enq(final Node node) {
    // 注意下面创建Node队列的过程, 会先创建一个Node(head)节点, 当前线程对应的Node
    // 并没有插入到Node(head)的位置, 而只是插入到Node(head).next位置, 这里先不考虑原因,
    // 只记住这个过程就好;
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
线程进入Node队列的结构如下所示:
2.1.3.2 Node出队操作
for (;;) {
    // 获取当前节点的前置节点;
    final Node p = node.predecessor();
    // 1.这里需要考虑的一个问题是为何当前节点的前置节点是Node(Head)时才会再次尝试获取锁?
    // 2.其实结合上面的图大致可以看出Node(Head仅仅是一个占位节点), 如果当前节点的前置节点
    //   是Node(Head)也是可以看出当前节点前面已经没有等待线程了, 所以当前线程再次尝试获取锁,
    //   这个算是线程被挂起之前的最后一次挣扎了.
    if (p == head && tryAcquire(arg)) {
        // 如果当前线程成功的获取了锁, 此时将当前线程至于Node队列的Head位置, 注意此时成功获取
        // 锁的线程被放置在了Head位置
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return;
    }
}

稍微总结一下, 获取锁失败的线程首先会进入到Node队列, 被添加在Node队列的tail位置, 然后在被挂起之前, 如果当前线程对应的Node位于Head.next位置, 则会再次尝试获取一次锁, 如果仍然失败, 那么此时就会尝试被挂起.
到这里, 其实发现是有自旋锁的影子的, 获取锁失败的线程再次尝试获取锁, 失败之后才会被挂起

2.1.3.3 NonfairSync.shouldParkAfterFailedAcquire线程挂起前的操作
// 1. 需要注意的是每个Node的waitStatus默认值都是0;
// 2.每一个node.waitStatus被设置为Node.SIGNAL其实是根据他的后置节点来触发的.
// 3.一定要注意该方法是在for(;;)中被调用, 所以其实最终会触发return true的执行.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // ws默认值为0;
    int ws = pred.waitStatus;
    // 1.首次进入这里直接跳过该if语句向下执行.
    // 2.到目前为止还是没有发现哪里有对Node node.waitStatus进行赋值的地方.
    if (ws == Node.SIGNAL)
        return true;
    // 在什么情况下ws才会>0????
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
            pred.next = node;
    } else {
        // 这里是将Node pred.waitStatus设置为Node.SIGNAL.
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

对这个方法总结一下:

2.1.4 NonfairSync.parkAndCheckInterrupt线程被挂起
private final boolean parkAndCheckInterrupt() {
    // 线程被挂起;
    LockSupport.park(this);
    return Thread.interrupted();
}
2.2 ReentrantLock.unlock唤醒ReentrantLock.lock失败被挂起的线程

ReentrantLock.lock()被挂起的线程进入沉睡状态以后, 需要ReentrantLock.unlock()进行唤醒.

public void unlock() {
    // 释放锁;
    sync.release(1);
}
2.2.1 NonFairSync.release
public final boolean release(int arg) {
    // 这里需要考虑锁重入的问题, 锁可以被同一个线程持有多次, 所以tryRelease会触发
    // c--操作, 如果c == 0, 表示锁完成被释放, 此时才会考虑唤醒Node队列中等待线程;
    if (tryRelease(arg)) {
        Node h = head;
        // 在前面的分析中也已经知道, 线程被挂起之前waitStatus会被设置为Node.SIGNAL = -1;
        if (h != null && h.waitStatus != 0)
            // 唤醒等待的线程, 这里传入的是head节点;
            unparkSuccessor(h);
        return true;
    }
    return false;
}
2.2.2 NonFairSync.unparkSuccessor
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    //1.唤醒线程时, 其实并没有改变head节点的waitStatus值.
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    //unlock唤醒的其实是Head.next节点, 所以也对应前面lock中所说的, Head节点其实是一个占位节点;
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 仅仅只是唤醒线程, 并没有改变线程对应节点的waitStatus值.
        LockSupport.unpark(s.thread);
}

总结:

三、ArrayBlockingQueue.put(Condition.await与signal)

分析之前需要考虑以下几个问题:

先列出总结:

3.1 Condition.await线程被挂起
public final void await() throws InterruptedException {
    // 线程在被挂起时, 也会进入到一个Node队列中, 这个队列与通过lock方式的队列是两个队列.
    Node node = addConditionWaiter();
    // 执行锁的释放操作, 同时唤醒lock失败被挂起的线程.
    long savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        // 线程被挂起时waitStatus为Node.CONDITION.
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 线程被唤醒之后, 执行到这里尝试获取锁.
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
3.1.1 Condition.addConditionWaiter线程进入Node(firstWaiter)队列
private Node addConditionWaiter() {
    Node t = lastWaiter;
    Node node = new Node(Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
这段代码执行完成以后, Node队列如下:
3.1.2 Condition.fullyRelease
final int fullyRelease(Node node) {
    try {
        int savedState = getState();
        if (release(savedState))
            return savedState;
        throw new IllegalMonitorStateException();
    } catch (Throwable t) {
        node.waitStatus = Node.CANCELLED;
        throw t;
    }
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

释放锁, 如果锁被完全释放, 唤醒Node(Head)队列.

3.1.3 NonfairSync.isOnSyncQueue
final boolean isOnSyncQueue(Node node) {
    // 默认情况下node.waitStatus = Node.CONDITION, 然后进入if内部被挂起.
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    return findNodeFromTail(node);
}
3.2 Condition.signal唤醒Condition.await挂起的线程
public final void signal() {
    Node first = firstWaiter;
    if (first != null)
        // 与unlock有点儿类似, 拿到firstWaiter节点进行唤醒.
        doSignal(first);
}
3.2.1 Condition.doSignal
private void doSignal(Node first) {
    do {
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
      //在执行while中的逻辑之前会先将Node从Node(firstWaiter)队列中移除.
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}
3.2.2 NonFairSync.transferForSignal
final boolean transferForSignal(Node node) {
    // 首先将waitStatus更改为Node.CONDITION.
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;
    // 将该Node添加到Node(Head)尾端, 而且此时Node.waitStatus被修改为了0.
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
上一篇下一篇

猜你喜欢

热点阅读