JUC并发相关

15. 并发终结之AQS

2020-09-26  本文已影响0人  涣涣虚心0215

AQS是抽象队列同步器,基于模板方法模式提供了很多锁相关的方法,前面我们分析的Condition,ReentrantLock,CountDownLatch、Semaphore等都是基于AQS来实现的。
AQS中维护了一个volatile int state变量,以及基于Node对象的FIFO线程等待队列(sync queue)。

NonFair非公平锁具体实现

abstract static class Sync extends AbstractQueuedSynchronizer {
       //NonFair提供lock的实现
        abstract void lock();
       //NonFaire尝试获取锁
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
        //尝试释放锁
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    }
    static final class NonfairSync extends Sync {
        //lock的具体实现
        final void lock() {
            //这里不管是不是有前继节点,上来就开始CAS设置state,设置成功则抢锁成功
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                //如果开始抢锁不成功,则调用AQS的acquire方法
                acquire(1);
        }
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

========
//AQS实现acquire方法 -- lock
public final void acquire(int arg) {
     //这里tryAcquire是NonFair里面的实现,nonfairTryAcquire还是直接开始CAS更新state抢锁
     //如果抢锁失败,则addWaiter新建一个EXCLUSIVE的node加到sync queue的末尾
     //接着调用acquireQueued来循环尝试加锁
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}   
//Condition里面也有分析,再次看下这个acquireQueued方法
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
                         //拿到当前node的前继节点,如果前继节点是head,才会尝试获取锁
            if (p == head && tryAcquire(arg)) {
                                //获取锁成功后,表示当前线程可以执行,就将当前node设置为head节点,并剔除原来的head
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }  
                        //如果前继节点不是head,或者tryAcquire获取锁失败
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
//在前继节点不是head,或者tryAcquire获取锁失败,就要看前继节点的状态,来确定是否需要挂起当前线程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
        //如果前继节点是SIGNAL,则表示当前线程可以挂起等待前继节点唤醒
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
          //ws>0表示cancelled,则需要循环遍历剔除cancelled的节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
                //尝试将前继节点设置为SIGNAL,因为是再循环里,这里设置成功,下次循环就会挂起当前线程,否则会继续循环判断前继节点
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
//AQS实现release方法 -- unlock
public final boolean release(int arg) {
    //调用ReentrantLock里面Sync提供的tryRelease方法
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
           //拿到头结点,并且头结点的waitStatus不是0,则唤醒一个后继节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}
//唤醒后继节点
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
                //head节点状态<0可能SIGNAL或者CONDITION状态,设置到0
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
        //那到后继节点,如果状态>0则是cancelled状态, 需要遍历剔除这些node,直到找到<=0的node
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
                //找到状态<=0的node之后,unpark唤醒这个node所对应的线程
        LockSupport.unpark(s.thread);
}

在下面栗子里,lock1-thread线程第一个获取锁,获得锁之后sleep一段时间,然后lock2-thread以及lock3-thread也开始获取锁,获取锁失败。

public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        new Thread(()->{
            lock.lock();
            try{
                Thread.sleep(100000); //获取锁之后sleep,但是不会释放锁
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }, "lock1-thread").start();
        Thread.sleep(10000);
        new Thread(()->{
            lock.lock();
            try{
                System.out.println("lock2 invoked");
            } finally {
                lock.unlock();
            }
        }, "lock2-thread").start();
        Thread.sleep(10000);
        new Thread(()->{
            lock.lock();
            try{
                System.out.println("lock3 invoked");
            } finally {
                lock.unlock();
            }
        }, "lock3-thread").start();
    }
运行结果
可以看到这里lock2-thread和lock3-thread都获取不到锁,变成WAITING状态(这个是与synchronized有区别的,同样的逻辑用synchronized,获取不到锁时BLOCKED状态),直到lock1-thread调用unlock方法,找到sync queue里不是cancelled状态的后继节点进行unpark唤醒。
CLH FIFO队列
上面的图是AQS加锁过程中FIFO队列生成情况,我们总结一下加锁的过程:
  1. lock1-thread线程调用lock()方法,因为默认是NonFair非公平锁实现,所以调用NonFair.lock()方法,进行CAS设置state的值,设置成功表示获取锁成功,然后进入sleep()。
  2. lock2-thread线程开始调用lock()方法,同样调用NonFair.lock(),此时lock1-thread持有锁,那么此时CAS设置state失败。
    2.1 进入acquire(1)方法, acquire方法会调用tryAcquire()->nonfairTryAcquire()方法再次尝试CAS更新state,同样还是失败。
    2.2 进入acquireQueued(addWaiter(Node.EXCLUSIVE)流程,这里首先调用addWaiter()方法添加一个包含lock2-thread的Node,并添加到FIFO的尾部tail,注意这里会先new Node()作为head。
    2.3 节点添加完之后,acquireQueued()方法开始for循环,拿到当前节点的前继节点,如果前继节点是head,就开始尝试tryAcquire()获取锁。如果获取锁成功,就将当前节点设置为head节点,并剔除原来的head节点,当前线程获取锁之后继续执行;如果获取锁失败,调用shouldParkAfterFailedAcquire()方法,查看前继节点的waitStatus是不是SIGNAL,如果是,则挂起lock2-thread;如果不是还要看状态是不是CANCELLED,如果是,就遍历下一个节点直到找到waitStatus<=0的节点;如果不是CANCELLED,就CAS设置前继节点的状态到SIGNAL;然后开始下一次for循环,下一次循环还不能获取到锁,基本上都会走到挂起当前线程的代码(acquireQueued过程是不响应中断的,只有获取资源后才能判断当前线程是否中断)。
  3. lock3-thread同理lock2-thread,获取不到锁之后会CAS设置前继节点的状态到SIGNAL,然后挂起当前线程,等待unlock唤醒。

Synchronized和AQS的区别

Synchronized:

  1. Synchronized关键字是底层C++实现,存在两个重要的数据结构Entry Set和Wait Set。
  2. Wait Set中存放的是调用了Object.wait()方法的线程对象(被封装成C++的Node对象)。
  3. Entry Set中存放的是争抢锁失败陷入到阻塞状态,需要获取monitor的那些对象。
  4. 当一个线程被notify之后,它就会从Wait Set中移动到Entry Set中。
  5. 当进入到Entry Set后,该线程依然需要与其他线程争抢monitor对象(ReentrantLock的非公平实现)。
  6. 如果争抢到,表示该线程获取到了对象的锁,它就可以以排他方式执行对应的同步代码。

AQS:

  1. AQS也存在两种队列,分别是ConditionObject上的条件队列condition queue,以及AQS本身的阻塞队列sync queue。
  2. 这两个队列中的每一个对象都是Node实例(里面封装了Thread对象)
  3. 当位于Condition条件队列的线程被其他线程signal之后,该线程就会从condition queue中移动到sync queue,这里await方法会继续尝试获取锁(公平或者非公平)。
  4. 位于AQS阻塞队列中的Node对象本质是由一个双向链表来构成。
  5. 在获取锁时,这些进入到sync queue中的线程会按照在队列中排序先后尝试获取。
  6. 当sync queue的线程获取到锁后,就表示该线程已经可以正常执行了。
  7. 陷入到阻塞状态的线程,依然需要进入到操作系统的内核态,进入阻塞(LockSupport.park方法实现)
上一篇 下一篇

猜你喜欢

热点阅读