AQS(AbstractQueueSynchronizer)同步
2019-05-09 本文已影响0人
Wi1ls努力努力再努力
AQS 内部使用一个 int status 表示同步状态,内置一个 FIFO 队列完成资源获取线程的工作。
- 可重写的
方法名称 | 描述 |
---|---|
boolean tryAcquire(int arg) | 独占式获取同步状态。先查询同步状态是否符合预期,而后 CAS 设置 |
boolean tryRelease(int arg) | 独占式释放同步状态。 |
int tryAcquireShared(int arg) | 共享式获取同步状态,返回值大于 0 则成功 |
boolean tryReleaseShared(int arg) | 共享式释放同步状态 |
boolean isHeldExclusively() | 表示锁是否被当前资源独占 |
- AQS 提供的 final method(独占式获取和释放同步状态、共享式获取和释放同步状态、查询同步队列中的等待队列情况)
方法名称 | 描述 |
---|---|
void acquire(int arg) | 独占式获取同步状态。如获取同步状态成功,则返回。否则进入同步等待队列。内部调用 tryAcquire(int arg) |
void acquireInterruptibly(int arg) | 同上。响应中断 |
boolean tryAcquireNanos(int arg, long nanos) | 同上,增加超时 |
boolean release(int arg) | 独占式释放同步状态。释放同步状态后,唤醒同步队列第一个线程 |
void acquireShared(int arg) | 共享式获取同步状态。如果获取失败则进入同步队列。同一时刻可以有多个线程获得同步状态。 |
void acquireSharedInterruptibly(int arg) | 同上,支持中断 |
boolean tryAcquireSharedNanos(int arg, long nanos) | 同上,增加超时限制 |
boolean releaseShared(int arg) | 共享式的释放同步状态 |
Collection<Thread> getQueueThreads( ) | 获得等待在同步列表上的现场集合 |
- 节点
static final class Node{
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
volatile nextWaiter;
}
- 同步队列
- 如果一个线程获取同步失败后,会进入同步队列的队尾。
- 以下是独享式说明。
private Node addWaiter(Node node){
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//先采取 CAS 进行入队,如果失败,则用enq()
if(pred!=null){
node.prev=pred;
if(compareAndSetTail(pred, node)){
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private final boolean compareAndSetTail(Node expect, Node update){
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
//使用 CAS 入队
private Node enq(final Node node){
for(;;){
Node t = tail;
if(t == null){
if(compareAndSetHead(new Node)){
tail = head;
}
}else{
node.prev = t;
if(compareAndSetTail(t, node)){
t.next = node;
return t;
}
}
}
}
private final boolean compareAndSetHead(Node node){
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
- 进入同步队列后,Node 边进行自旋尝试获取同步状态
final boolean acquireQueue(final Node node, int arg){
boolean failed = true;
try{
boolean interrupted = false;
//死循环,只有前驱节点是 head 才会尝试获取同步状态
for(;;){
final Node p = node.predecessor();
if(p == head && tryAcquire(arg)){
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
if(shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()){
interrupted = true;
}
}
}finally{
if(failed){
cancelAcquire(node);
}
}
}
当一个同步线程退出同步时
public final boolean release(int arg){
if(tryRelease(arg)){
Node h = head;
if( h!=null && h.waitStatus != 0){
unparkSuccessor(h);
}
return true;
}
return false;
}
private void unparkSuccessor(Node node){
int ws = node.waitStatus;
if(ws < 0){
compareAndSetWaitStatus(node, ws, 0);
}
Node s = node.next;
if( s == null || s.waitStatus > 0){
s = null;
for(Node t = tail; t != null && t != node; t = t.prev){
if(t.waitStatus <= 0){
s = t;
}
}
}
if(s!=null){
//唤醒线程
LockSupport.unpadk(s.thread);
}
}
@LockSupport
public static void unpark(Thread thread){
if(thread!=null){
UNSAFE.unpark(thread);
}
}
-
等待队列
-
这个队列是用于 Condition 的
-
Object 监视器方法与 Condition 接口对比
-
对比项 | Object Monitor Methods | Condition |
---|---|---|
前置条件 | 获取对象的锁 | |
调用方式 | 直接调用 如:object.waite() | 直接调用 如:condition.await() |
等待队列个数 | 一个 | 多个 |
当前线程释放锁并且进入等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态,在等待状态中不响应中断 | 不支持 | 支持 |
当前线程释放锁并进入超时等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态到将来的某个时间 | 不支持 | 支持 |
唤醒等待队列中的一个线程 | 支持 | 支持 |
唤醒等待队列中的全部线程 | 支持 | 支持 |
- 每个 Condition 内部都包含一个等待队列。
- 下面分析的是 ConditionObject
@AQS$ConditionObject
//await()是将自己放入 ConditionObject 关联的等待队列,同时释放锁
public final void await() throws InterruptedException{
if(Thread.interrupted()){
throw new InterruptedException();
}
//将当前节点即线程加入等待队列
Node node = addConditionWaiter();
//释放同步锁
int savedState = fullyRelease(node);
int interruptMode = 0;
while(!isOnSyncQueue(node)){
LockSupport.park(this);
if((interruptMode = checkInterruptWhileWaiting(node))!=0)}
break;
}
if(acquireQueue(node, savedState) && interruptMode != THROW_IE){
interruptMode = REINTERRUPT;
}
if(node.nextWaiter !=null ){
unlinkCanceledWaiters();
}
if(interruptMode != 0){
reportInterruptAfterWait(interruptMode);
}
}
//加入队列的方法没有做 CAS 是因为能调用此方法说明必然已经拿到了锁,因此不存在竞争。这里的同步是靠锁来完成的
private Node addConditionWaiter(){
Node t = lastWaiter;
if(t!=null && t.waitStatus != Node.CONDITION){
unlinkCanceledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if(t==null){
firstWaiter = node;
}else{
t.nextWaiter = node;
}
lastWaiter = node;
return node;
}
//调用了 release(savedState)释放了锁,于是其他竞争的线程便可以获取锁
final int fullyRelease(Node node){
boolean failed = true;
try{
int savedState = getState( );
if(release(savedState)){
failed = false;
return savedState;
}else{
throw new IllegalMonitorStateException();
}
}finally{
if(failed){
node.waitStatus = Node.CANCELLED;
}
}
}
唤醒等候在同一 Condition 的线程
//唤醒同一个 Condition 下等候线程的队首线程
public final void signal(){
if(!isHeldExclusively()){
// 当前线程必须依据获得了锁,才可以调用 signal( );
throw new IllegalMonitorStateException();
}
Node first = firstWaiter;
if(first!=null){
doSignal(first);
}
}
private void doSignal(Node first){
do{
if((firstWaiter = first.nextWaiter) == null){
lastWaiter = null;
}
first.nextWaiter = null;
}while(!transferForSignal(first) && (first = firstWaiter) != null)
}
final boolean ransferForSignal(Node node){
if(!compareAndSetWaitStatus(node, Node.CONDITION, 0)){
return false;
}
//将在Condition 等候队列的这个 Node 移动到同步队列的队尾,然后唤醒他。
//在 Condition 等候队列的现场在调用await()后就会退出 while(!isOnSyncQueue(node))的循环,然后尝试去获取同步状态
Node p =enq(node);
int ws = p.waitStatus;
if(ws > 0 || !compareAndSetWaiteStatus(p, ws, Node.SIGNAL)){
LockSupport.unpark(node.thread);
}
return true;
}
//在 await( )调用后进入一个 while(!isOnSyncQueue(node))的死循环,当其他线程调用 signal() 并且唤醒的是当前线程的话,会将当前线程从 Condition 的等待队列移到同步队列,于是退出死循环,进入同步状态的竞争。
final boolean acquireQueued(final Node node, int arg){
boolean failed = true;
try{
boolean interrupted = false;
for(;;){
final Node p = node.predecessor();
if(p == head && reyAcquire(arg)){
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
if(shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()){
interrupted = true;
}
}
}finally{
if(failed){
cancelAcquire(node);
}
}
}
SignalAll( ) 就是对 Condition 等待队列的所有节点使用一遍 Signal( )的含义
public final void signalAll(){
if(!isHeldExclusively()){
throw new IllegalMonitorStateException();
}
Node first = firstWaiter;
if(first != null){
doSignalAll(first);
}
}
private void doSignalAll(Node first){
lastWaiter = firstWaiter = null;
do{
//循环所有的等候队列,将其加入到同步队列并且开始同步状态的竞争
Node next = first.nextWaiter;
first.nextWaiter = null;
transferSignal(first);
first = next;
}while(first != null);
}
同步队列与等待队列