Java并发编程 AQS
1.为什么需要AQS
如果没有AQS,就需要每个协作工具自己实现:
- 同步状态的原子性管理
- 线程的阻塞与解除阻塞
- 队列的管理
在并发场景下,自己正确且高效实现这些内容,是相当有难度的,所以我们用AQS来帮我们把这些事情搞定,我们只关注业务逻辑就够了。
2.AQS的作用
AQS是一个用于构建锁、同步器、协作工具类的工具类(框架)。有了AQS以后,更多的协作工具类都可以很方便的被写出来。
有了AQS,构建线程协作类就容易多了。
比如:
Semaphore和AQS的关系
Semaphore内部有一个Sync类,Sync类继承了AQS
3.AQS的重要性、地位
AbstractQueuedSynchronizer是Doug Lea写的,从JDK1.5加入的一个基于FIFO等待队列实现的一个用于实现同步器的基础框架,我们用IDE看AQS的实现类,可以发现实现类有以下这些:
image.png
4.AQS内部原理解析
AQS最核心的就是三大部分:
- state(在不同实现子类中,有不同的含义)
- 控制线程抢锁和配合的FIFO队列
- 期望协作工具类去实现的获取/释放等重要方法
4.1state状态
- 这里的state的具体含义,会根据具体实现类的不同而不同,比如在Semaphore里,它表示“剩余的许可证的数量”,而在CountDownLatch里,他表示“还需要倒数的数量”
- state是volatile修饰的,会被并发地修改,所以所有修改state的方法都需要保证线程安全,比如getState、setState以及compareAndSetState操作来读取和更新这个状态。这些方法依赖于j.u.c.atomic包的支持
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
4.1 state状态
- 在ReentrantLock中
- state用来表示“锁”的占用情况,包括可重入计数
- 当state的值为0的时候,标识改Lock不被任何线程所占用
4.2 控制线程抢锁和配合的FIFO队列
- 这个队列用来存放“等待的线程”,AQS就是“排队管理器”,当多个线程争用同一把锁时,必须由排队机制将那些没能拿到锁的线程串在一起。当锁释放时,锁管理器就会挑选一个合适的线程占有这个刚刚释放的锁。
- AQS会维护一个等待的线程队列,把线程都放到这个队列里
- 这是一个双向形式的队列
4.3 期望协作工具类实现的获取/释放等重要方法
这里的获取和释放方法,是利用AQS的协作工具类里最重要的方法,是由协作类自己去实现的,并且含义各不相同
获取方法
- 获取操作会依赖state变量,经常会阻塞(比如获取不到锁的时候)
- 在Semaphore中,获取就是acquire方法,作用是获取一个许可证
- 在CountDownLatch里面,获取就是await方法,作用是“等待,直到倒数结束”
释放方法
- 释放操作不会阻塞
- 在Semaphore中,释放就是release方法·,作用是释放一个许可证
- CountDownLatch里面,获取就是countDown方法,作用是“倒数1个数”
还需要重写tryAcquire方法和tryRelease等方法
5.AQS应用实例、源码解析
AQS在ReentrantLock的应用
第一步:写一个来,想好协作的逻辑,实现获取/释放方法
第二步:内部写一个Sync类继承AbstractQueuedSynchronizer
第三步:根据是否独占重写tryAcquire/tryRelease或tryAcquireShared() 和tryReleaseShared()等方法,在之前写的获取/释放方法中调用AQS的acquire/release或者Shared方法
AQS在CountDownLatch的应用
* 构造函数
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
调用AbstractQueuedSynchronizer
protected final void setState(int newState) {
state = newState;
}
* getCount
public long getCount() {
return sync.getCount();
}
调用AbstractQueuedSynchronizer
protected final int getState() {
return state;
}
* countDown
public void countDown() {
sync.releaseShared(1);
}
CountDownLatch内部Sync类实现tryReleaseShared
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
调用AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
把所有等待的线程去唤醒
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
* await
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
CountDownLatch内部Sync类实现tryAcquireShared
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
调用AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
doAcquireSharedInterruptibly把当前线程放在阻塞队列中进行等待
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
parkAndCheckInterrupt方法
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
把当前线程挂起 进入阻塞状态
AQS在CountDownLatch的总结
调用CountDownLatch的await方法时,便会尝试获取“共享锁”,不过一开始是获取不到该锁的,于是线程被阻塞。
“共享锁”可获取到的条件,就是“锁计数器”的值为0
“锁计数器”的初始值为count,每当一个线程调用该CountDownLatch对象的countDown()方法时,才将“锁计数器”-1
AQS在Semaphore的应用
* acquire
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
调用AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
Semaphore中内部类
/**
* NonFair version
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
/**
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
nonfairTryAcquireShared 检查剩余许可证数量够不够
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
* release
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
调用AbstractQueuedSynchronizer 中release方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
CountDownLatch内部Sync类实现tryReleaseShared
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
cas设置state值
AQS在Semaphore的总结
- 在Semaphore中,state表示许可证的剩余数量
- 看tryAcquire方法,判断nonfairTryAcquireShared大于等于0的话,代表成功
- 这里会先检查剩余许可证数量够不够这次需要的,用减法来计算,如果直接不够,那就返回负数,表示失败,如果够了,就用自旋加compareAndSetState来改变state状态,直到改变成功就返回正数;或者是期间如果被其他人修改了导致剩余数量不够了,那也返回负数代表获取失败。
AQS在ReentrantLock的应用
- 分析释放锁的方法tryRelease
ReentrantLock
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
调用AbstractQueuedSynchronizer 中release方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
ReentrantLock 中内部类Sync类
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor方法表示唤醒其他线程
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
getState 代表重入的次数
free = true; 表示锁是自由状态
- 分析释放锁的方法tryAcquire
public void lock() {
sync.lock();
}
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
setExclusiveOwnerThread 表示设置互斥独有线程
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
添加等待节点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
acquireQueued 获取的线程处于互斥的不可中断模式队列。用于条件等待方法以及获取。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
ReentrantLock释放锁分析
由于是可重入的,所以state代表重入的次数,每次释放锁,先判断是不是当前持有的锁的线程释放的,如果不是就抛异常,如果是的话,重入次数就减1,如果见到了0,就说明完全释放了,于是free就是true,并且把state设置为0.
ReentrantLock加锁分析
第一步:调用子类实现的获取锁的方法 tryAcquire 成功则结束if
Reentrantlock中两个子类的尝试修改锁状态方法
非公平 不判断当前是否还有其他节点在队列中 hasQueuedPredecessors
第二步:失败 进去 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
addWaiter 将当前线程节点入队 先cas快速入队 失败进入循环cas入队
第三步 acquireQueued 尝试将线程阻塞 需要循环向前查找非取消的节点 然后才能进行阻塞 即 前一个节点非cancel 然后设置成SIGNAL 状态 这样前节点的线程如果释放了同步状态或者被取消时候会通知本线程
调用lock方法,存在竞争的时候,T2会去入队,首先会初始化一个空节点,t2节点实际上存放的是第二个位置,t3进来的时候继续在后面排队,
t2和t3都是调用park方法进行阻塞。入队的时候会将前面的节点的waitstatus状态由0改为-1。在调用unlock的时候会将waitstatus不等于0的释放。
6.AQS实现一个自己的Latch门闩
public class OneShotLatch {
private final Sync sync = new Sync();
public void signal() {
sync.releaseShared(0);
}
public void await() {
sync.acquireShared(0);
}
private class Sync extends AbstractQueuedSynchronizer {
@Override
protected int tryAcquireShared(int arg) {
return (getState() == 1) ? 1 : -1;
}
@Override
protected boolean tryReleaseShared(int arg) {
setState(1);
return true;
}
}
public static void main(String[] args) throws InterruptedException {
OneShotLatch oneShotLatch = new OneShotLatch();
for (int i = 0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"尝试获取latch,获取失败那就等待");
oneShotLatch.await();
System.out.println("开闸放行"+Thread.currentThread().getName()+"继续运行");
}
}).start();
}
Thread.sleep(5000);
oneShotLatch.signal();
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"尝试获取latch,获取失败那就等待");
oneShotLatch.await();
System.out.println("开闸放行"+Thread.currentThread().getName()+"继续运行");
}
}).start();
}
}
Thread-2尝试获取latch,获取失败那就等待
Thread-3尝试获取latch,获取失败那就等待
Thread-0尝试获取latch,获取失败那就等待
Thread-1尝试获取latch,获取失败那就等待
Thread-4尝试获取latch,获取失败那就等待
开闸放行Thread-2继续运行
开闸放行Thread-0继续运行
开闸放行Thread-3继续运行
开闸放行Thread-4继续运行
开闸放行Thread-1继续运行
Thread-5尝试获取latch,获取失败那就等待
开闸放行Thread-5继续运行