AQS(AbstractQueueSynchronizer)同步

2019-05-09  本文已影响0人  Wi1ls努力努力再努力

AQS 内部使用一个 int status 表示同步状态,内置一个 FIFO 队列完成资源获取线程的工作。

方法名称 描述
boolean tryAcquire(int arg) 独占式获取同步状态。先查询同步状态是否符合预期,而后 CAS 设置
boolean tryRelease(int arg) 独占式释放同步状态。
int tryAcquireShared(int arg) 共享式获取同步状态,返回值大于 0 则成功
boolean tryReleaseShared(int arg) 共享式释放同步状态
boolean isHeldExclusively() 表示锁是否被当前资源独占
方法名称 描述
void acquire(int arg) 独占式获取同步状态。如获取同步状态成功,则返回。否则进入同步等待队列。内部调用 tryAcquire(int arg)
void acquireInterruptibly(int arg) 同上。响应中断
boolean tryAcquireNanos(int arg, long nanos) 同上,增加超时
boolean release(int arg) 独占式释放同步状态。释放同步状态后,唤醒同步队列第一个线程
void acquireShared(int arg) 共享式获取同步状态。如果获取失败则进入同步队列。同一时刻可以有多个线程获得同步状态。
void acquireSharedInterruptibly(int arg) 同上,支持中断
boolean tryAcquireSharedNanos(int arg, long nanos) 同上,增加超时限制
boolean releaseShared(int arg) 共享式的释放同步状态
Collection<Thread> getQueueThreads( ) 获得等待在同步列表上的现场集合

static final class Node{
  volatile int waitStatus;
  volatile Node prev;
  volatile Node next;
  volatile Thread thread;
  volatile nextWaiter;
}
private Node addWaiter(Node node){
  Node node = new Node(Thread.currentThread(), mode);
  Node pred = tail;
  //先采取 CAS 进行入队,如果失败,则用enq()
  if(pred!=null){
    node.prev=pred;
    if(compareAndSetTail(pred, node)){
      pred.next = node;
      return node;
    }
  }
  enq(node);  
  return node;
}

