马桶Java :5.Java AQS 机制 (Reentrant
马桶🚽Java 上厕所就能看完的小知识! 欢迎关注、点赞 持续更新!
AQS 机制 (ReentrantLock)
概念
如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程。将共享资源设置为锁定状态。
如果被请求的共享资源被占用就需要一套线程阻塞等待以及被唤醒锁分配的机制,这个机制 AQS 是用 CLH 队列锁实现的
即将暂时获取不到锁的线程加入到队列中。
CLH(Craig,Landin,and Hagersten) 队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系 就是类似于 链表节点之间联系的感觉)。AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的分配。
图中就可以看出公平锁的概念,如果设置为公平锁,那么他就会形成双线链表。按顺序获取资源。等待时间久的排在前面,越早的获取锁。
工作流程 (获取锁 到进入阻塞队列)
来个图
图从lock.lock();
方法开始讲
调用 sync.acquire(1);
public void lock() {
sync.acquire(1);
}
AbstractQueuedSynchronizer
类中的acquire()
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先在ReentrantLock创建时设置公平锁和非公平锁。(默认为非公平锁)
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
然后通过new NonfairSync() 找到了 NonfairSync 其内部继承了Sync 并复写了tryAcquire方法
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
其次找到 nonfairTryAcquire 方法 这里面涉及到 如何 设置
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取状态值
int c = getState();
// 如果当前状态值为0 及没有线程正在获取资源 我们通过cas操作 来 设置状态为1
if (c == 0) {
if (compareAndSetState(0, acquires)) {
// 设置排他线程 传入当前线程
setExclusiveOwnerThread(current);
return true;
}
}
// 可重入锁
// 如果获取出来的当前排他锁的线程为当前执行的线程
else if (current == getExclusiveOwnerThread()) {
// 就在源有值上增加
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 并设置新状态
setState(nextc);
return true;
}
// 如果状态值不为0 或者过来申请锁的不是当前持有锁的对象 那么返回false
return false;
}
如果能获取资源,设置为1 那么就将其设置为排他线程。
然后其他线程进入 就会返回false。继续看方法的后半段 ,因为这里返回为false ! 所以为true,那么会运行短路与的右边
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先是添加等待队列
private Node addWaiter(Node mode) {
// 首先创建一个Node 并将将传入的node 设置为当前线程 THREAD.set(this, Thread.currentThread());
Node node = new Node(mode);
for (;;) {
// 获取 尾node节点
Node oldTail = tail;
// 如果尾部节点不为空 我们就把当前节点设置为尾部节点 连接上之前的尾部节点
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
// 乐观锁设置 保证 准确
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
//设置成功 返回给外界
return node;
}
} else {
//如果尾节点为空 那么创建一个node 节点设置为头 和 尾
initializeSyncQueue();
}
}
}
然后看一下cquireQueued
final boolean acquireQueued(final Node node, int arg) {
// 设置这个中断默认为false
boolean interrupted = false;
try {
for (;;) {
// 获取当前节点的前一任
final Node p = node.predecessor();
// 如果前一任节点为头节点 并且能够获取到锁
if (p == head && tryAcquire(arg)) {
// 将当前节点设置为新的头节点
setHead(node);
// 将获取到的头节点 的后置节点取消 移除阻塞队列
p.next = null; // help GC
// 返回中断标识 循环结束
return interrupted;
}
// 验证前节点是否处于等待状态
if (shouldParkAfterFailedAcquire(p, node))
// 使当前线程进入waiting状态
// 猜测: 按位或的意思等到waiting状态成功返回true
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
shouldparkAfterFailedAcquire()
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 获取其前节点 是否处于等待状态 如果处于等待状态则放置的位置正确 返回true
return true;
if (ws > 0) {
//如果其前节点放弃锁的竞争 那么就一直去前面找 找到前面没放弃锁竞争的thread 加在他的后面
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//调用前节点的方法设置前节点的状态为SIGNAL
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 设置暂停当前节点 等待其他线程唤醒
LockSupport.park(this);
return Thread.interrupted();
}