JUC并发包的源代码深入剖析程序员

Java同步器——AQS学习

2018-06-21  本文已影响7人  BrightLoong
AQS
阅读原文请访问我的博客BrightLoong's Blog

一. 概述

AQS(AbstractQueuedSynchronizer),队列同步器,Java中很多同步类都是基于AQS实现的,比如:ReentrantLock、Semaphore、CountDownLatch等。

AQS提供了一个实现阻塞锁和相关同步组件的框架,这个框架实现依赖于FIFO(first-in-first-out,先进先出)的等待队列来完成获取资源线程的排队工作。AQS是绝大多数同步器的基础,它的内部使用一个 int 型的变量表示同步状态(资源状态),AQS并不维护这个状态的值,只是提供了一系列的原子更新方法,getState、setState、compareAndSetState,而由继承AQS的子类去重写特定定的方法实现对共享资源的获取和释放,而其他的比如线程排队、线程挂起、线程唤醒出队等已经在AQS中实现好了(典型的设计模式中模板方法模式的使用)。

AQS定义了两种资源共享的方式:

上面提到过AQS需要继承它的子类去重新特定的方法,而不同方式(共享和独占)需要重写的方法也不一样,下面来看看AQS中定义的可以重写的方法:

看看tryAcquire(int arg),发现AQS中并没有把其定义为抽象方法,而是抛出UnsupportedOperationException异常,像上面所说的,不同的共享方式覆盖特定的方法,而不用实现其所不需要的方法,提供了灵活性。

protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

二. FIFO等待队列的实现

在具体分析之前,先看看AQS是如何实现FIFO的等待队列的,AQS的等待队列是"CLH" (Craig, Landin, and Hagersten) 队列的一种变体,关于"CLH"锁可以参照——自旋锁&CLH锁&MCS锁学习记录。在每个节点中保存了前后节点的引用,节点中的“waitStatus”字段用于表示线程的状态。节点的前驱节点在释放资源的时候发出信号,通知节点可以竞争资源。如果线程是队列中的第一个线程,则可能尝试获取资源,但是并不保证一定成功,队列中的第一个线程只是具有了竞争资源的权利。

AQS注释中给出的CLH队列结构如下:

           +------+  prev +-----+       +-----+
      head |      | <---- |     | <---- |     |  tail
           +------+       +-----+       +-----+

下面是AQS中等待队列实现的部分代码:

static final class Node {
        //用于共享模式的节点声明
        static final Node SHARED = new Node();
        //用于独占模式的节点声明
        static final Node EXCLUSIVE = null;
        /**一下是waitStatus的值的状态*/
        //表示线程被取消
        static final int CANCELLED =  1;
        //等待触发
        static final int SIGNAL    = -1;
        //线程等待条件
        static final int CONDITION = -2;
        //状态需要向后传播
        static final int PROPAGATE = -3;
        //线程状态,具有上面4个状态
        volatile int waitStatus;
        //前驱节点
        volatile Node prev;
        //后继节点
        volatile Node next;
        //当前线程
        volatile Thread thread;
        Node nextWaiter;

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

三. 独占式

在AQS类注释中,有一段独占锁代码的实现,以下面的例子来分析AQS的源码:

package io.github.brightloong.concurrent.aqs;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Created by BrightLoong on 2018/6/19.
 */
public class Mutex implements Lock, java.io.Serializable {

    // Our internal helper class
    private static class Sync extends AbstractQueuedSynchronizer {
        // Reports whether in locked state
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // Acquires the lock if state is zero
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // Otherwise unused
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // Releases the lock by setting state to zero
        protected boolean tryRelease(int releases) {
            assert releases == 1; // Otherwise unused
            if (getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // Provides a Condition
        Condition newCondition() { return new ConditionObject(); }

        // Deserializes properly
        private void readObject(ObjectInputStream s)
                throws IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

    // The sync object does all the hard work. We just forward to it.
    private final Sync sync = new Sync();

    public void lock()                { sync.acquire(1); }
    public boolean tryLock()          { return sync.tryAcquire(1); }
    public void unlock()              { sync.release(1); }
    public Condition newCondition()   { return sync.newCondition(); }
    public boolean isLocked()         { return sync.isHeldExclusively(); }
    public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
}

lock——加锁

看上面的代码,加锁lock()函数,调用的是sync.acquire(1),从acquire(int arg)函数入手,依次分析加锁过程中涉及到的函数代码。

acquire(int arg)

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
  1. tryAcquire(arg)获取资源,获取失败执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))进行等待
  2. addWaiter(Node.EXCLUSIVE)将独占模式节点添加到等待队列中,添加成功后返回节点
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))节点在队列中获取资源,获取不成功,阻塞节点,直到被唤醒,返回中断状态
  4. 如果发生了中断,进行中断补偿

