1.AbstractQueuedSynchronizer源码分析

2020-09-11  本文已影响0人  致虑

AbstractQueuedSynchronizer

AbstractQueuedSynchronizer是什么,这么说吧,如果你想自己实现一个同步锁,那么基于AbstractQueuedSynchronizer去做那就再方便不过了;所有要考虑的线程如何阻塞等待(类似于synchronized的等待队列)、独占锁、共享锁、如何唤醒等待的线程重新去竞争等逻辑都帮你封装好了,而你大致只需要去实现如何释放锁获取锁的逻辑即可。

至于AbstractQueuedSynchronizer后续我们简称AQS

简而言之,AbstractQueuedSynchronizer就是一个队列同步器,内部维护了一个共享变量(int state)和等待线程队列(Node链表)帮助实现者去自定义自己的同步工具,而我们具体要做的就只需要实现其指定的几个方法即可:

方法 描叙
protected boolean tryAcquire(int arg) 独占式的获取同步状态,成功返回true,否则false
protected boolean tryRelease(int arg) 独占式的释放同步状态,成功返回true,否则false
protected int tryAcquireShared(int arg) 共享式的获取同步状态,成功返回true,否则false
protected boolean tryReleaseShared(int arg) 共享式的释放同步状态,释放成功返回true,否则false
protected boolean isHeldExclusively() 是否已经独占式获取到同步状态

我们只需要实现以部分上方法即可实现自己的同步锁了,而内部如何进入等待、如何唤醒完全由AQS自己完成,所以整体是非常方便的。


源码分析

我们先从一个小的demo代码段入手

public class MySyncLock extends AbstractQueuedSynchronizer {

    // 直接设置为1
    @Override
    protected boolean tryAcquire(int arg) {
        return compareAndSetState(0, 1);
    }

    // 直接设置为0
    @Override
    protected boolean tryRelease(int arg) {
        setState(0);
        return true;
    }

    // 直接判断是否等于1
    @Override
    protected boolean isHeldExclusively() {
        return getState() == 1;
    }
  
    // 获取锁
    public boolean lock(){
        // 真正获取锁
        this.acquire(1);
    }
  
    // 释放锁
    public boolean unlock(){
        // 真正释放锁
        this.release(1);
    }
}

上面的代码是不是非常简单,这个时候去获取同步锁的时候,直接调用lock或者unlock就可以了。

在进入分析之前,这里提前简要介绍一下AQS里的两个重要结构

private volatile int state;                                     // 同步变量(线程竞争的就是它)
private transient volatile Node head;                   // 指向等待队列的头节点
private transient volatile Node tail;                   // 指向等待队列的尾节点
...
static final class Node {
        static final Node SHARED = new Node();// 共享
        static final Node EXCLUSIVE = null;     // 独占
        
        static final int CANCELLED =  1;            // 节点对应的线程已经被取消了
        static final int SIGNAL    = -1;            // 表示后边的节点对应的线程处于等待状态
        static final int CONDITION = -2;            // 表示节点在等待队列中
        static final int PROPAGATE = -3;            // 表示下一次共享式同步状态获取将被无条件的传播下去

        volatile int waitStatus;                            // 当前状态
        volatile Node prev;                                     // 上一个Node
        volatile Node next;                                     // 下一个Node
        volatile Thread thread;                             // 绑定Node的线程

        Node nextWaiter;                                            // 下一个需要唤醒的Node    
}

获取锁

接下来看一下acquire(int arg) 的代码

public final void acquire(int arg) {
  // 1.限制性用户自定义的同步逻辑,比如上面demo中的compareAndSetState(0, 1);
  // 2.若tryAcquire没成功,则执行addWaiter逻辑进行入队
  // 3.再次尝试执行同步逻辑,若依然没有执行成功,则进行阻塞
  if (!tryAcquire(arg) && 
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

addWaiter具体逻辑-加入队列

private Node addWaiter(Node mode) {
    // 根据当前线程及执行模式构造一个Node节点
    Node node = new Node(Thread.currentThread(), mode);
  
    // 判断当前队列是否为空,若为空,则执行enq逻辑(不用想肯定是初始化队列)
    Node pred = tail;
  
    // 若队列不为空,则CAS将自身插入队列尾部(线程安全的哦)
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 若不为空则执行enq逻辑(初始化队列并执行插入)
    enq(node);
    return node;
}

enq(初始化队列并执行插入)

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // 若此时还没有节点,则直接初始化,构造一个空的头节点(头尾指向这个空节点)
            if (t == null) { 
                if (compareAndSetHead(new Node()))
                    tail = head;
            } 
            // CAS再次循环将当前node插入尾部。
            else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

acquireQueued具体逻辑

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;                        // 中断标识,因为下面还要尝试获取一次锁
            for (;;) {
                final Node p = node.predecessor();  // 获取上一个节点
                if (p == head && tryAcquire(arg)) { // 如果上一个节点就是头节点,那再次尝试获取下锁,也省的进入中断等待状态
                    setHead(node);                                  // 如果获取成功,则将自己设置为头节点
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;                         // 返回false,这样在开始逻辑里就不会执行终端了
                }
              
                // 到这里就真的要等待了,返回中断标识
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire

/**  
 * 这段逻辑就是判断是否需要中断,同时这个过程也会移除waitStatus>0的一些节点
 * 1.首先检查上一个节点的waitStatus是否等于-1(前面讲过-1代表这个节点后面的节点都是等待中)
 * 2.如果上一个节点的waitStatus>0(前面也讲过,大于0代表节点已经取消),则移除
 * 3.不断检测上个节点的waitStatus,直到waitStatus==-1,否则一直移除
 * 4.将上个节点状态置为-1
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

parkAndCheckInterrupt

/** 逻辑很简单,就是调用LockSupport进行等待,再往下就是调用UNSAFE的逻辑了 */
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

以上都执行完之后,就执行acquire(int arg)中的selfInterrupt()逻辑了,进行线程中断标识设置。

以上代码注释都很清晰,那下面进行一个总结:

1)acquire尝试获取锁,这里会调用用户自定义同步逻辑tryAcquire进行处理

2)如果tryAcquire失败,就代表其他线程已经将同步状态state占有,那么进行节点构造并加入等待队列

3)加入等待队列的同时,判断如果当前队里就一个节点(即上一个节点是头节点),那再尝试CAS获取state状态

