Multithreading and Concurrency (7): How AQS Works

2021-09-09  lilykeke

1 How AQS Works

AQS is short for AbstractQueuedSynchronizer, a framework for building blocking locks and related synchronization tools.

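1.1 Characteristics

Subclasses mainly implement the following methods (each throws UnsupportedOperationException by default):

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively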
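As a quick check of that default behavior, the sketch below subclasses AbstractQueuedSynchronizer without overriding any hook and calls acquire. The class names DefaultHooksDemo and EmptySync are made up for illustration.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo class: a synchronizer that overrides none of the hook methods.
final class DefaultHooksDemo {
    static final class EmptySync extends AbstractQueuedSynchronizer { }

    // Returns true if acquire() fails because tryAcquire was not overridden.
    static boolean acquireThrows() {
        try {
            new EmptySync().acquire(1); // acquire() delegates to the default tryAcquire
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("acquire throws: " + acquireThrows());
    }
}
```

A subclass therefore only needs to override the hooks for the mode it supports (exclusive or shared).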

Idiom for acquiring the lock

// If acquiring the lock fails
if (!tryAcquire(arg)) {
    // Enqueue the thread; it can optionally be blocked using the park/unpark mechanism
}

Idiom for releasing the lock

// If releasing the lock succeeds
if (tryRelease(arg)) {
    // Let a blocked thread resume running
}
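The park/unpark mechanism mentioned in the comments above can be tried in isolation. The following is a minimal sketch using LockSupport directly. The class name ParkUnparkDemo and the released flag are assumptions for illustration, not part of AQS.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo: one thread parks until another thread unparks it.
final class ParkUnparkDemo {
    private static volatile boolean released;

    // Returns true once the worker has left its park loop.
    static boolean run() throws InterruptedException {
        released = false;
        boolean[] resumed = new boolean[1];
        Thread worker = new Thread(() -> {
            // park() may return spuriously, so the condition is re-checked in a loop
            while (!released) {
                LockSupport.park();
            }
            resumed[0] = true;
        });
        worker.start();
        released = true;            // publish the condition first
        LockSupport.unpark(worker); // then hand the worker a permit
        worker.join();              // join makes resumed[0] visible to this thread
        return resumed[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("worker resumed: " + run());
    }
}
```

If unpark happens before the worker reaches park, the permit is kept and park returns immediately, which is why the order of the two calls does not matter here.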

2. A Custom Lock

The custom lock implements the Lock interface and defines an inner class that extends AbstractQueuedSynchronizer.
The custom lock is non-reentrant.

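import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class AqsTest {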
    public static void main(String[] args) {
        MyLock lock = new MyLock();
        new Thread(()->{
            lock.lock();
            try{
                System.out.println("lock" + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }finally {
                System.out.println("unlock" + Thread.currentThread().getName());
                lock.unlock();
            }
        }).start();

        new Thread(()->{
            lock.lock();
            try{
                System.out.println("lock" + Thread.currentThread().getName());
            }finally {
                System.out.println("unlock" + Thread.currentThread().getName());
                lock.unlock();
            }
        }).start();
    }

}

/**
 * A custom non-reentrant lock
 */
class MyLock implements Lock {

    class MySync extends AbstractQueuedSynchronizer{
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0,1)) {
                // The lock is acquired; record the current thread as the owner
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {

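            // Mind the order of the two calls: clear the owner first, then write state
            setExclusiveOwnerThread(null);
            // state is volatile: writing it last prevents reordering and publishes the owner update above
            setState(0); // unlocked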

            return true;
        }

        /**
         * Whether the lock is currently held exclusively
         * @return true if state is 1
         */
        @Override
        protected boolean isHeldExclusively() {

            return getState() == 1;
        }

        public Condition newCondition(){
            return new ConditionObject();
        }
    }

    private MySync sync = new MySync();

    /**
     * Acquires the lock; on failure the thread waits in the queue
     */
    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
       sync.acquireInterruptibly(1);
    }

    /**
     * Tries to acquire the lock exactly once, without blocking
     * @return true if the lock was acquired
     */
    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    /**
     * Creates a condition variable
     * @return a new Condition bound to this lock
     */
    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

Test output:
lockThread-0
unlockThread-0
lockThread-1
unlockThread-1
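The non-reentrant behavior follows from tryAcquire succeeding only when state is 0. The sketch below checks this with a cut-down synchronizer that uses the same tryAcquire/tryRelease logic as MyLock. The class NonReentrantDemo and its helper methods are hypothetical names for this example.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo: the same thread cannot acquire a non-reentrant AQS lock twice.
final class NonReentrantDemo {
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // state is already 1, even if the caller is the owner
        }

        @Override
        protected boolean tryRelease(int arg) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean tryLock() { return tryAcquire(1); }
        void unlock() { release(1); }
    }

    // Returns the results of three tryLock attempts made by one thread.
    static boolean[] run() {
        Sync sync = new Sync();
        boolean first = sync.tryLock();   // lock is free
        boolean second = sync.tryLock();  // same thread, lock already held
        sync.unlock();
        boolean third = sync.tryLock();   // free again after unlock
        sync.unlock();
        return new boolean[] { first, second, third };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```

Calling the blocking lock() twice from the same thread on such a lock would make the thread wait for itself, so only tryLock is used here.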


3. How ReentrantLock Works

ReentrantLock UML class diagram

[Figure: reentrantlock_uml.png]

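3.1 How the Nonfair Lock Is Implemented

3.1.1 Locking and Unlocking Flow

Locking flow

With no contention

[Figure: 非公平锁-1.png]

When the first contention occurs, Thread-1 does the following:

  1. Tries to CAS state from 0 to 1 and fails
  2. Enters tryAcquire; state is already 1, so it fails again
  3. Enters addWaiter and builds the Node queue
    • The yellow triangle in the figure shows a Node's waitStatus, where 0 is the default normal state
    • Nodes are created lazily
    • The first Node is a dummy (sentinel) that only holds a place and is not associated with any thread
[Figure: 非公平锁-3 (1).png]
  4. The current thread enters acquireQueued
    • acquireQueued loops: if the predecessor is head it retries tryAcquire; after a failure, shouldParkAfterFailedAcquire sets the predecessor's waitStatus to -1 (SIGNAL) and the thread parks
[Figure: 非公平锁-4.png, 非公平锁-5.png, 非公平锁-6.png]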
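The queued state described in the locking flow can be observed from outside through ReentrantLock's monitoring methods. This is a minimal sketch, assuming a thread named Thread-1 that contends with the main thread. QueueObservationDemo is a hypothetical class name.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: watch a contending thread enter and leave the AQS queue.
final class QueueObservationDemo {
    // Returns { queue length while the lock is held, queue length after everything finished }.
    static int[] run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Thread t1 = new Thread(() -> {
            lock.lock();   // blocks while the main thread holds the lock
            lock.unlock();
        }, "Thread-1");

        int queuedWhileHeld;
        lock.lock();
        try {
            t1.start();
            // Wait until Thread-1 has been enqueued by addWaiter
            while (!lock.hasQueuedThread(t1)) {
                Thread.sleep(1);
            }
            queuedWhileHeld = lock.getQueueLength();
        } finally {
            lock.unlock(); // release wakes the successor of head
        }
        t1.join();
        return new int[] { queuedWhileHeld, lock.getQueueLength() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run();
        System.out.println("queued while held: " + r[0] + ", queued afterwards: " + r[1]);
    }
}
```

getQueueLength is documented as an estimate; in this controlled setup only one thread ever waits, so the values are stable.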

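Unlocking flow

1. Thread-0 releases the lock and enters tryRelease. If it succeeds, exclusiveOwnerThread is set to null and state to 0

[Figure: 非公平锁-解锁1.png]

2. The queue is not empty and head's waitStatus is -1, so release enters unparkSuccessor and unparks the thread queued behind head, here Thread-1

3. Thread-1 resumes in acquireQueued

[Figure: 非公平锁-解锁2.png]

If it acquires the lock (no contention), then
  • exclusiveOwnerThread is set to Thread-1 and state to 1
  • head points to Thread-1's Node, whose thread field is cleared
  • the old head is unlinked from the queue and can be garbage collected

4. If another thread competes at this moment (this is where the unfairness shows), say Thread-4 arrives

If Thread-4 happens to win the race, then
  • Thread-4 becomes exclusiveOwnerThread and state is 1
  • Thread-1 fails tryAcquire inside acquireQueued and parks again

Source code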

Start with the constructor: the default is the nonfair implementation

public ReentrantLock() {
    sync = new NonfairSync();
}
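The default can be confirmed through the public API. The short sketch below compares the no-argument constructor with the ReentrantLock(boolean fair) constructor. FairnessFlagDemo is a hypothetical class name.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: the no-arg constructor builds a nonfair lock.
final class FairnessFlagDemo {
    // Returns { isFair() of the default lock, isFair() of a lock created with fair = true }.
    static boolean[] run() {
        ReentrantLock byDefault = new ReentrantLock();
        ReentrantLock fair = new ReentrantLock(true);
        return new boolean[] { byDefault.isFair(), fair.isFair() };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("default fair? " + r[0] + ", explicit fair? " + r[1]);
    }
}
```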

NonfairSync extends Sync, and Sync extends AQS

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    } 
abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        abstract void lock();

        /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        // Methods relayed from outer class

        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }

        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }

        final boolean isLocked() {
            return getState() != 0;
        }

        /**
         * Reconstitutes the instance from a stream (that is, deserializes it).
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor(); // get the predecessor node
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    // Create a node; its waitStatus defaults to 0
    Node node = new Node(Thread.currentThread(), mode);
    // If the queue already has nodes, try to append this node to the tail of the AQS queue with a single CAS
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // If the queue is empty (or the CAS above failed), enq initializes it with a dummy new Node() and appends in a loop
    enq(node);
    return node;
}
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
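The lazy initialization and the dummy head used by addWaiter and enq can be sketched outside the JDK. The following is a simplified toy, not the JDK implementation: ToySyncQueue and its Node are hypothetical types, and AtomicReference stands in for the CAS helpers that AQS uses internally.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical toy queue illustrating lazy creation of a dummy head node.
final class ToySyncQueue {
    static final class Node {
        final Thread thread; // null for the dummy head
        volatile Node prev;
        volatile Node next;
        Node(Thread thread) { this.thread = thread; }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends node to the tail and returns its predecessor.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Queue not initialized yet: install a dummy node with no thread
                if (head.compareAndSet(null, new Node(null))) {
                    tail.set(head.get());
                }
            } else {
                node.prev = t;                    // link backwards first
                if (tail.compareAndSet(t, node)) {
                    t.next = node;                // forward link is set after the CAS
                    return t;
                }
            }
        }
    }

    public static void main(String[] args) {
        ToySyncQueue q = new ToySyncQueue();
        Node pred = q.enq(new Node(Thread.currentThread()));
        System.out.println("predecessor is dummy head: " + (pred == q.head.get() && pred.thread == null));
    }
}
```

Setting prev before the CAS and next after it mirrors the ordering in the source above, which is why traversals that need a complete view walk backwards from tail.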

3.1.2 How Reentrancy Works

See the comments in the source code below

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;


    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}


abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    abstract void lock();

    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();

     
        if (c == 0) {
            // The lock is free: try to CAS state from 0 to 1
            if (compareAndSetState(0, acquires)) {
                // CAS succeeded, so the lock is acquired; set the owner to the current thread
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // The lock is already held and the owner is the current thread: this is a reentrant acquire
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            // Update state: each reentrant acquire by the same thread increments it
            setState(nextc);
            return true;
        }
        return false;
    }

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // When c == 0, clear the owner and return true; otherwise only update state and return false
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    // Whether the current thread holds the lock exclusively
    protected final boolean isHeldExclusively() {
        // While we must in general read state before owner,
        // we don't need to do so to check if current thread is owner
        return getExclusiveOwnerThread() == Thread.currentThread();
    }
   ...
}
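The state counting in nonfairTryAcquire and tryRelease is visible through getHoldCount and isLocked. A minimal sketch follows, with HoldCountDemo as a hypothetical class name.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: the hold count rises and falls with reentrant lock/unlock calls.
final class HoldCountDemo {
    // Returns the hold count observed after each of the four operations.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[4];
        lock.lock();
        counts[0] = lock.getHoldCount(); // first acquire: state goes 0 -> 1
        lock.lock();
        counts[1] = lock.getHoldCount(); // reentrant acquire: state goes 1 -> 2
        lock.unlock();
        counts[2] = lock.getHoldCount(); // tryRelease returned false, lock still held
        lock.unlock();
        counts[3] = lock.getHoldCount(); // state back to 0, owner cleared
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```

The lock only becomes available to other threads after the last unlock, when tryRelease returns true.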

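3.1.3 How Interruption Is Handled

Non-interruptible mode

In this mode a thread stays in the AQS queue even if it is interrupted, and can only continue after it acquires the lock (it does continue running; the interrupt flag is simply set to true)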

private final boolean parkAndCheckInterrupt() {
    // If the interrupt flag is already true, park returns immediately
    LockSupport.park(this);
    // interrupted() clears the interrupt flag
    return Thread.interrupted();
}

// Called when the lock could not be acquired
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                // The interrupt status is only returned after the lock has been acquired
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // Woken up by an interrupt: remember that the thread was interrupted
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // If the thread was interrupted while waiting
        selfInterrupt();
}

static void selfInterrupt() {
    // Raise the interrupt again
    Thread.currentThread().interrupt();
}
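The non-interruptible behavior can be observed with a plain lock() call. In this sketch the main thread holds the lock, interrupts a waiting thread, and checks that the waiter stays queued and later sees its interrupt flag set. UninterruptibleLockDemo is a hypothetical class name, and the 50 ms pause is only there to give the waiter time to react.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: lock() ignores interrupts while waiting but restores the flag afterwards.
final class UninterruptibleLockDemo {
    // Returns { still queued after interrupt, lock acquired later, interrupt flag set after acquire }.
    static boolean[] run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        boolean[] seen = new boolean[2];
        Thread t1 = new Thread(() -> {
            lock.lock(); // non-interruptible acquire
            try {
                seen[0] = true;
                seen[1] = Thread.currentThread().isInterrupted(); // restored by selfInterrupt
            } finally {
                lock.unlock();
            }
        });

        boolean stillQueued;
        lock.lock();
        try {
            t1.start();
            while (!lock.hasQueuedThread(t1)) {
                Thread.sleep(1);
            }
            t1.interrupt();
            Thread.sleep(50); // let t1 wake up, fail to acquire, and park again
            stillQueued = lock.hasQueuedThread(t1);
        } finally {
            lock.unlock();
        }
        t1.join();
        return new boolean[] { stillQueued, seen[0], seen[1] };
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```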

Interruptible mode

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        // The lock was not acquired, so enter the queued path
        doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // Reached when the thread is interrupted while parked:
                // throw instead of going around the for (;;) loop again
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
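For contrast with the non-interruptible mode, the sketch below interrupts a thread that waits in lockInterruptibly. The waiter gives up with an InterruptedException while the main thread still holds the lock. InterruptibleLockDemo is a hypothetical class name.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: lockInterruptibly() aborts the wait when the thread is interrupted.
final class InterruptibleLockDemo {
    // Returns true if the waiter left the queue via InterruptedException.
    static boolean run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        boolean[] interrupted = new boolean[1];
        Thread t1 = new Thread(() -> {
            try {
                lock.lockInterruptibly();
                lock.unlock(); // only reached if the lock was actually acquired
            } catch (InterruptedException e) {
                interrupted[0] = true; // thrown from doAcquireInterruptibly
            }
        });

        lock.lock();
        try {
            t1.start();
            while (!lock.hasQueuedThread(t1)) {
                Thread.sleep(1);
            }
            t1.interrupt();
            t1.join(); // t1 terminates even though the lock is never released to it
        } finally {
            lock.unlock();
        }
        return interrupted[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("waiter was interrupted: " + run());
    }
}
```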

4. How the Fair Lock Is Implemented

Nonfair lock

final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // The lock is free
        if (c == 0) {
            // Try to take it with CAS. This is the unfair part: the AQS queue is not checked
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // The lock is already held and the owner is the current thread: this is a reentrant acquire
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        // Acquire failed; return to the caller
        return false;
    }

Fair lock implementation

// The main difference from the nonfair lock is the tryAcquire implementation
protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // First check whether another thread is queued ahead of the current one in the AQS queue
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

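5. How Condition Variables Are Implemented

Each condition variable corresponds to its own wait queue; the implementing class is ConditionObject

await flow

Initially Thread-0 holds the lock and calls await. It enters ConditionObject's addConditionWaiter flow, which creates a new Node with status -2 (Node.CONDITION), associates it with Thread-0, and appends it to the tail of the wait queue


[Figure: 条件变量.jpg]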
public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
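        // ConditionObject.await() first enters addConditionWaiter()
        Node node = addConditionWaiter();
        // Next comes AQS's fullyRelease, which releases the lock held on the synchronizer
        // Why fullyRelease(node)? The thread may have re-entered the lock several times
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // Loop: park until the node has been transferred to the sync queue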
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            // Create a new node for the current thread and link it into the list
            // waitStatus == Node.CONDITION
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

After the current thread has been added to the ConditionObject queue, it must release the lock

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
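        // On success, return the saved state; on failure, the finally block marks the node as Node.CANCELLED
        // release(savedState) also unparks the next node in the AQS queue,
        // which then competes for the lock; assuming no other contender, Thread-1 acquires it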
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
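The effect of fullyRelease can be checked from user code: a thread that holds the lock twice and then awaits must give up both holds, otherwise no other thread could take the lock to signal it. The sketch below assumes a hypothetical class AwaitFullReleaseDemo and uses awaitUninterruptibly to keep the example short.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: await releases every reentrant hold and restores them on wake-up.
final class AwaitFullReleaseDemo {
    // Returns the waiter's hold count right after it returns from await.
    static int run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] signalled = new boolean[1]; // guarded by lock
        int[] holdsAfterAwait = new int[1];

        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock(); // re-enter: state is now 2
            try {
                while (!signalled[0]) {
                    cond.awaitUninterruptibly(); // releases both holds while waiting
                }
                holdsAfterAwait[0] = lock.getHoldCount(); // savedState has been re-acquired
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();

        boolean done = false;
        while (!done) {
            lock.lock(); // succeeds once the waiter has fully released the lock
            try {
                if (lock.hasWaiters(cond)) {
                    signalled[0] = true;
                    cond.signal();
                    done = true;
                }
            } finally {
                lock.unlock();
            }
            if (!done) {
                Thread.sleep(1);
            }
        }
        waiter.join();
        return holdsAfterAwait[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("hold count after await: " + run());
    }
}
```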

signal flow

Suppose Thread-1 wants to wake up Thread-0

[Figure: signal.png]

Thread-1 calls signal, which enters ConditionObject's doSignal() flow


/**
 * Moves the longest-waiting thread, if one exists, from the
 * wait queue for this condition to the wait queue for the
 * owning lock.
 *
 */
public final void signal() {
    // Calling signal() without holding the lock throws an exception
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // Take the first node of the condition's wait queue
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

/**
 * Removes and transfers nodes until hit non-cancelled one or
 * null. Split out from signal in part to encourage compilers
 * to inline the case of no waiters.
 * @param first (non-null) the first node on condition queue
 */
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

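transferForSignal runs next: it appends the Node to the tail of the AQS queue, changes the waitStatus of Thread-0's node to 0, and sets the waitStatus of its predecessor (Thread-3's node in the figure) to -1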

 /**
  * Transfers a node from a condition queue onto sync queue.
  * Returns true if successful.
  * @param node the node
  * @return true if successfully transferred (else the node was
  * cancelled before signal)
  */
// Moves the node from the condition queue to the sync queue; returns true on success
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    // First try to CAS waitStatus from CONDITION to 0; if that fails, return false immediately
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
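    // Append the node to the tail of the AQS queue; enq() returns the node's predecessor
    Node p = enq(node);
    int ws = p.waitStatus;
    // If the predecessor is cancelled or cannot be set to SIGNAL, wake the thread so it can resync
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;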
}
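The transfer from the condition queue to the AQS queue can be watched through ReentrantLock's monitoring methods. In this sketch the signalling thread keeps the lock while it inspects both queues before and after signal. SignalTransferDemo is a hypothetical class name.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: signal moves the waiter from the condition queue to the sync queue.
final class SignalTransferDemo {
    // Returns { waiters before signal, waiters after signal, 1 if the waiter is on the sync queue }.
    static int[] run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] signalled = new boolean[1]; // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!signalled[0]) {
                    cond.awaitUninterruptibly();
                }
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        int[] observed = null;
        while (observed == null) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) {
                    int before = lock.getWaitQueueLength(cond);
                    signalled[0] = true;
                    cond.signal(); // transfers the first waiter's node to the sync queue
                    int after = lock.getWaitQueueLength(cond);
                    // The lock is still held here, so the waiter cannot leave the sync queue yet
                    boolean queued = lock.hasQueuedThread(waiter);
                    observed = new int[] { before, after, queued ? 1 : 0 };
                }
            } finally {
                lock.unlock();
            }
            if (observed == null) {
                Thread.sleep(1);
            }
        }
        waiter.join();
        return observed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```

The waiter only returns from await after the signalling thread unlocks, because it has to re-acquire the lock from the sync queue first.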

Node definition

static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued.  The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev's from the tail to
         * double-check.  The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         */
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null.  The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