addWaiter(Node mode)

用于将当前线程加入到等待队列中,并返回当前节点。

private Node addWaiter(Node mode) {
        //按照给定的方式构造队列,上面提到的EXCLUSIVE(独占模式)和Share(共享模式)
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        //快速入队,尝试将节点放到队尾,获取前驱节点,如果获取到了表示有线程占用了资源。
        Node pred = tail;
        if (pred != null) {
            //将新建节点的前驱节点设置为获取到的前驱节点
            node.prev = pred;
            //原子更新tail,将tail更新为当前节点,可能更新失败,因为在执行step1前可能有其他线程更新了tail
            if (compareAndSetTail(pred, node)) {//step1
                pred.next = node;
                return node;
            }
        }
        //如果上面的操作执行失败,执行enq(node)
        enq(node);
        return node;
    }
  1. 按照给定的方式构造节点,上面提到的EXCLUSIVE(独占模式)和Share(共享模式)。
  2. 快速入队,如果失败通过 enq(node) 入队(full enq)。

enq(final Node node)

将当前线程对应的节点添加到等待队列中,不断循环直到添加成功。

private Node enq(final Node node) {
        //不断循环,直到节点成功添加到队列中。
        for (;;) {
            Node t = tail;
            //tail等于null,表示当前资源没有被占用
            if (t == null) { 
                //原子操作,初始化head节点,操作可能失败,因为可能有其他线程在这个时候已经初始化成功了。
                if (compareAndSetHead(new Node()))
                    //成功后将tail指向head
                    tail = head;
            } else {
                //如果tail不等于null,将当前节点的前驱节点设置为tail。
                node.prev = t;
                //将tail原子更新成当前节点,可能失败,因为tail可能被其他线程更新。
                if (compareAndSetTail(t, node)) {
                    //构建为双向队列
                    t.next = node;
                    //添加成功,放回当前节点。
                    return t;
                }
            }
        }
    }
  1. 判断tail节点是否为空,如果为空执行1,不为空执行2。
  2. 如果tail节点为空,初始化head节点,并将tail节点指向head节点,执行1。
  3. 如果tail节点不为空,尝试将当前节点添加到队尾,添加成功返回当前节点,否则执行1,直到添加成功。

acquireQueued(final Node node, int arg)

在队列中尝试获取资源,获取失败后判断是否真正需要进入阻塞状态,如果是将阻塞线程,直到被唤醒,并返回中断状态。不断循环,直到获取到资源或者进入阻塞状态等待被唤醒。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //获取前驱节点
                final Node p = node.predecessor();
                //如果前驱节点为head节点,表示具有竞争资源的机会,使用tryAcquire(arg)获取同步状态
                //如果成功,表示获取到资源,将head设置成当前节点(所以可以认为head其实是当前获取到资源的线程节点,最后始终要执行到这里),返回中断状态为false。
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //如果获取资源过程中执行失败,停止获取资源
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire(Node pred, Node node)

