多线程篇二(CAS深度剖析)
前言
在JDK 5之前Java语言是靠synchronized关键字保证同步的,这会导致有锁。
锁机制存在以下问题:
(1)在多线程竞争下,加锁、释放锁会导致比较多的上下文切换和调度延时,引起性能问题。
(2)一个线程持有锁会导致其它所有需要此锁的线程挂起。
(3)如果一个优先级高的线程等待一个优先级低的线程释放锁会导致优先级倒置,引起性能风险。
volatile是不错的机制,但是volatile不能保证原子性。因此对于同步最终还是要回到锁机制上来。
独占锁是一种悲观锁,synchronized就是一种独占锁,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁。而另一个更加有效的锁就是乐观锁。所谓乐观锁就是,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。乐观锁用到的机制就是CAS,Compare and Swap。
正文
一、什么是CAS
CAS是compare and swap的缩写,中文解释成比较并交换。
在Java发展初期,java语言是不能够利用硬件提供的这些便利来提升系统的性能的。而随着java不断的发展,Java本地方法(JNI)的出现,使得java程序越过JVM直接调用本地方法提供了一种便捷的方式,因而java在并发的手段上也多了起来。而在Doug Lea提供的cucurenct包中,CAS理论是它实现整个java包的基石。
CAS操作包含三个操作数:内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置的新值设置为新值。否则,处理器不做任何操作。无论哪种情况,它都会在 CAS 指令之前返回该位置的值。(在 CAS 的一些特殊情况下将仅返回 CAS 是否成功,而不提取当前值)CAS 有效地说明了“我认为位置 V 应该包含值 A;如果包含该值,则将 B 放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。”
通常将 CAS 用于同步的方式是从地址 V 读取值 A,执行多步计算来获得新 值 B,然后使用 CAS 将 V 的值从 A 改为 B。如果 V 处的值尚未同时更改,则 CAS 操作成功。
类似于 CAS 的指令允许算法执行读-修改-写操作,而无需害怕其他线程同时修改变量,因为如果其他线程修改变量,那么 CAS 会检测它(并失败),算法 可以对该操作重新计算。
二、CAS底层原理
上面说的读-修改-写操作类似于count++,那么是如何将这三步变成原子操作的呢?
这归功于硬件指令集的发展,实际上,我们可以使用同步将这两个操作变成原子的,但是这么做就没有意义了。所以我们只能靠硬件来完成,硬件保证一个从语义上看起来需要多次操作的行为只通过一条处理器指令就能完成。这类指令常用的有:
1)、测试并设置(Tetst-and-Set)
2)、获取并增加(Fetch-and-Increment)
3)、交换(Swap)
4)、比较并交换(Compare-and-Swap)
5)、加载链接/条件存储(Load-Linked/Store-Conditional)
其中,前面的3条是20世纪时,大部分处理器已经有了,后面的2条是现代处理器新增的。而且这两条指令的目的和功能是类似的,在IA64,x86 指令集中有 cmpxchg 指令完成 CAS 功能,在 sparc-TSO 也有 casa 指令实现,而在 ARM 和 PowerPC 架构下,则需要使用一对 ldrex/strex 指令来完成 LL/SC 的功能。
CPU 实现原子指令有2种方式:
(1)、通过总线锁定来保证原子性。
总线锁定其实就是处理器使用了总线锁,所谓总线锁就是使用处理器提供的一个 LOCK# 信号,当一个处理器咋总线上输出此信号时,其他处理器的请求将被阻塞住,那么该处理器可以独占共享内存。但是该方法成本太大。因此有了下面的方式。
(2)、通过缓存锁定来保证原子性。
所谓 缓存锁定 是指内存区域如果被缓存在处理器的缓存行中,并且在Lock 操作期间被锁定,那么当他执行锁操作写回到内存时,处理器不在总线上声言 LOCK# 信号,而时修改内部的内存地址,并允许他的缓存一致性机制来保证操作的原子性,因为缓存一致性机制会阻止同时修改两个以上处理器缓存的内存区域数据(这里和 volatile 的可见性原理相同),当其他处理器回写已被锁定的缓存行的数据时,会使缓存行无效。
注意:有两种情况下处理器不会使用缓存锁定。
(1)、当操作的数据不能被缓存在处理器内部,或操作的数据跨多个缓存行时,则处理器会调用总线锁定。
(2)、有些处理器不支持缓存锁定,对于 Intel 486 和 Pentium 处理器,就是锁定的内存区域在处理器的缓存行也会调用总线锁定。
三、CAS的目的
利用CPU的CAS指令,同时借助JNI来完成Java的非阻塞算法。其它原子操作都是利用类似的特性完成的。而整个J.U.C都是建立在CAS之上的,因此对于synchronized阻塞算法,J.U.C在性能上有了很大的提升。
四、CAS存在的问题
CAS虽然很高效的解决原子操作,但是CAS仍然存在三大问题。ABA问题,循环时间长开销大和只能保证一个共享变量的原子操作
1、ABA问题:因为CAS需要在操作值的时候检查下值有没有变化,如果没有变化则更新,现在假如一个值原来是A,之后变成B,然后又变回A,那么使用CAS检查的时候会发现值没有变化,实则呢这个值是发生过变化的。
那么如何解决ABA问题呢?大家这么聪明肯定第一时间就会想到在变量前面追加一个版本号来解决,这样每次变量更新的时候把版本号加1,那么A-B-A就会变成1A-2B-3A。
2、循环时间长开销大:自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率。
3、只能保证一个共享变量的原子操作:当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。从Java1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行CAS操作。
五、自旋锁与互斥锁的选择
1、如果是多核处理器,如果预计线程等待锁的时间很短,短到比线程两次上下文切换时间要少的情况下,使用自旋锁是划算的。
2、如果是多核处理器,如果预计线程等待锁的时间较长,至少比两次线程上下文切换的时间要长,建议使用互斥量。
3、如果是单核处理器,一般建议不要使用自旋锁。因为,在同一时间只有一个线程是处在运行状态,那如果运行线程发现无法获取锁,只能等待解锁,但因为自身不挂起,所以那个获取到锁的线程没有办法进入运行状态,只能等到运行线程把操作系统分给它的时间片用完,才能有机会被调度。这种情况下使用自旋锁的代价很高。
4、如果加锁的代码经常被调用,但竞争情况很少发生时,应该优先考虑使用自旋锁,自旋锁的开销比较小,互斥量的开销较大。
说了这么多,我们来看看并发大师Doug Lee在使用CAS的吧。
六、原子操作类的使用
在java.util.concurrent.atomic包下定义了很多的原子操作类
原子操作类.png
1、原子更新基本类型类
AtomicBoolean、AtomicInteger、AtomicLong
(1)、AtomicInteger API的使用
/**
* 原子操作测试类
*
* @Author YUBIN
* @create 2018-01-14
*/
public class AtomicIntegerTest {
private static AtomicInteger atomicInteger = new AtomicInteger();
public static void main(String[] args) {
// 原子的将原值加1,并且返回原值 0 此时新值为1
System.out.println(atomicInteger.getAndIncrement());
// 原子的将原值加1,并且返回新值 2
System.out.println(atomicInteger.incrementAndGet());
// 原子的将原值增加指定的值,并且返回相加后的值 12
System.out.println(atomicInteger.addAndGet(10));
// 原子的将原值增加指定的值,并且返回相加前的值 12 此时新值为22
System.out.println(atomicInteger.getAndAdd(10));
// 如果当前值V == 为预期值A,则将该值原子设置为给定的更新值B。 由于当前值是22 预期值是11 所以什么都不错 返回false
System.out.println(atomicInteger.compareAndSet(11, 10));
// 原子的将原值减1,并且返回新值 21
System.out.println(atomicInteger.decrementAndGet());
// 原子设置为给定值并返回旧值。
System.out.println(atomicInteger.getAndSet(2));
// 获取当前值
System.out.println(atomicInteger.get());
}
}
当然在AtomicInteger初始化的时候就可以给定一个初始值micInteger(int initialValue),如果没有指定则默认为0(对应成员变量value)
private volatile int value;
观察getAndIncrement()方法的源码
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
调用了Unsafe类中的getAndAddInt()方法
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
上面的do/while语句就是一个自旋CAS
2、原子更新数组类
AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
(1)、AtomicIntegerArray API的演示
/**
* @Author YUBIN
* @create 2018-01-14
*/
public class AtomicIntegerArrayTest {
private static int[] intArray = {1, 2, 3};
// 定义一个AtomicIntegerArray类型的原子类 其中它会将原先的intArray数组克隆岛AtomicIntegerArray类中的int[] array数组中去
private static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(intArray);
public static void main(String[] args) {
// 将位置 i的元素原子设置为给定值并返回旧值。
System.out.println(atomicIntegerArray.getAndSet(0, 3));
// 调用atomicIntegerArray.get(0)此时返回的是3
System.out.println(atomicIntegerArray.get(0));
// 返回结果是1
System.out.println(intArray[0]);
System.out.println(atomicIntegerArray.compareAndSet(0, 3, 5));
System.out.println(atomicIntegerArray.get(0));
}
}
上面具体的API操作的原理和AtomicInteger类似,这里就不再进行描述了,但是值得注意的是在new AtomicIntegerArray(intArray);这一步操作的时候观察源码你会发现他是将intArray复制到AtomicIntegerArray类中的int[] array数组中去的,因此你在对AtomicIntegerArray对象进行操作的时候是不会影响到intArray的
public AtomicIntegerArray(int[] array) {
// Visibility guaranteed by final field guarantees
this.array = array.clone();
}
3、原子更新引用类型类
AtomicReference、AtomicMarkableReference、AtomicStampedReference
(1)、AtomicReference API的演示
/**
* @Author YUBIN
* @create 2018-01-14
*/
public class AtomicReferenceTest {
private static AtomicReference<User> reference = new AtomicReference<User>();
public static void main(String[] args) {
User user = new User("yubin", 18);
User update = new User("yu", 19);
// 设置为给定值
reference.set(user);
User newUser = new User("bin", 20);
user = newUser;
// 如果当前值 ==为预期值,则将值设置为给定的更新值。 成功返回true,反之返回false 注意,他比较的是引用地址,而不是里面具体的值
System.out.println(reference.compareAndSet(user, update));
System.out.println(reference.get().getName());
System.out.println(reference.get().getAge());
}
private static class User {
private String name;
private int age;
public User(String name, int age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public int getAge() {
return age;
}
}
}
(2)、AtomicStampedReference API演示
/**
* 带版本号的原子操作引用类型类的演示
*
* @Author YUBIN
* @create 2018-06-21
*/
public class AtomicStampedReferenceTest {
/**
* AtomicStampedReference(V initialRef, int initialStamp)
* initialRef:初始值
* initialStamp:初始版本号
*/
private static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(0, 0);
public static void main(String[] args) throws InterruptedException {
final Integer reference = atomicStampedReference.getReference();// 返回引用的当前值,这边是获取到的初始值
final int stamp = atomicStampedReference.getStamp();// 获取当前的版本号,这边获取的是初始版本号
// 打印初始版本号和初始值
System.out.println("reference:" + reference + "---------------" + "stamp:" + stamp);
Thread threadA = new Thread("ThreadA"){
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ":" + reference + "-" + stamp + "-" +
atomicStampedReference.compareAndSet(reference, reference + 10, stamp, stamp + 1));
}
};
Thread threadB = new Thread("ThreadB"){
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ":" + reference + "-" + stamp + "-" +
atomicStampedReference.compareAndSet(reference, reference + 10, stamp, stamp + 1));
}
};
threadA.start();
threadA.join(); // 等待线程A执行完成之后再执行下面的操作
threadB.start();
threadB.join();
System.out.println(atomicStampedReference.getReference());
System.out.println(atomicStampedReference.getStamp());
}
}
注意:AtomicMarkableReference和AtomicStampedReference都是为了解决ABA问题的,其中AtomicMarkableReference的标志采用的是boolean类型,而AtomicStampedReference标志采用的是int类型
4、原子更新字段类
AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater
(1)、AtomicReferenceFieldUpdater API演示
/**
* 原子更新字段演示
*
* @Author YUBIN
* @create 2018-06-21
*/
public class AtomicReferenceFieldUpdaterTest {
public static void main(String[] args) {
AtomicReferenceFieldUpdater nameUpdater = AtomicReferenceFieldUpdater.newUpdater(User.class, String.class, "name");
AtomicReferenceFieldUpdater ageUpdater = AtomicReferenceFieldUpdater.newUpdater(User.class, Integer.class, "age");
User user = new User("yubin", 18);
System.out.println("原先的名字:"+user.name);
System.out.println("原先的年龄:"+user.age);
nameUpdater.compareAndSet(user, user.name, "yu"); // 处理名字
System.out.println("更新后的名字:" + user.name);
ageUpdater.compareAndSet(user, user.age, user.age + 1);
System.out.println("更新后的年龄:" + user.age);
}
static class User{
public volatile String name;
public volatile Integer age;
public User(String name, Integer age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public Integer getAge() {
return age;
}
}
}
值得注意的是在使用AtomicReferenceFieldUpdater的时候User类中的属性需要定义成public volatile否则会抛出如下异常
Exception in thread "main" java.lang.RuntimeException: java.lang.IllegalAccessException: Class com.yubin.thread.atomic.AtomicReferenceFieldUpdaterTest can not access a member of class com.yubin.thread.atomic.AtomicReferenceFieldUpdaterTest$User with modifiers "private volatile"
at java.util.concurrent.atomic.AtomicReferenceFieldUpdater$AtomicReferenceFieldUpdaterImpl.<init>(AtomicReferenceFieldUpdater.java:332)
at java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater(AtomicReferenceFieldUpdater.java:109)
at com.yubin.thread.atomic.AtomicReferenceFieldUpdaterTest.main(AtomicReferenceFieldUpdaterTest.java:14)
Caused by: java.lang.IllegalAccessException: Class com.yubin.thread.atomic.AtomicReferenceFieldUpdaterTest can not access a member of class com.yubin.thread.atomic.AtomicReferenceFieldUpdaterTest$User with modifiers "private volatile"
at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102)
at sun.reflect.misc.ReflectUtil.ensureMemberAccess(ReflectUtil.java:103)
at java.util.concurrent.atomic.AtomicReferenceFieldUpdater$AtomicReferenceFieldUpdaterImpl.<init>(AtomicReferenceFieldUpdater.java:320)
... 2 more
Exception in thread "main" java.lang.IllegalArgumentException: Must be volatile type
at java.util.concurrent.atomic.AtomicReferenceFieldUpdater$AtomicReferenceFieldUpdaterImpl.<init>(AtomicReferenceFieldUpdater.java:341)
at java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater(AtomicReferenceFieldUpdater.java:109)
at com.yubin.thread.atomic.AtomicReferenceFieldUpdaterTest.main(AtomicReferenceFieldUpdaterTest.java:14)
总结
Java的CAS会使用现代处理器上提供的高效机器级别原子指令,这些原子指令以原子方式对内存执行读-改-写操作,这是在多处理器中实现同步的关键(从本质上来说,能够支持原子性读-改-写指令的计算机器,是顺序计算图灵机的异步等价机器,因此任何现代的多处理器都会去支持某种能对内存执行原子性读-改-写操作的原子指令)。同时,volatile变量的读/写和CAS可以实现线程之间的通信。把这些特性整合在一起,就形成了整个concurrent包得以实现的基石。如果我们仔细分析concurrent包的源代码实现,会发现一个通用化的实现模式
CAS.png