Java 并发编程之 AQS 基础
1. AQS
AQS 全称 AbstractQueuedSynchronizer 抽象同步队列,简称抽象队列同步器
1.1 锁与同步器
锁是面向使用者,它定义了使用者与锁交互的接口,隐藏了实现细节;
同步器是面向锁的实现者,它简化了锁的实现方式,屏蔽了同步状态的管理,线程的排队,等待和唤醒等底层操作。
锁和同步器隔离了使用者和实现者所需关注的领域
1.2 分析
AQS是一个FIFO的双向队列,其内部通过节点 head 和 tail 记录队首和队尾元素,队列元素的类型为 Node
2. 源码分析
- 一个状态变量 state
- 两个内部类
- 静态内部类 Node
- 内部类 ConditionObject
/**
* 从注释开始:
*
* Provides a framework for implementing blocking locks and related synchronizers that rely on first-in-first-out (FIFO) wait queues ...
*
* Subclasses should be defined as non-public internal helper classes that are used to implement the synchronization properties of their enclosing class ...
*/
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 队首与队尾
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
// getter / setter
static final class Node {
}
public class ConditionObject implements Condition, java.io.Serializable {
}
}
2.1 成员变量 state
- 在AQS中维持了一个单一的状态信息state,可以通过getState、setState、compareAndSetState函数修改其值
- 对于AQS来说,线程同步的关键是对状态值 state 进行操作
独占式锁:
void acquire(int arg)独占式获取同步状态,如果获取失败则插入同步队列进行等待;
void acquireInterruptibly(int arg) 同 acquire(),但在同步队列中进行等待的时候可以检测中断
boolean release(int arg)
(1)当一个线程调用acquire(int arg)方法获取独占资源时,会首先使用tryAcquire方法尝试获取资源,具体是设置状态变量state的值,成功则直接返回,失败则将当前线程封装为类型为Node.EXCLUSIVE的Node节点后插入到AQS阻塞队列的尾部,并调用LockSupport.park(this)方法挂起自己
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg))
selfInterrupt();
}
非公平锁 NonfairSync 的实现
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
共享式锁:
void acquireShared(int arg)共享式获取同步状态,与独占式的区别在于同一时刻有多个线程获取同步状态;
void acquireSharedInterruptibly(int arg)
boolean releaseShared(int arg)
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
运用:
- 对于 ReentrantLock,state可以用来表示当前线程获取锁的可重入次数;
- 对于读写锁 ReentrantReadWriteLock,state的高16位表示读状态,也就是获取该读锁的次数,低16位表示获取到写锁的线程的可重入次数;
- 对于 semaphore,state用来表示当前可用信号的个数;
- 对于 CountDownlatch,state用来表示计数器当前的值
2.2 静态内部类 Node
static final class Node {
// 节点状态
volatile int waitStatus;
volatile Node prev;
volatile Node next;
// 存放进入AQS队列里面的线程
volatile Thread thread;
}
waitStatus 记录当前线程等待状态
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
2.3 内部类 - 条件变量 ConditionObject
- ConditionObject 实现 Condition接口
- ConditionObject 中的容器就是单向链表队列
- void signal()
- void await()
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}
3. 实现原理
java.util.concurrent 包是基于AQS 框架的,AQS 框架借助于两个类:
- Unsafe(提供CAS操作)
- LockSupport(提供park/unpark操作)
3.1 Unsafe 类
3.2 LockSupport 工具类
- 主要作用是挂起和唤醒线程
- park 挂起
- unpark 唤醒
- park/unpark的设计原理核心是“许可”:park是等待一个许可,unpark是为某线程提供一个许可
@Test
public void demo() {
LockSupport.park();
// 当前线程获得许可
LockSupport.unpark(Thread.currentThread());
}