private final boolean compareAndSetTail(Node expect, Node update){
  return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

//使用 CAS 入队
private Node enq(final Node node){
  for(;;){
    Node t = tail;
    if(t == null){
      if(compareAndSetHead(new Node)){
        tail = head;
      }
    }else{
      node.prev = t;
      if(compareAndSetTail(t, node)){
        t.next = node;
        return t;
      }
    }
  }
}

private final boolean compareAndSetHead(Node node){
  return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
final boolean acquireQueue(final Node node, int arg){
     boolean failed = true;
     try{
          boolean interrupted = false;
          //死循环,只有前驱节点是 head 才会尝试获取同步状态
          for(;;){
               final Node p = node.predecessor();
               if(p == head && tryAcquire(arg)){
                    setHead(node);
                    p.next = null;
                    failed = false;
                    return interrupted;
               }
               if(shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()){
                    interrupted = true;
                    }
          }
     }finally{
          if(failed){
               cancelAcquire(node);
          }
     }
}

当一个同步线程退出同步时

public final boolean release(int arg){
     if(tryRelease(arg)){
          Node h = head;
          if( h!=null && h.waitStatus != 0){
               unparkSuccessor(h);
          }
          return true;
     }
     return false;
}

private void unparkSuccessor(Node node){
     int ws = node.waitStatus;
     if(ws < 0){
          compareAndSetWaitStatus(node, ws, 0);
     }
     Node s = node.next;
     if( s == null || s.waitStatus > 0){
          s = null; 
          for(Node t = tail; t != null && t != node; t = t.prev){
               if(t.waitStatus <= 0){
                    s = t;
               }
          }
     }
     if(s!=null){
          //唤醒线程
          LockSupport.unpadk(s.thread);
     }
}

@LockSupport
public static void unpark(Thread thread){
  if(thread!=null){
    UNSAFE.unpark(thread);
  }
}
对比项 Object Monitor Methods Condition
前置条件 获取对象的锁
调用方式 直接调用 如:object.waite() 直接调用 如:condition.await()
等待队列个数 一个 多个
当前线程释放锁并且进入等待状态 支持 支持
当前线程释放锁并进入等待状态,在等待状态中不响应中断 不支持 支持
当前线程释放锁并进入超时等待状态 支持 支持
当前线程释放锁并进入等待状态到将来的某个时间 不支持 支持
唤醒等待队列中的一个线程 支持 支持
唤醒等待队列中的全部线程 支持 支持

@AQS$ConditionObject
//await()是将自己放入 ConditionObject 关联的等待队列,同时释放锁
public final void await() throws InterruptedException{
     if(Thread.interrupted()){
          throw new InterruptedException();
     }
     //将当前节点即线程加入等待队列
     Node node = addConditionWaiter();
     //释放同步锁
     int savedState = fullyRelease(node);
     int interruptMode = 0;
     while(!isOnSyncQueue(node)){
          LockSupport.park(this);
          if((interruptMode = checkInterruptWhileWaiting(node))!=0)}
          break;
     }
     if(acquireQueue(node, savedState) && interruptMode != THROW_IE){
          interruptMode = REINTERRUPT;
     }
     if(node.nextWaiter !=null ){
          unlinkCanceledWaiters();
     }
     if(interruptMode != 0){
          reportInterruptAfterWait(interruptMode);
     }
}

//加入队列的方法没有做 CAS 是因为能调用此方法说明必然已经拿到了锁,因此不存在竞争。这里的同步是靠锁来完成的
private Node addConditionWaiter(){
     Node t = lastWaiter;

     if(t!=null && t.waitStatus != Node.CONDITION){
          unlinkCanceledWaiters();
          t = lastWaiter;
     }
     Node node = new Node(Thread.currentThread(), Node.CONDITION);
     if(t==null){
          firstWaiter = node;
     }else{
          t.nextWaiter = node;
     }
     lastWaiter = node;
     return node;
}

//调用了 release(savedState)释放了锁,于是其他竞争的线程便可以获取锁
final int fullyRelease(Node node){
     boolean failed = true;
     try{
          int savedState = getState( );
          if(release(savedState)){
               failed = false;
               return savedState;
          }else{
               throw new IllegalMonitorStateException();
          }
     }finally{
          if(failed){
               node.waitStatus = Node.CANCELLED;
          }
     }
}

唤醒等候在同一 Condition 的线程

//唤醒同一个 Condition 下等候线程的队首线程
public final void signal(){
     if(!isHeldExclusively()){
          // 当前线程必须依据获得了锁,才可以调用 signal( );
          throw new IllegalMonitorStateException();
     }
     Node first = firstWaiter;
     if(first!=null){
          doSignal(first);
     }
}

private void doSignal(Node first){
     do{
          if((firstWaiter = first.nextWaiter) == null){
               lastWaiter = null;
          }
          first.nextWaiter = null;
     }while(!transferForSignal(first) && (first = firstWaiter) != null)
}

final boolean ransferForSignal(Node node){
     if(!compareAndSetWaitStatus(node, Node.CONDITION, 0)){
          return false;
     }
    //将在Condition 等候队列的这个 Node 移动到同步队列的队尾,然后唤醒他。
    //在 Condition 等候队列的现场在调用await()后就会退出 while(!isOnSyncQueue(node))的循环,然后尝试去获取同步状态
     Node p =enq(node);
     int ws = p.waitStatus;
     if(ws > 0 || !compareAndSetWaiteStatus(p, ws, Node.SIGNAL)){
          LockSupport.unpark(node.thread);
     }
     return true;
}

//在 await( )调用后进入一个 while(!isOnSyncQueue(node))的死循环,当其他线程调用 signal() 并且唤醒的是当前线程的话,会将当前线程从 Condition 的等待队列移到同步队列,于是退出死循环,进入同步状态的竞争。
final boolean acquireQueued(final Node node, int arg){
     boolean failed = true;
     try{
          boolean interrupted = false;
          for(;;){
               final Node p = node.predecessor();
               if(p == head && reyAcquire(arg)){
                    setHead(node);
                    p.next = null;
                    failed = false;
                    return interrupted;
               }
               if(shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()){
                         interrupted = true;
               }
          }
     }finally{
          if(failed){
               cancelAcquire(node);
          }
     }
}

SignalAll( ) 就是对 Condition 等待队列的所有节点使用一遍 Signal( )的含义

public final void signalAll(){
     if(!isHeldExclusively()){
          throw new IllegalMonitorStateException();
     }
     Node first = firstWaiter;
     if(first != null){
          doSignalAll(first);
     }
}

private void doSignalAll(Node first){
     lastWaiter = firstWaiter = null;
     do{
          //循环所有的等候队列,将其加入到同步队列并且开始同步状态的竞争
          Node next = first.nextWaiter;
          first.nextWaiter = null;
          transferSignal(first);
          first = next;
     }while(first != null);
}
同步队列与等待队列
上一篇 下一篇

猜你喜欢

热点阅读