java并发编程—— concurrent包底层框架
1. 概述
java.util.concurrent包中大部分的同步器(例如锁,屏障等等)都是基于AbstractQueuedSynchronizer类(下称AQS类)构建的。
AQS负责管理同步器类中的状态,通常是一个整数状态信息State,并且通过getState、setState和compareAndSetState等方法进行操作管理。这个整数在不同的同步器中表示不同意义,RetrantLock用它来表示所有者线程重复获取该锁的次数,Semaphore用它表示剩余许可数量。
整个类维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)可以用下图表示
AQS定义两种资源共享方式:Exclusive(在同一时间只有一个线程可以通过阻塞点,如ReentrantLock)和Share(共享,可以同时有多个线程在执行,如Semaphore/CountDownLatch)。
2. 同步器基础功能
同步器一般包含两种方法,一种是acquire,另一种是release。
- acquire操作阻塞调用的线程,直到或除非同步状态允许其继续执行。
- release操作则是通过某种方式改变同步状态,唤醒等待线程继续执行。
在不同的类中,acquire和release操作的名字和形式会各有不同。例如:Lock.lock,Semaphore.acquire,CountDownLatch.await和FutureTask.get,在这个框架里,这些方法都是acquire操作。
除此之外,通常支持以下常用选项:
- 可选的超时设置,让调用者可以放弃等待
- 通过中断实现的任务取消
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。下面以独占方式的获取和释放状态进行详细描述。
3. AQS构建基础功能
3.1 AQS获取操作流程
以ReentrantLock中代码为例分析
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- tryAcquire()尝试直接去获取资源,如果成功则直接返回;
- addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
- acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
查看上述函数源码可以看出,通过tryAcquire()和addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了。那么该线程进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了。
acquireQueued作为关键方法,删除处理线程在等待队列中排队拿号(中间没其它事干可以休息),直到拿到号后再返回。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// “自旋”过程
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 如果前驱是head,即该结点已成老二
那么便有资格去尝试获取资源
(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。
if (p == head && tryAcquire(arg)) {
// 拿到资源后,将head指向该结点,所以head
所指的标杆结点,就是当前获取到资源的那个结点或null。
setHead(node);
// 拿完资源的结点出队了,方便GC回收置为null
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果自己可以休息了,就进入waiting状态,直到被unpark()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
其中,shouldParkAfterFailedAcquire方法主要用于检查状态,看看自己是否真的可以去休息了进入waiting状态。而parkAndCheckInterrupt方法让线程去休息,真正进入等待状态,通过LockSupport.park(this)实现。
整体来看acquireQueued主要包含以下三个步骤:
- 结点进入队尾后,检查状态,找到安全休息点;
- 调用park()进入waiting状态,等待unpark()或interrupt()唤醒自己;
- 被唤醒后,看自己是不是有资格能拿到号。如果拿到,head指向当前结点,并返回从入队到拿到号的整个过程中是否被中断过;如果没拿到,继续流程1。
3.2 AQS释放操作流程
这里同样以ReentrantLock中释放代码为例进行说明
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;//找到头结点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒等待队列里的下一个线程
return true;
}
return false;
}
查看代码release方法,它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。
4. AQS框架分析
查阅Doug Lea 的The j.u.c Synchronizer Framework文章可以看出,AQS框架底层实现主要由以下三个组件组成:
- 同步状态的原子性管理;
- 线程的阻塞与解除阻塞;
- 队列的管理;
同步状态
AQS类使用单个int(32位)来保存同步状态,并暴露出getState、setState以及compareAndSet操作来读取和更新这个状态。
线程阻塞
这里选择使用LockSupport.park阻塞当前线程除非/直到有个LockSupport.unpark方法被调用(unpark方法被提前调用也是可以的)。
队列
这里选择了CLH锁,其结构中用前一节点某一属性表示当前节点的状态,可以更容易地去实现“取消(cancellation)”和“超时”功能。同时,CLH锁的优点在于其入队和出队操作是快速、无锁的,以及无障碍的(即使在竞争下,某个线程总会赢得一次插入机会而能继续执行);且探测是否有线程正在等待也很快(只要测试一下head是否与tail相等)。
原子操作入队新增一个节点
do {
pred = tail;
} while(!tail.compareAndSet(pred, node));
每一个节点的“释放”状态都保存在其前驱节点中。因此,自旋锁的“自旋”操作就如下:
while (pred.status != RELEASED); // spin
// 自旋后的出队操作只需将head字段指向刚刚得到锁的节点:
head = node;
5. CAS操作
5.1 CAS概述
引用网上一幅图表示整个concurrent包实现依赖关系图:
concurrent实现图可以看出作为整个底层基础volatile变量的读/写和CAS是整个包实现的基石。这里重点介绍一下CAS相关概念:
CAS的全称为Compare-And-Swap,是一条CPU的原子指令,CAS是靠硬件实现的,JVM只是封装了汇编调用。
CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。可以看出这个操作是多步组成,但是现代处理器利用硬件特性保证了这条指令在多处理器环境中的原子性。
5.2 乐观锁
乐观锁( Optimistic Locking)其实是一种思想。相对悲观锁而言,乐观锁假设认为数据一般情况下不会造成冲突,所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果发现冲突了,则让返回用户错误的信息,让用户决定如何去做。
CAS是项乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试。
以AtomicInteger代码为例进行分析
public class AtomicInteger {
private volatile int value;
public final int getAndIncrement() {
// 不断尝试直到成功
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return current;
}
}
在没有锁的机制下需要字段value要借助volatile原语,保证线程间的数据是可见的。getAndIncrement采用了CAS操作,每次从内存中读取数据然后将此数据和+1后的结果进行CAS操作,如果成功就返回结果,否则重试直到成功为止。
参考
http://www.cnblogs.com/zengdan-develpoer/p/3389006.html
https://my.oschina.net/u/140462/blog/490182
http://www.cnblogs.com/waterystone/p/4920797.html
http://ifeve.com/aqs-1/
http://ifeve.com/aqs-2/
http://blog.sina.com.cn/s/blog_ee34aa660102wsuv.html
http://www.importnew.com/20472.html