4)如果再次获取失败,则设置中断状态进行中断

5)否则获取同步状态(设置state)成功,获得锁成功

OK,接下来分析一下获取锁的过程


图解一

假如此时队列还是空

[图片上传失败...(image-7589be-1599813925954)]

注意这里waitStatus状态的变化,waitStatus变为-1,代表他的下一个节点处于等待状态。

就是这样一个队列,将没有获取到锁的线程进行了等待阻塞维护,同时当前获取到锁的线程在释放锁的时候,会进行下一个节点的唤醒,让其去竞争获取锁。

acquireInterruptiblyacquire的区别是它是可中断的,也就是说在一个线程调用acquireInterruptibly而发生阻塞之后,别的线程可以对它进行中断,则acquireInterruptibly方法会抛出InterruptedException异常并返回。

tryAcquireNanos也是支持中断的,只不过多了一个超时机制而已。


释放锁

既然有获取锁,当然就要释放锁了,对于线程节点来说,那就是移除当前节点,通知下一个节点成为头节点。

@Override
protected boolean tryRelease(int arg) {
    setState(0);
    return true;
}

// 释放锁
public boolean unlock(){
    // 真正释放锁
    this.release(1);
}

release(int arg) AQS中的逻辑

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

逻辑很简单,先调用用户自定义的释放同步状态逻辑,若没成功,直接返回false,否则进一步处理(返回true),此时如果head节点不为空且状态不为0 ,则进行unparkSuccessor(head);

其实这里应该能想到,当前head节点就是在释放锁的节点,那么如果waitStatus=-1的话,就代表它后面还有节点在等待,因此就需要它尝试去唤醒对方了,所以unparkSuccessor可想而知就是干这事的。

其实这里head节点有没有可能大于0呢?其实不可能的,因为大于0代表被取消,在每次入队的过程中都会进行移除的,前面的入队逻辑有讲。

那我们来看看unparkSuccessor的逻辑:

unparkSuccessor(head)

private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
  
            // 设置当前节点状态为0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
  
            // 获取下一个节点(正常来讲就是要唤醒的节点)
        Node s = node.next;
  
            // 总有些意外,如果下一个节点正好状态被取消了呢~,那就从尾部反过来遍历吧
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 从尾部开始遍历,找到离head节点最近的那个节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;  // 找到了
        }
  
            // 找到了找到离head节点最近的那个节点,进行唤醒
        if (s != null)
            LockSupport.unpark(s.thread);
    }

上面的逻辑正如上面所述,找到下一个状态waitStatus<0的节点,进行唤醒,注意:这里的下一个节点不一定是next指向的节点哦。

同时要干的一件事就是将下一个节点设置为头节点,移除当前头节点。那么现在队列就成这样了:

AQS6

注意:这里好像release过程没有看到重新设置head节点,并且怎么头节点的thread属性变为null了。

这个过程其实很好理解,再回到入队是的逻辑:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

上面逻辑里有一个parkAndCheckInterrupt()的方法,那么线程就是在这被unsafe阻塞的,既然是唤醒了,那么该唤醒的线程会继续执行这个for循环,继续tryAcquire(arg),这里会发现有个setHead(node)方法:

private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

那这里就很清楚,下一个节点被唤醒,在尝试重新去获取state状态时,设置了head为自己,同时将thread、pre等属性置为null。完成了释放唤醒、队列的变更。

以上就是独占方式的讲解


引申

那么共享方式锁只需要与独占方式作出区别就很好理解了。我们知道AQS的主要的核心是state状态的竞争,独占只需要将state资源保持一个,控制只要同时只有一个线程能够获取state即可。而共享就是将state设置为多个,利用消耗机制,来控制共享线程数量即可。

典型的CountDownLatch内部实现逻辑如下,就是判断当前state是否被消耗完了。这里不在做详细介绍。

Sync(int count) {
    setState(count);
}

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}
上一篇 下一篇

猜你喜欢

热点阅读