AQS源码分析

2019-04-14  本文已影响0人  HannahLi_9f1c

一、AQS是什么

AQS(AbstratctQueuedSynchronizer)是一个抽象类的同步器,使用了模板方法的设计模式,也就是在抽象类AQS中定义算法的骨架,然后具体的子类需要重写AQS的一些方法,这样就能实现不同场景所需要的同步器,java中的ReetrantLock,ReentrantReadWriteLock等很多锁都是基于AQS实现的。AQS简单来说就是有一个当前工作线程并且维护一个被锁定的线程队列,并且实现独占锁或者是共享锁。
大概原理是这样的


image

二、AQS怎么使用

1. 模板方法

2. 自定义AQS

isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。

三、学习源码之前需要了解

1. 线程的状态

线程状态转换

image
image

当线程进入到synchronized方法或者synchronized代码块时,线程切换到的是BLOCKED状态,而使用java.util.concurrent.locks下lock进行加锁的时候线程切换的是WAITING或者TIMED_WAITING状态,因为lock会调用LockSupport的方法。

2. CAS+volatile实现线程安全

(1)对变量的写操作不依赖于当前值。
(2)该变量没有包含在具有其他变量的不变式中。

三、AQS源码分析

1. 数据结构

        static final Node SHARED = new Node();//共享模式的标记
        static final Node EXCLUSIVE = null;//独占模式的标志
        static final int CANCELLED =  1;//表明线程已经取消
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        //一种线程等待状态,需要工作线程唤醒
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;//一种等待状态,表明线程有条件性等待
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:状态域,包括以下值
         *   SIGNAL:     * 这个节点的继任者被阻塞(通过park),所以当前线程当释放或者取消的时候必须要唤醒它的继任者
         为了避免竞争,获取方法需要先表明signal,然后原子性获取。失败之后进入bolck状态
         *   CANCELLED: 
         这个节点由于超时或者中断被取消。节点不释放它的资源。尤其,一个个被取消的节点不会再一次blocks
         *   CONDITION:  
         这个节点当前在一个条件等待队列
         *   PROPAGATE: 
         非负表明节点不需要唤醒。所以,大多数代码不需要检查特点值,只是用来标记
         
         这个参数初始化为0
         condition会初始化为CONDITION,通过CAS修改值,或者无条件地volatile写**/
        volatile int waitStatus;

       
         /*当前工作线程的前继线程
         由于head线程总是不取消,所以一个节点只能通过aquire获取。一个取消的线程不能aquire。一个线程总是自我取消,而不是其他节点来取消*/
        volatile Node prev;

      
        volatile Node next;

    
         /*入队的线程.构造函数初始化,使用后置为null*/
        volatile Thread thread;
        Node nextWaiter;
/**
     等待队列头节点,除了初始化,通过setHead修改。注意:如果头存在,它的等待状态不能是CANCELLED.
     */
    private transient volatile Node head;

    /**
     尾节点,通过enq方法添加新的等待节点
     */
    private transient volatile Node tail;

    /**
     * 资源state.
     */
    private volatile int state;
    /**
     * @return current state value
     */
    protected final int getState() {
        return state;
    }

    /**
     * Sets the value of synchronization state.
     * @param newState the new state value
     */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a {@code volatile} read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

2. 独占模式下的获取acquire

2.1. acquire

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

我们可以看到这个入口函数是final修饰的,说明设计的时候不希望子类修改这个方法,它先是用tryAcquire获取,如果获取成功就返回;addWaiter将该线程加入等待队列的尾部,并标记为独占模式;acquireQueued()使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false;如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

2.2 tryAquire

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

这个方法通过protected修饰,只有子类访问,也就是在创建同步器的时候自定义获取方式。这个AQS是抽象类,所以只抛出异常

2.3 addWaiter

  private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

这个方法先尝试快速入队,如果成功就返回,否则通过enq入队

  private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

enq主要是通过自旋一直尝试将节点插入尾部,所以可以在快速尝试失败后调用。这个就是CAS自旋volatile的经典使用

2.4 acquiredQueued

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
                    //如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

由于尝试获取资源失败,于是进行等待队列等待别的线程结束唤醒自己。如果自己的前任节点是head,并且获取成功,返回;不然就在waiting中一直等待

2.5

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;//如果已经设置了前结点释放资源后会唤醒它,那么就可以返回休息了
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
             /*
         * 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。     */
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
//如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

不是SIGNAL,需要加入等待队列等待,同时尝试下看有没有机会轮到自己

2.6 总结

流程如下


image

3. 独占模式下的release

3.1 release

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

直接利用tryRelease释放线程,因为独占模式下只有head线程占有资源,因此直接释放即可。然后要唤醒一个等待线程

3.2 tryRealse

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

这个方法也是需要在子类重写的,父类直接抛出异常。重写的时候如果释放资源成功返回true,否则返回false

3.3 unparkSuccessor

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        //头节点的线程状态<0,那么利用cas把它状态设置为0
        if (ws < 0)
           compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        //从tail往前遍历,寻找最前面需要唤醒的节点,然后unpark唤醒
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

这个函数就是寻找继任线程,并将它唤醒

4. acquireShared

4.1 acquireShared

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

通过tryAcquiredShared获取资源,如果没有获取到,执行doAcquireShared

4.2 tryAcquireShared

    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

类似于tryacquire,抽象类中直接抛出异常

4.3 doAcquireShared

 private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        //共享节点加到队列
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
            //获取node的前继节点
                final Node p = node.predecessor();
                if (p == head) {
                如果前继是head,看能不能获取资源
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                       //如果还有资源,要唤醒下一个等待节点 setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                20             //判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()

                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

这个函数试图获取资源,并且将head设置为signal,表示等他释放资源了就能唤醒它的next。这里的资源只能是head.next获取,体现了公平性
4.4 setHeadAndPropagate

 private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);//node设置为head
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
            //释放head的资源
                doReleaseShared();
        }
    }

4.5 总结
这个非独占式的获取跟acquire区别不是很大,无非就是线程拿到资源之后如果发现资源有剩余,会唤醒下一个正在等待的线程,而且唤醒的是临近等待的线程,体现了公平性,但是性能会差一点

5. releaseShared

上一篇下一篇

猜你喜欢

热点阅读