AbstractQueuedSynchronizer源码解析
大家都亲切地称呼这玩意为AQS,作者写了注释哒:
* Provides a framework for implementing blocking locks and related
* synchronizers (semaphores, events, etc) that rely on
* first-in-first-out (FIFO) wait queues. This class is designed to
* be a useful basis for most kinds of synchronizers that rely on a
* single atomic {@code int} value to represent state. Subclasses
* must define the protected methods that change this state, and which
* define what that state means in terms of this object being acquired
* or released. Given these, the other methods in this class carry
* out all queuing and blocking mechanics. Subclasses can maintain
* other state fields, but only the atomically updated {@code int}
* value manipulated using methods {@link #getState}, {@link
* #setState} and {@link #compareAndSetState} is tracked with respect
* to synchronization.
我来翻译一下就是,提供了一个用来实现阻塞锁和相应的synchronizer(信号量,事件等等)的框架,这个框架基于先进先出的等待队列,这个类是被设计来作为大多数synchronizer的基础,它依靠一个具有原子性的int值去代表状态(这个状态我理解的就是当前被控制的对象的状态,比如你一个锁的状态),它的子类必须定义一个protected的方法来改变这个状态,这个状态它定义了几个,比如什么值代表它是正在被占有,什么值代表它已经被释放等等,子类也可以自己定义一些自己的状态。但是关于这个状态的改变还是很重要的,因为它的值也是应该受到保护的,不能让多个线程同时操作以免意外情况的发生。
看看它有哪些字段:
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
啊,就这些,其中Node这个数据结构也是定义在AQS里面的,这个Node就是用来存储等待的线程们的:
static final classNode {
static finalNodeSHARED=newNode();
static finalNodeEXCLUSIVE=null;
static final intCANCELLED=1;
static final intSIGNAL = -1;
static final intCONDITION= -2;
static final intPROPAGATE= -3;
volatile intwaitStatus;
volatileNodeprev;
volatileNodenext;
volatileThreadthread;
NodenextWaiter;
final booleanisShared() {
returnnextWaiter==SHARED;
}
finalNodepredecessor()throwsNullPointerException {
Node p =prev;
if(p ==null)
throw newNullPointerException();
else
returnp;
}
Node() {// Used to establish initial head or SHARED marker
}
Node(Thread thread,Node mode) {// Used by addWaiter
this.nextWaiter= mode;
this.thread= thread;
}
Node(Thread thread, intwaitStatus) {// Used by Condition
this.waitStatus= waitStatus;
this.thread= thread;
}
}
很明显的一个双向链表结构,这里我们可以看到有两个特别定义的Node,一个是SHARED,一个是EXCLUSIVE,他们代表了线程对于资源控制的两种不同的模式,SHARED代表共享模式,EXCLUSIVE代表独占模式。
首先要说一下为什么需要分这些模式,这要涉及到程序对于某些对象的操作其实分为读和写,读的话,多少个线程来读都OK,对吧,大家读的都是一样的东西,没必要只让一个线程读,其他线程还得等,性能多差啊,这就是共享模式的作用,让大家在执行读操作的时候一起~
独占模式就更容易理解了,你的操作涉及到对对象的修改(并且这部分操作是原子操作,也就是它的执行逻辑上是得要么一起做完要么没开始做的),那么多线程是不是会破坏它的原子性?这也是我们使用锁的原因,而且这个时候只能让一个线程先做完这部分操作,再让其他线程接着做,这就是独占模式啦。
第一部分 独占模式
好,那么我们来分析一波独占模式是怎么独占到资源的,入口函数:
public final voidacquire(intarg) {
if(!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
selfInterrupt();
}
这个tryAcquire():
protected booleantryAcquire(intarg) {
throw newUnsupportedOperationException();
}
可以看到是个只会抛异常的空的函数,里面什么也没有,因为我们AQS只是个基础类嘛,不会把所有东西都实现,这个定制化的任务交给实现它的子类,简而言之,它的功能是去尝试获取资源啦,然后是acquireQueued()方法:
final booleanacquireQueued(finalNode node, intarg) {
booleanfailed =true;
try{
booleaninterrupted =false;
for(;;) {
finalNode p = node.predecessor();
if(p ==head&& tryAcquire(arg)) {
setHead(node);
p.next=null;// help GC
failed =false;
returninterrupted;
}
if(shouldParkAfterFailedAcquire(p,node) &&
parkAndCheckInterrupt())
interrupted =true;
}
}finally{
if(failed)
cancelAcquire(node);
}
}
这个方法呢,做的就是当一个线程节点获取资源成功(它必须是在线程等待队列的第二个节点,因为有判断node.predecessor()==head),至于为什么,可能是为了公平吧,毕竟先进队列的等的更久, 成功后,就把当前线程节点node设置为头节点然后将之前的p也就是头结点的next设为null, 目的是让它不被任何其他对象引用,下次GC时就会被回收掉,并且返回interrupted,如果否,这样在acquire(intarg)方法中就不会去调用selfInterrupt()方法了。如果获取资源失败了,就会首先判断执行shouldParkAfterFailedAcquire(p,node)这个方法(用来确定是否是要中断线程):
private static booleanshouldParkAfterFailedAcquire(Node pred,Node node) {
intws = pred.waitStatus;
if(ws == Node.SIGNAL)
return true;
if(ws >0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do{
node.prev= pred = pred.prev;
}while(pred.waitStatus>0);
pred.next= node;
}else{
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred,ws,Node.SIGNAL);
}
return false;
}
这个方法是去判断他的前驱节点的状态,如果是Signal的话,代表了它前面那个节点release的时候是会正常通知下个节点的所以它就能放心的阻塞自己。
如果说前驱节点状态是Cancel那么就得忽略掉前面的节点,把前驱节点的前驱节点作为当前节点的前驱节点(好拗口,反正就是如果前驱节点是Cancel的,就跳过它再往前,这个连等是自右向左赋值的,大家自己看代码)
然后如果前驱节点的状态不是以上两种(那有可能是CONDITION或PROPAGATE),就讲这个前驱节点的状态设置为Node.SIGNAL。
如果这个方法返回的是true那么则会执行parkAndCheckInterrupt()方法:
private final booleanparkAndCheckInterrupt() {
LockSupport.park(this);
returnThread.interrupted();
}
这个方法很简单,就是将线程先停止一下,具体呢?大家可以看我写的另一篇关于LockSupport类的文章哈(https://www.jianshu.com/p/6bffd19cb900不是我打广告,反正你们也不给钱,而是你不看你都不知道LockSupport.park(this)这个方法干了些什么,如果我在这来讲一遍,那就又跑题啦)~
然后就返回这个线程是否被暂停,那么返回acquireQueued()方法,如果parkAndCheckInterrupt()返回的是true,那么在acquireQueued()方法中就会把interrupted这个变量赋值为true,在acquireQueued()中是一个循环,就是循环去tryAquire()获取资源,方法返回值就是这个interrupted, 如果是true代表线程已经被暂停了,如果是false代表没有被暂停。
然后注意到还有
finally{
if(failed)
cancelAcquire(node);
}
这部分代码,表示最终如果都没能获取这个资源的话,就执行cancelAcquire()方法:
private voidcancelAcquire(Node node) {
// 如果节点是null的话就忽略
if(node ==null)
return;
node.thread=null;
// 跳过已经是cancel状态的前置节点
Node pred = node.prev;
while(pred.waitStatus>0)
node.prev= pred = pred.prev;
Node predNext = pred.next;
node.waitStatus= Node.CANCELLED;
if(node ==tail&& compareAndSetTail(node,pred)) {
compareAndSetNext(pred,predNext, null);
}else{
intws;
if(pred !=head&&
((ws = pred.waitStatus) == Node.SIGNAL||
(ws <=0&&compareAndSetWaitStatus(pred,ws,Node.SIGNAL))) &&
pred.thread!=null) {
Node next = node.next;
if(next !=null&& next.waitStatus<=0)
compareAndSetNext(pred,predNext,next);
}else{
unparkSuccessor(node);
}
node.next= node;// 帮助GC
}
}
哎哟那么多行代码看得脑壳疼,看看它都做了什么,一句话概括,它完了,它会把它自己从这个Node链中移除,让自己等着被GC回收,大家可以看到这里面调用了两次compareAndSetNext()方法:
private static final booleancompareAndSetNext(Node node,
Node expect,
Node update) {
returnunsafe.compareAndSwapObject(node,nextOffset,expect,update);
}
其实就是调用Unsafe的方法通过CAS的方式来设置Node的next节点,就是把当前节点的前面节点指向当前节点的后面节点:
这里注意还有一个方法unparkSuccessor(),这个方法激活它后面的节点:
private voidunparkSuccessor(Node node) {
intws = node.waitStatus;
if(ws <0)
compareAndSetWaitStatus(node,ws,0);
Node s = node.next;
if(s ==null|| s.waitStatus>0) {
s =null;
for(Node t =tail;t !=null&& t != node;t = t.prev)
if(t.waitStatus<=0)
s = t;
}
if(s !=null)
LockSupport.unpark(s.thread);
}
可以看到,如果当前节点的状态不是CANCELLED,那就设置当前的状态为0,然后下一步,操作它的下一个节点,前提是这个节点不为null并且它的状态不为CANCELLED, 如果为null或者CANCELLED,又要进行下一波操作,将Node由尾向前遍历,找到最靠前的那个不为null并且不是当前节点本身并且状态不为CANCELLED的节点,这个由后向前遍历的操作是有点骚的啊,大家体会一下,因为它的next都为null啦,所以没法从next开始遍历,于是往前遍历了。
言归正传,找到下一个满足条件的后节点后,就会通过LockSupport.unpark(s.thread)方法将其唤醒~也就是轮到下个节点的线程进行工作的时候啦。
走到了这里,我们就可以回到acquire()方法啦, 我们是不是沿着acquire()的脚步走了一圈?咦,漏了addWaiter(Node.EXCLUSIVE)这个方法:
privateNodeaddWaiter(Node mode) {
Node node =newNode(Thread.currentThread(),mode);
Node pred =tail;
if(pred !=null) {
node.prev= pred;
if(compareAndSetTail(pred,node)) {
pred.next= node;
returnnode;
}
}
enq(node);
returnnode;
}
这个方法返回的值是一个Node节点,这个返回值是作为acquireQueued()参数传入的,它接受的参数也是个Node,我们来看看这个方法的输入和输出有啥区别~
简而言之当这个AQS实例的tail这个节点不为空时,它会将参数这个节点加入到AQS实例的Node链表中(加到末尾),你可以看到通过compareAndSetTail()方法(我不再细讲这个方法了哈,其实就是调用的Unsafe的方法,本质是CAS来保证原子性)来重新设置了AQS实例的尾节点,并返回这个节点。
如果这个AQS实例的tail是空的话,就会执行一个方法enq(node):
privateNodeenq(finalNode node) {
for(;;) {
Node t =tail;
if(t ==null) {// Must initialize
if(compareAndSetHead(newNode()))
tail=head;
}else{
node.prev= t;
if(compareAndSetTail(t,node)) {
t.next= node;
returnt;
}
}
}
}
可以看到这个方法里有一个循环,在这里面又会去取到AQS实例的tail节点,再去判断它是否为空,如果是空的,新建一个Node节点赋给头结点,再把头节点赋给尾节点,其实就是初始化head和tail,然后tail就不为null了嘛,就会走进else这个分支,所以这里的for(;;)操作也很骚气~它会一直等到tail不是null的时候return。else做的操作就很浅显易懂了,就是讲要添加的node节点赋给tail作为整个链表的尾节点然后返回的就是当前节点的前一个节点(但这里调用enq()时并没有接这个返回值,只是调用而已)。
然后是acquire()方法中的最后一个方法selfInterrupt():
static voidselfInterrupt() {
Thread.currentThread().interrupt();
}
其实就是设置中断的状态为true,因为acquire里有个判断acquireQueued(addWaiter(Node.EXCLUSIVE), arg),这里返回的是线程是否被中断了,如果被中断了才执行selfInterrupt,目的是将这个中断状态传递出去,关于线程的中断机制,我理解了老半天,感觉网上搜到文章都不太地道,所以我决定写一篇文章专门讲这个,敬请期待~
那么独占模式下AQS获取资源的整个流程就走完了,是不是还晕晕的,是的!正常!那我们来总结一下流程(从http://www.javarticles.com/2012/10/abstractqueuedsynchronizer-aqs.html#prettyPhoto盗的一张图~懒得自己画了,谅解一下~):
一句话总结就是:线程会去尝试独占资源,如果成功就是将threadowner设为它自己然后接着做想做的事情,如果不成功就将这个线程放入一个先入先出的队列等待,这是站在线程的角度看,它被放进了一个队列,站在队列的角度看,它还会让队列里满足条件的线程去持续尝试独占资源,如果占成功了那就行了呗,没有的话就看要不要把线程给暂停了,如果暂停了就等待被唤醒啦,如果没暂停的就再尝试占有资源,一直就这样~(这一句话貌似有点长=。=)
好~获取资源部分终于差不多了,获取了之后总要释放呗,下面就讲独占资源的释放~
资源的释放呢是调用的release方法:
public final booleanrelease(intarg) {
if(tryRelease(arg)) {
Node h =head;
if(h !=null&& h.waitStatus!=0)
unparkSuccessor(h);
return true;
}
return false;
}
其中tryRelease(arg)这个就跟tryAcquire()一样是由继承它的子类实现的(如果想看例子请看ReentrantLock中的tryRelease方法),想想,这个方法也是当前线程来执行的对吧,那就意味着,当前线程此刻是运行的状态,这个就会通过unparkSuccessor(h)方法唤醒头节点的下一个节点,这个unparkSuccessor(h)方法前面已经分析过了,反正就是唤醒下个节点。
然后释放的工作就结束啦~
说完独占模式后该说共享模式啦~期待得搓手手~
第二部分 共享模式
还是按照套路,将获取资源的方法走一遍~
共享模式获取资源调用的是acquireShared方法:
public final voidacquireShared(intarg) {
if(tryAcquireShared(arg) <0)
doAcquireShared(arg);
}
tryAcquireShared大家应该能猜到是由子类实现了,也就是我们自己写实现,反正只要它小于0,我们实现的时候就要注意了,小于0在这里就要代表没有获取成功,这样就会执行下面的操作 doAcquireShared(arg):
private voiddoAcquireShared(intarg) {
finalNode node = addWaiter(Node.SHARED);
booleanfailed =true;
try{
booleaninterrupted =false;
for(;;) {
finalNode p = node.predecessor();
if(p ==head) {
intr = tryAcquireShared(arg);
if(r >=0) {
setHeadAndPropagate(node,r);
p.next=null;// help GC
if(interrupted)
selfInterrupt();
failed =false;
return;
}
}
if(shouldParkAfterFailedAcquire(p,node) &&
parkAndCheckInterrupt())
interrupted =true;
}
}finally{
if(failed)
cancelAcquire(node);
}
}
首先同样会执行addWaiter(Node.SHARED),这个方法之前讲了嘛,独占模式也有,就是讲当前线程所在节点加入到等待队列里面,唯一不同的是这里设置的nextWaiter是Node.SHARED而已。然后进入循环,如果当前节点的前置节点是头节点的话,会去接着尝试tryAcquireShared(arg)试图获取资源,如果成功,则执行setHeadAndPropagate方法:
private voidsetHeadAndPropagate(Node node, intpropagate) {
Node h =head;// 保存旧的head用于下面检查
setHead(node);
if(propagate >0|| h ==null|| h.waitStatus<0||
(h =head) ==null|| h.waitStatus<0) {
Node s = node.next;
if(s ==null|| s.isShared())
doReleaseShared();
}
}
它这个方法里有一串判断条件:propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0
翻译:以下条件满足其一:
1. propagate也就是tryAcquireShared方法的返回值要大于0,也就是要获取资源成功
2. 之前的头节点是空
3. 之前的头节点的等待状态不是CANCELLED
4. 当前头节点为空
5. 当前头节点等待状态不是CANCELLED
好,满足这几个要求中的一个就能走到if里面去啦,看看它做了什么呢?
首先找到了当前节点的下一个节点,如果它是空的或者它的模式是共享模式,那么执行doReleaseShared():
private voiddoReleaseShared() {
for(;;) {
Node h =head;
if(h !=null&& h !=tail) {
intws = h.waitStatus;
if(ws == Node.SIGNAL) {
if(!compareAndSetWaitStatus(h,Node.SIGNAL,0))
continue;
unparkSuccessor(h);
}
else if(ws ==0&&
!compareAndSetWaitStatus(h,0,Node.PROPAGATE))
continue; // CAS失败后继续循环
}
if(h ==head)// 如果头节点不一样了则循环继续,否则跳出循环
break;
}
}
哇哦,又是一个循环,我们可以看到,跳出循环的条件是h == head, 然鹅,h = head,那么我们就有两个猜想,第一,这个head可能是在多线程的环境下发生了变化,不然在单线程的条件下,h == head是永远成立的, 第二,在上面这段:
if(h !=null&& h !=tail) {
int ws = h.waitStatus;
if(ws == Node.SIGNAL) {
if(!compareAndSetWaitStatus(h,Node.SIGNAL,0))
continue;
unparkSuccessor(h);
}
else if(ws ==0&&!compareAndSetWaitStatus(h,0,Node.PROPAGATE))
continue; // CAS失败后继续循环
}
操作中可能有对head的修改,那么我们先来看看这段代码有没有修改head。
当h不是空并且也不是最后一个节点的情况下,我们拿到他的等待状态,如果是SIGNAL(代表了它后面的节点需要被唤醒),然后我们就去将它的等待状态设为0(初始状态,表示后面没有节点等待被唤醒),并unparkSuccessor(唤醒后面的节点),这个方面前面也讲了,记不住翻上去看,然后如果节点的等待状态是0的话就回去设置它的状态是Node.PROPAGATE,但是这个方法也没有改变过head节点。
那么就意味着我们第一个猜想是正确的,head可能在多个线程的执行过程中发生了变化,那么head没变就代表了已经没有多个线程在这同时执行这段代码了(多个线程试图获取资源),也就可以退出循环了。
跳出之后,我们可以回到doAcquireShared方法了,我们已经唤醒了后面的共享模式的节点,然后我们就把p(头节点从等待队列中删除)
if(shouldParkAfterFailedAcquire(p,node) &&
parkAndCheckInterrupt())
interrupted =true;
这段代码很熟悉了,独占模式下也有,就是判断当获取资源失败时是否将线程中断,并检查线程是否被中断,赋值给interrupted。
如果是被中断了的话,就执行selfInterrupt方法重置线程的中断标志为true,然后就可以跳出循环了。反正不管跳不跳出循环,最终是要执行下面这段代码的:
finally{
if(failed)
cancelAcquire(node);
}
cancelAcquire()这个方法我们前面也讲了,就是将当前线程给取消掉,一定条件下去唤醒后面的节点。然后就完事~是不是很简单,过程基本上和独占的差不多,只是它会执行setHeadAndPropagate这个方法,也就是如果它的下个节点也是SHARED模式的话,它会将它的下个节点也唤醒。
好的大概讲一下流程,白话讲哈,比较容易理解,就是线程首先会去获取资源嘛,如果获取到了就占有它,如果没获取到就加入等待队列的末尾嘛,然后队列中的线程都会去判断一下,它前面那个节点是不是头节点,因为头节点就是占有着资源的那个节点嘛,头节点完了就可以轮到它了,所以它就不挂起自己,接着请求,重复请求等着前面节点释放资源嘛,释放了资源它就紧接着占有资源啦,占有后,共享模式会把自己线程的节点设为头节点了嘛,之前的头已经完成移出队列了,然后现在就它自己是头啦,它会去检查自己后面那个节点是不是也是SHARED共享模式的,如果是,它就把它唤醒!当然也有线程挂起的时候啦,比如它前面的节点并不是头节点的时候,那么这个时候要把它前面那个节点的状态设为Node.SIGNAL,这样子当他前面节点被唤醒,执行完成要将资源交出去的时候才会去唤醒它后面的节点,不然的话后面的节点就不能够被唤醒哦,所以Node.SIGNAL很重要啦~
整个过程大概就是这样子的,共享模式的释放也很简单跟独占模式类似所以也不讲啦~
呼~好累,但是还是要多说两句,这个AbstractQueuedSynchronizer所有方法我们都要考虑到是很多个线程同时在执行的,虽然有的阻塞,有的在执行,时刻这样想着有助于我们理解这个代码的一些设计~
最后就是,我也是自己试图去把自己的理解写出来的,有什么问题和建议都请留言,批评就算了,写这个很累的,况且仙女不接受批评~