并发编程

AQS

2019-10-21  本文已影响0人  今年五年级

流程图

未命名文件 (7).png

入口

直接new一个ReentrantLock,可以发现创建了一个非公平锁

    public ReentrantLock() {
        sync = new NonfairSync();
    }

而这个非公平锁继承了这个sync

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

即ReentrantLock 在内部用了内部类 Sync 来管理锁,所以真正的获取锁和释放锁是由 Sync 的实现类来控制的,Sync类继承自AQS

abstract static class Sync extends AbstractQueuedSynchronizer {
}

Sync 有两个实现,分别为 NonfairSync(非公平锁)和 FairSync(公平锁)

FairSync

调用公平锁的lock方法

static final class FairSync extends Sync {
        final void lock() {
            acquire(1);
        }
// ...
    }

AbstractQueuedSynchronizer# acquire

    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

ReentrantLock.FairSync# tryAcquire

尝试获取锁,返回值为true,代表获取成功,可能有两种情况:

  1. 没有现存等待锁
  2. 重入锁,线程本来就持有锁,可以理所当然直接获取
protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
// 1. 到这里说明此事没有线程池有锁,但是因为是公平锁,需要先看看是否CLH同步队列中有线程在等待
                if (!hasQueuedPredecessors() &&
// 2. 如果没有线程在等待,则CAS获取锁,成功了就获取了
// 没有CAS成功的话,说明刚才尝试CAS的时候,另一个线程也在CAS,并且另一个线程抢先获得了锁,则直接走到最下面返回false
                    compareAndSetState(0, acquires)) {
// 3. 获取到锁以后,通知大家,现在是我获取到了锁
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
// 这里说明已经有线程获取到锁了,且获取到锁的线程是当前线程
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

如果没有获取到锁,则直接继续进入AQS的代码,去进入等待队列

image.png

AbstractQueuedSynchronizer# addWaiter

/**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

AbstractQueuedSynchronizer# enq

    private Node enq(final Node node) {
// 自旋的方式入队
        for (;;) {
            Node t = tail;
// CLH同步队列未初始化
            if (t == null) { // Must initialize
// 初始化一个空的头结点
                if (compareAndSetHead(new Node()))
// 让头指针和尾指针指向这个空节点,这个初始化执行完,因为没有return,就继续自旋
                    tail = head;
            } else {
// 初始化完以后进入这里,这里不断通过CAS尝试,将自己设置为新的尾结点,不成功的话,就继续自旋,继续让前驱指针指向新的尾结点
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

执行完上面的操作,开始准备执行acquireQueued这个方法

image.png

关键词

  1. AQS维护的同步队列是一个FIFO先进先出队列
  2. CLH同步队列中的节点的waitStatus等待状态默认是0

CLH同步队列中节点获取锁步骤

假设当前有两个线程,线程A和线程B,线程A已经获得锁,线程B获取锁失败,构建成Node节点进入CLH同步队列,同时通过enq()方法以后,CLH队列中如下图


image.png
final boolean acquireQueued(final Node node, int arg) {
    // 这个时候node节点是上图中的tail节点,也就是线程B所在的节点,arg=1
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 查看当前node节点的前置节点p
            if (p == head && tryAcquire(arg)) {
               // 如果前置节点是头部节点,B线程进行尝试获取锁
               // 如果!是如果!
               // 尝试获取成功了,也就是A线程执行完成,然后设置当前当前可执行的节点是B线程
                setHead(node);
                // 把node节点的线程设置为null,然后head指到node上
                p.next = null; // help GC
                // 回收操作
                failed = false;
                // 返回,然后将执行的将会是B线程
                return interrupted;
            }
            // 按照我们的线程ABC的样例,上面的if肯定不会执行
            // 1、因为即使node的前节点是head,但是也无法获取到锁,因为一直被A线程占有者
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 如果shouldParkAfterFailedAcquire返回true,则挂起,否则不挂起线程
                interrupted = true;
            // 2、第一次循环设置了p节点状态位-1,然后B线程循环至这里被挂起
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

// 返回true,挂起,返回false则不挂起
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    
    if (ws == Node.SIGNAL)
        // 前节点的状态=-1,返回true,直接挂起即可
        return true;
    if (ws > 0) {
        // 前置节点状态有>0,说明前驱节点取消了排队,移除掉该节点
        do {
            node.prev = pred = pred.prev;
            // 拆分为2步
            // pred = pred.prev
            // node.prev = pred
        } while (pred.waitStatus > 0);
        // 当前节点开始往前扫描,从双向队列中除去>0的节点,直到<=0
        pred.next = node;
    } else {
        // 利用CAS设置前节点状态位-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

上面代码acquireQueued中如果线程B获取锁失败,则判断是否挂起B,通过B的前驱节点head的等待状态,默认为0,然后将其设置为-1,在第二次获取失败的时候(因此锁还在被线程A占用),直接shouldParkAfterFailedAcquire返回true,并将B线程挂起,直到线程A释放锁,调用unpark唤醒CLH同步队列中第一个等待的有效节点B,此时线程B从836行醒过来,继续执行837行返回,并最终在accquireQueued中获得锁

image.png

节点入同步队列变化

  1. 如果线程A获取到锁,B线程在acquireQueued方法的时候,线程B初始化了head头结点,此时head==tail,并且由于此时的waitStatus没设置,因此java默认设置waitStatus==0
image.png
  1. 线程B入队


    image.png

我们可以看到在线程B入队列以后,此时head结点的waitStatus发生了变化。在
shouldParkAfterFailedAcquire被执行的时候,线程B将前驱结点,即head结点的waitStatus设置为-1

  1. 如果线程C此时再进来,直接插到线程B后面,此时线程C的waitStatus==0
image.png

然后C同样需要执行acquireQueued()。此时的节点是线程C,其前驱节点是线程B,不是head,线程C直接运行到shouldParkAfterFailedAcquire方法中然后通过CAS方法设置B节点waitStatus为-1,并且之后又经过1次循环操作,最后返回true,此时队列为

image.png

总结:enq方法让新的获取锁失败线程进队,acquireQueued方法让新的获取锁失败线程节点的前驱节点的等待状态为-1,并且挂起该新的获取锁失败线程

waitStatus中signal(-1)的意思是:代表后继节点需要被唤醒,即waitStatus记录的不是当前线程节点的状态,而是后继节点的状态。每个node入队的时候,都会将前驱节点的等待状态设置为SIGNAL,然后阻塞,等待被前驱节点唤醒

释放锁

ReentrantLock# unlock

public void unlock() {
        sync.release(1);
    }

AbstractQueuedSynchronizer# release

   public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
// 如果头节点存在,且头节点的waitStatus不为0,即说明后面有节点需要唤醒
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

AbstractQueuedSynchronizer# tryRelease

        protected final boolean tryRelease(int releases) {
// 当前的permit许可数量 - 要释放的数量
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
// 释放许可以后,不再有线程持有锁的话
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

执行完回到下面代码这里,开始执行unparkSuccessor

image.png

AbstractQueuedSynchronizer# unparkSuccessor(唤醒后继节点)

// 参数是头节点,从上图可以看到
  private void unparkSuccessor(Node node) {
        /*
         说明有后继节点需要唤醒,唤醒前,先清除-1的等待状态
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
    // 下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待(waitStatus==1)那么就从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
 // 唤醒线程
            LockSupport.unpark(s.thread);
    }

节点出同步队列变化

同步队列最开始没有元素的时候,构造了空的头节点,后面构建的新的获取锁失败线程节点等在该空节点后,之前持有锁的线程释放锁以后,唤醒该空的头节点后面的第一个线程B,并且清除该头节点,将唤醒线程所在的节点设置为新的头节点,当唤醒线程B执行结束释放锁的时候,此时清空的之前B节点改造的头节点信息,此时队列中仅仅存在一个C节点,并设置C节点为新的头节点,C节点线程C开始执行

image.png

最后,当同步队列没有线程等待的时候,仅仅剩下了一个thread为空,head,tail指针都指向的空节点,之前的一系列操作等于初始化了该同步队列,下一次再来线程的时候,直接不用进enq,直接入队列CAS-tail

上一篇 下一篇

猜你喜欢

热点阅读