获取资源失败后判断线程是否需要真正进入阻塞,只有在前驱节点waitStatus值为SIGNAL,当前节点的线程才需要进入阻塞。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //获取前驱节点的waitStatus值
        int ws = pred.waitStatus;
        
        //如果状态为SIGNAL,表示当前节点可以进入等待的状态,返回true。
        if (ws == Node.SIGNAL)
    
            return true;
        if (ws > 0) {
        
            //4个状态中大于0的状态是CANCELLED,如果线程已经放弃了,那就是所谓的占着厕所不拉屎(话糙理不糙,哈哈)
            //这个时候就往前找,一直找到,直到找到状态正常的那个节点,并让自己排在它的后面。
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
             //如果状态不是CANCELLED,也就是说状态正常,将前驱节点的状态设置为SIGNAL,有可能失败,
             //前驱状态有可能发生了改变
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

parkAndCheckInterrupt()

终于,在这里线程进入了阻塞,调用 LockSupport.park(this)阻塞线程

private final boolean parkAndCheckInterrupt() {
        //调用LockSupport.park()使线程进入waiting状态。
        LockSupport.park(this);
        //当线程被唤醒,返回中断状态()
        return Thread.interrupted();
    }

流程图

获取资源的流程如下所示,可以看到流程中有两个循环。

AQS

releaseLock——释放锁

从release(int arg)函数开始,一步步分析独占方式锁的释放。

release(int arg)

public final boolean release(int arg) {
        //调用使用者重写的tryRelease方法,若成功,唤醒其后继结点,失败则返回false
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                //唤醒后继节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

unparkSuccessor(Node node)

唤醒后继节点

private void unparkSuccessor(Node node) {
        
        int ws = node.waitStatus;
        //修改当前节点的状态为0,允许失败
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        
         //一般来说需要唤醒的就是下一个节点,但是下一个节点可能是null
         //或者其状态是取消状态,所以从tail开始先前查找,一直找到状态正常的节点。
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //唤醒线程
            LockSupport.unpark(s.thread);
    }

四. 共享式

AQS类注释中同样提供了一个简单的实现:

package io.github.brightloong.lab.concurrent.cas;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * BooleanLatch class
 *
 * @author BrightLoong
 * @date 2018/6/21
 */
public class BooleanLatch {

    private static class Sync extends AbstractQueuedSynchronizer {
        boolean isSignalled() { return getState() != 0; }

        protected int tryAcquireShared(int ignore) {
            return isSignalled() ? 1 : -1;
        }

        protected boolean tryReleaseShared(int ignore) {
            setState(1);
            return true;
        }
    }

    private final Sync sync = new Sync();
    public boolean isSignalled() { return sync.isSignalled(); }
    public void signal()         { sync.releaseShared(1); }
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
}

和CountDownLatch很类似可以多个线程await(),但是只需要调用一次signal() 就可以启动阻塞的线程。

共享模式从 acquireShared(int arg)入手来进行分析,与独占模式不同的是,共享模式下同一时刻可以有多个线程获取到资源执行。

acquireShared(int arg)

获取同步状态。

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
  1. 使用tryAcquireShared(arg) 获取资源状态

    • 返回值大于0,获取同步状态成功,同时还有剩余同步状态可供其他线程获取;
    • 返回值等于0时,获取同步状态成功,但没有可用同步状态了;
    • 返回值小于0时,表示获取同步状态失败。
  2. 获取失败执行doAcquireShared(arg)

doAcquireShared(int arg)

private void doAcquireShared(int arg) {
        //添加共享模式的节点到等待队列中,添加成功后返回当前节点
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //获取前驱节点
                final Node p = node.predecessor();
                //如果前驱节点是head,表示可能具有竞争资源的机会,可能head释放资源后来唤醒自己
                if (p == head) {
                   //尝试获取资源,获取同步状态。
                    int r = tryAcquireShared(arg);
                    //大于等于0表示资源获取成功
                    if (r >= 0) {
                        //更新头节点,如果还有资源可用,向后传播,唤醒后继节点。
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            //中断补偿
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //获取资源失败后判断线程是否真正需要挂起,和独占方式相同
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
  1. 使用addWaiter(Node.SHARED),添加到等待队列中。
  2. 在队列中获取资源,获取成功后因为是共享式,如果还有资源可用,向后传播,唤醒后继节点。
  3. 获取失败,和独占式一样,检查是否真正需要进入阻塞等待被唤醒或者中断,如果是进入阻塞,如果不是继续循环。

setHeadAndPropagate(Node node, int propagate)

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        //将当前节点设置为head
        setHead(node);
         //同步状态大于0,表示资源还可以被获取,唤醒后继节点
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

因为获取资源成功,将当前节点设置为head,并唤醒后继节点

doReleaseShared()

相对的分析一下 acquireShared(int arg)

 private void doReleaseShared() {
        //因为在共享模式下,获取同步状态和释放同步状态可能同时进行,用CAS保证原子性
        for (;;) {
            //获取head
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //节点状态为SIGNAL,可以唤醒下一个节点
                if (ws == Node.SIGNAL) {
                    //设置waitStatus为初始状态0
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //唤醒后继节点
                    unparkSuccessor(h);
                }
                //设置为PROPAGATE,表示可以向后传播
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            //head变化了继续循环,共享模式下每唤醒一个后继节点,head//就会指向他,这样就可以保证唤醒所有的能获取到资源的后继节点
            if (h == head)                   // loop if head changed
                break;
        }
    }
  1. compareAndSetWaitStatus(h, Node.SIGNAL, 0)执行失败后继续循环,因为这个函数本身就会被多个线程调用(release和Acquire也是同时),所以状态很有可能被其他线程更改。
  2. compareAndSetWaitStatus(h, 0, Node.PROPAGATE)和上面同理

releaseShared(int arg)

 public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            //唤醒后继节点,并且检查是否可以向后传播
            doReleaseShared();
            return true;
        }
        return false;
    }
  1. 调用tryReleaseShared(arg)释放资源,成功返回true
  2. 调用doReleaseShared()唤醒后继节点并进行传播

其他

关于AQS中的ConditionObject,之后再分析

参考

上一篇下一篇

猜你喜欢

热点阅读