浅析Java并发编程(五)CAS&Unsafe&
前言
“没有共产党就没有新中国”这句歌词相信大家都耳熟能详(作者可是很爱国爱党的,虽然作者不是党员),这句歌词表达的意思同样适用于本文,可以说在Java语言中如果没有Unsafe就没有CAS,如果没有CAS也就没有AQS,进而也就不会有java.util.concurrent
包的诞生。本文是作者自己对它们的理解与总结,不对之处,望指出,共勉。
CAS
CAS即Compare And Swap(比较并交换),用于保证多线程并发情况下数据操作的原子性,通常由底层硬件(CPU指令)实现。 CAS 操作涉及到三个操作数,内存中的值 V,旧的预期值 A,需要修改的新值 B。当且仅当预期值 A 和 内存中的值 V 相同时,才将内存值 V 修改为 新值B,否则什么都不做,CAS操作通常会返回一个布尔值以便告诉调用者操作是否成功。由这些特性可见,CAS其实也是乐观锁思想的一种实现。
//伪代码
boolean compareAndSwap(内存中的值 V,旧的预期值 A,新值 B){
if(V != A]) {
return false;//CAS操作失败
}else{
V = B;//将内存值替换为新值
return true;//CAS操作成功
}
}
比如下面这个简单的自增序列,如果不使用synchronized
关键字或Lock
等其他锁机制,在多线程并发下next()
方法会出现原子性问题进而导致自增值不唯一。
public class Sequence {
private int value;
public /*synchronized*/ int next() {
return value++;
}
}
当然我们也可以通过CAS不加锁来保证next()
方法的原子性,代码大概是下面这个样子。
//伪代码
public int next() {
while(true){
int A = 读取当前内存中的值作为旧的期望值;
int B = A + 1;
if(compareAndSwap(当前内存中的值,A,B)){
return B;
}
}
}
在JUC中很多类的实现都依赖于CAS,比如本系列上一篇文章所说到Atomic类,其实内部的所有方法都是基于CAS来实现的。对于自增这类需要保证原子性的操作使用Atomic类是最好的选择,下面是使用AtomicInteger来实现Sequence的例子。
public class AtomicSequence {
private AtomicInteger value = new AtomicInteger();
public int next() {
return value.incrementAndGet();
}
}
查看incrementAndGet()方法源码,发现是调用的sun.misc.Unsafe类中的getAndAddInt(Object var1, long var2, int var4)方法。
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
再跟下去,熟悉的CAS方法出现在了眼前,由于Java 8 并未提供Unsafe类的源码,这些都是反编译的结果,下节会对其作一详细介绍。
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
Unsafe
package sun.misc;
* A collection of methods for performing low-level, unsafe operations.
* Although the class and all methods are public, use of this class is
* limited because only trusted code can obtain instances of it.
*
* @author John R. Rose
* @see #getUnsafe
*/
public final class Unsafe {
...
前面说到CAS由底层硬件实现,而Java语言是不能直接访问底层硬件的,但为了解决某些问题,还不得不访问底层硬件,还好Java提供了JNI技术(JNI是Java Native Interface的缩写,它提供了若干的API实现了Java和其他语言的通信)使我们可以通过C、C++这类语言去访问底层硬件。
Unsafe类便通过JNI封装了一些较为底层的方法,比如内存操作、CAS等。但就如它的类名表达的含义一样,警告使用者使用它里面的方法是不安全的、是很危险的,所以官方连文档也没有给开发者提供,不过越是这样越激发了我们的探索欲,下面来探索探索它。
首先看看他都有哪些有意思的方法(部分),由于官方没有提供源码、文档只能去网上找了,最终找到了一个OpenJdk 7版本的源码,有兴趣的可以去看看。
...
//引用类型的CAS操作
public final native boolean compareAndSwapObject(Object o, long offset, Object expected, Object x);
//int类型的CAS操作
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
//long类型的CAS操作
public final native boolean compareAndSwapLong(Object o, long offset,long expected,long x);
//申请内存
public native long allocateMemory(long bytes);
//释放内存
public native void freeMemory(long address);
//获得一个成员变量在内存中的偏移值(内存地址)
public native long objectFieldOffset(Field f);
//获得一个静态变量在内存中的偏移值
public native long staticFieldOffset(Field f);
//直接在内存中定义Class对象
public native Class defineClass(String name, byte[] b, int off, int len);
public native Class defineClass(String name, byte[] b, int off, int len, ClassLoader loader,ProtectionDomain protectionDomain);
//进入对象的监视器锁
public native void monitorEnter(Object o);
//退出对象的监视器锁
public native void monitorExit(Object o);
//为一个类的实例分配内存(创建实例)
public native Object allocateInstance(Class cls) throws InstantiationException;
...
果然提供的方法都很强大,下面找几个使用一下试试,在使用前,我们要创建一个Unsafe的实例。我们不能使用Unsafe unsafe = new Unsafe()来获取,因为Unsafe类的构造器是私有的。不过它有一个静态工厂方法getUnsafe(),但你天真的尝试调用Unsafe.getUnsafe()来获得Unsafe的实例的话,你会遇到一个SecurityException的异常。因为这个方法只能在被信任的代码中使用。
public static Unsafe getUnsafe() {
Class cc = sun.reflect.Reflection.getCallerClass(2);
if (cc.getClassLoader() != null)
throw new SecurityException("Unsafe");
return theUnsafe;
}
通过查看源码发现这个方法只能被BootstrapClassLoader加载的类中使用,也就是说只能被rt.jar包里面的类使用,如果你对类加载器感兴趣,可以看看我前面写的文章:浅析JVM(一)Class文件&类加载机制 。
不过这阻挡不了我们,通过查看源码(反编译)发现,Unsafe类里面有一个私有的静态变量theUnsafe
,它包含了Unsafe类的实例,我们可以通过反射来获得这个变量。
private static final Unsafe theUnsafe = new Unsafe();
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
Unsafe unsafe = (Unsafe) f.get(null);
或者使用下面这种方式,通过反射调用它的私有构造器。
Constructor<Unsafe> constructor = Unsafe.class.getDeclaredConstructor();
constructor.setAccessible(true);
Unsafe unsafe = constructor.newInstance();
成功搞到Unsafe的实例后先来试试它的compareAndSwapInt
方法,照猫画虎写一个AtomicInteger,看看能否正常工作。
public class CustomAtomicInteger {
private volatile int value;
private Unsafe unsafe;
private long offset;
public CustomAtomicInteger() {
try {
//获得Unsafe的构造器
Constructor<Unsafe> constructor = Unsafe.class.getDeclaredConstructor();
//突破私有访问权限
constructor.setAccessible(true);
//创建示例
this.unsafe = constructor.newInstance();
//获得value变量的内存偏移量即内存地址
offset = unsafe.objectFieldOffset(CustomAtomicInteger.class.getDeclaredField("value"));
} catch (Exception e) {
e.printStackTrace();
}
}
public int incrementAndGet() {
while (true) {
int expected = value;
int next = expected + 1;
if (unsafe.compareAndSwapInt(this, offset, expected, next)) {
return next;
}
}
}
public static void main(String[] args) throws InterruptedException {
CustomAtomicInteger atomicInteger = new CustomAtomicInteger();
int maxThread = 100000;
CountDownLatch latch = new CountDownLatch(maxThread);
for (int i = 0; i < maxThread; i++) {
new Thread(() -> {
System.out.println(atomicInteger.incrementAndGet());
latch.countDown();
}).start();
}
latch.await();//等待所有线程执行完毕
Assert.assertEquals(atomicInteger.incrementAndGet(), maxThread + 1);
}
}
果然没问题,和AtomicInteger一样可以正常工作,性能也一样,这好像是废话,毕竟实现都一样。如果你对Unsafe类中CAS方法JNI底层实现(C++)感兴趣,可以看看JAVA CAS原理深度分析 、 CAS原子操作实现无锁及性能分析这两篇文章,本文就不敖述了。
由于下面的使用测试都会用到Unsafe对象,为了减少重复代码使用Junit的@before
注解预创建该对象。
public class UnsafeTest {
private Unsafe unsafe;
@Before
public void before() throws Exception {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
unsafe = (Unsafe) f.get(null);
}
}
下面来试试使用defineClass在内存中创建一个Class对象,这和ClassLoader里的defineClass方法类似,区别在于它无需依赖ClassLoader便可在内存中创建Class对象,当你需要动态创建类时,这可能很有用。
//将该类编译后待下面测试使用
public class Test {
public void say (){
System.out.println("Hello");
}
}
@Test
public void test3() throws Exception {
//读取Class文件
File file = new File("E:\\classes\\Test.class");
FileInputStream input = new FileInputStream(file);
byte[] data = new byte[(int) file.length()];
input.read(data);
input.close();
//创建Class对象
//Class clazz = unsafe.defineClass("Test", data, 0, data.length);//这个方法在Java 8 没了。
Class clazz = unsafe.defineClass("Test", data, 0, data.length, null, null);
//通过反射创建示例调用方法
Object instance = clazz.newInstance();
Method method = clazz.getMethod("say", null);
method.invoke(instance);//Hello
}
下面试试使用Unsafe的allocateInstance
方法,该方法可以创建一个类的实例,它和反射一样即便构造器是私有的也可以创建实例(破坏单例模式),区别在于使用Unsafe创建实例将会跳过对象的初始化过程和安全检查。
public class TestClass {
private int value;// 没有初始化
//私有化构造器
private TestClass() {
this.value = 1;// 初始化
}
public void printValue() {
System.out.println(this.value);
}
}
@Test
public void test4() throws Exception {
//通过Unsafe创建实例
TestClass instanceByUnsafe = (TestClass) unsafe.allocateInstance(TestClass.class);
instanceByUnsafe.printValue();//0
//通过反射创建实例
//TestClass instanceByReflect = TestClass.class.newInstance();//该方法只能实例化拥有公开无参构造器的类
Constructor<TestClass> constructor = TestClass.class.getDeclaredConstructor();
constructor.setAccessible(true);//访问私有构造器
TestClass instanceByReflect = constructor.newInstance();//创建实例
instanceByReflect.printValue();//1
}
虽然可以通过反射调用私有构造器破解单例模式,但反射不能跳过安全检查,如果将私有构造器内添加以下代码,反射将无法工作,Unsafe不受影响。
...
//私有化构造器
private TestClass() {
//可以抛出个Error防止单例模式被反射破坏
throw new AssertionError("don't support reflect.");
}
...
下面来试试monitorEnter
和monitorExit
方法,如果你看过本系列前面的文章,应该知道这两个方法的作用。这两个方法等同于JVM的monitorenter指令和monitorexit指令,用于获得monitor和释放monitor。还记得前文说到Object.wait()等方法需要与synchornized
关键词搭配使用吗?这是因为Object.wait()等方法依赖于monitor,如果没有获得monitor将会抛出java.lang.IllegalMonitorStateException
。
@Test
public void test6() throws Exception {
Object lock = new Object();
//注掉下面这行代码则抛出java.lang.IllegalMonitorStateException
unsafe.monitorEnter(lock);
lock.wait(1000);
System.out.println("Hello World");
unsafe.monitorExit(lock);
}
既然能通过这两个方法获得和释放monitor,那么是不是可以实现一个类似synchornized
关键字的锁呢?答案是肯定的。
public class UnsafeLock {
private Unsafe unsafe;
public UnsafeLock() {
try {
//获得Unsafe的构造器
Constructor<Unsafe> constructor = Unsafe.class.getDeclaredConstructor();
//突破私有访问权限
constructor.setAccessible(true);
//创建示例
this.unsafe = constructor.newInstance();
} catch (Exception e) {
e.printStackTrace();
}
}
public void lock(Object obj) {
unsafe.monitorEnter(obj);
}
public void unlock(Object obj) {
unsafe.monitorExit(obj);
}
}
测试一下看看能否正常工作,测试代码如下,不错,可以正常工作。
public class UnsafeLockTest {
public static void main(String[] args) {
TestTask test = new TestTask();
new Thread(() -> test.method1()).start();
new Thread(() -> test.method2()).start();
/**
输出:
method1() execute!
method2() execute!
不使用锁保持同步:
method2() execute!
method1() execute!
*/
}
static class TestTask {
private UnsafeLock lock = new UnsafeLock();
public void method1() {
lock.lock(this);
try {
//模拟方法需要执行100毫秒
Thread.sleep(100);
System.out.println("method1() execute!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(this);
}
}
public void method2() {
lock.lock(this);
System.out.println("method2() execute!");
lock.unlock(this);
}
}
}
最后来试试allocateMemory
方法,顾名思义就是分配内存,这不禁让我想起来使用C语言编程的那段时光。如果有面试官让你写出一段代码来触发OOM,你可以使用下面这段奇技淫巧代码(正经的回答也是要有的),顺带讲一下Unsafe类或许会留个好印象。
@Test
public void test5() throws Exception {
unsafe.allocateMemory(Long.MAX_VALUE);//java.lang.OutOfMemoryError
}
其他方法本文就不再敖述了,有兴趣的可以自己去试试看。不过最好不要将Unsafe类使用在你的生产代码中,因为这是不安全的。
AQS
AQS即AbstractQueuedSynchronizer(抽象的队列同步器)的简称,它是J.U.C
中的一个抽象类,它是构建锁或者其他同步组件、工具的基础,Doug Lea期望它能够成为实现大部分同步需求的基础,所以它被设计为一个抽象类。
AQS内封装了实现同步器时涉及的大量细节问题,例如获取同步状态、同步队列。AQS的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。继承AQS来构建同步器可以带来很多好处。它不仅能够极大地减少实现工作,而且也不必处理在多个位置上发生的竞争问题。
J.U.C中继承AQS的类
有了前面Unsafe和CAS的基础下面可以浅析一下AQS了,因为AQS里大量的使用着Unsafe与CAS ,如果没有前面铺垫,理解起来还是比较费劲的。
AQS的内部实现AQS的实现原理相对还是比较复杂的,简而言之就是AQS内部维护了一个同步状态(
private volatile int state
)和一个线程等待队列(双向链表实现),同步状态的值默认为 0,当线程尝试获得锁的时候,需要判断同步状态的值是否为 0,如果为 0 则修改状态的值 (通常为 + 1,也可以是别的数,释放锁的时候对应上就行)线程即获得锁,否则进入队列等待(会被封装成一个Node),释放锁的时候将状态的值修改为0 (如果实现的是可重入锁需要递减该值,直到为 0 方真正释放),此时队列内等待的线程可以继续尝试获取该锁。下面是AQS的部分源码,窥一斑可知全豹。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
/**
* The synchronization state.
*/
//同步状态
private volatile int state;
...
//通过CAS设置状态
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
...
//申请获得锁
public final void acquire(int arg) {
//尝试获取,失败的话将当前线程加入等待队列
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//尝试获取方法AQS并没有实现,需要有子类实现,其实呢,就是判断同步状态的值是否为0,是的话就获取成功
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
...
//添加等待的线程
private Node addWaiter(Node mode) {
//由双向链表实现,首先创建一个链表节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 保存尾结点
Node pred = tail;
if (pred != null) { // 尾结点不为空,即已经被初始化
// 将node结点的prev域连接到尾结点
node.prev = pred;
if (compareAndSetTail(pred, node)) { // 比较pred是否为尾结点,是则将尾结点设置为node
// 设置尾结点的next域为node
pred.next = node;
return node; // 返回新生成的结点
}
}
enq(node); // 尾结点为空(即还没有被初始化过),或者是compareAndSetTail操作失败,则入队列
return node;
}
...
//入队列
private Node enq(final Node node) {
for (;;) { // 无限循环,确保结点能够成功入队列
// 保存尾结点
Node t = tail;
if (t == null) { // 尾结点为空,即还没被初始化
if (compareAndSetHead(new Node())) // 头结点为空,并设置头结点为新生成的结点
tail = head; // 头结点与尾结点都指向同一个新生结点
} else { // 尾结点不为空,即已经被初始化过
// 将node结点的prev域连接到尾结点
node.prev = t;
if (compareAndSetTail(t, node)) { // 比较结点t是否为尾结点,若是则将尾结点设置为node
// 设置尾结点的next域为node
t.next = node;
return t; // 返回尾结点
}
}
}
}
...
//释放锁
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
/**
* Setup to support compareAndSet. We need to natively implement
* this here: For the sake of permitting future enhancements, we
* cannot explicitly subclass AtomicInteger, which would be
* efficient and useful otherwise. So, as the lesser of evils, we
* natively implement using hotspot intrinsics API. And while we
* are at it, we do the same for other CASable fields (which could
* otherwise be done with atomic field updaters).
*/
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
...
}
AQS 的设计基于模板方法模式,其内部已经实现的方法基本上分为 3 类: 独占式获取与释放锁、共享式获取与释放锁以及查询等待队列中的等待线程情况。
void acquire(int)//独占式获取锁,如果当前线程成功获取锁,那么方法就返回,否则会将当前线程放入同步队列等待。该方法会调用子类覆写的 tryAcquire(int arg) 方法判断是否可以获得锁
void acquireInterruptibly(int)//和 acquire(int) 相同,但是该方法响应中断,当线程在同步队列中等待时,如果线程被中断,会抛出 InterruptedException 异常并返回。
boolean tryAcquireNanos(int, long)//在 acquireInterruptibly(int) 基础上添加了超时控制,同时支持中断和超时,当在指定时间内没有获得锁时,会返回 false,获取到了返回 true
void acquireShared(int)//共享式获得锁,如果成功获得锁就返回,否则将当前线程放入同步队列等待,与独占式获取锁的不同是,同一时刻可以有多个线程获得共享锁,该方法调用 tryAcquireShared(int)
void acquireSharedInterruptibly(int)//与 acquireShared(int) 相同,该方法响应中断
void tryAcquireSharedNanos(int, long)//在 acquireSharedInterruptibly(int) 基础上添加了超时控制
boolean release(int) //独占式释放锁,该方法会在释放锁后,将同步队列中第一个等待节点唤醒
boolean releaseShared(int)//共享式释放锁
Collection getQueuedThreads()//获得同步队列中等待的线程集合
在使用AQS构造自定义的同步组件时需要继承 AQS 并覆写一些指定的方法,通常需要覆写的方法主要有下面几个。
protected boolean tryAcquire(int)//独占式获取锁,实现该方法需要查询当前状态并判断同步状态是否和预期值相同,然后使用 CAS 操作设置同步状态
protected boolean tryRelease(int)//独占式释放锁,实际也是修改同步变量
protected int tryAcquireShared(int)//共享式获取锁,返回大于等于 0 的值,表示获取锁成功,反之获取失败
protected boolean tryReleaseShared(int)//共享式释放锁
protected boolean isHeldExclusively()//判断调用该方法的线程是否持有互斥锁
下面通过AQS来实现一个简单的互斥锁或者叫独占锁,代码参考的官方文档AQS类上的使用例子。
public class MutexLock implements Lock, java.io.Serializable {
// 继承ASQ实现同步组件
private static class Sync extends AbstractQueuedSynchronizer {
@Override
//通过状态判断锁是否已被其他线程获取
protected boolean isHeldExclusively() {
return getState() == 1;
}
@Override
//尝试获取锁
public boolean tryAcquire(int acquires) {
//尝试获取锁,当 state 为 0 时获得锁,并将state + 1,由CAS保证操作的原子性
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
//尝试释放锁
protected boolean tryRelease(int releases) {
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
//直接将state 设置为 0,以便等待队列中的线程去获得锁
setState(0);
return true;
}
// Provides a Condition
Condition newCondition() { return new ConditionObject(); }
// Deserializes properly
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
private final Sync sync = new Sync();
//使用AQS提供的模板方法
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
下面是对其进行一个简单的使用测试。
public class MutexLockTest {
public static void main(String[] args) {
TestTask test = new TestTask();
new Thread(() -> test.method1()).start();
new Thread(() -> test.method2()).start();
/**
输出:
method1() execute!
method2() execute!
不加锁的话:
method2() execute!
method1() execute!
*/
}
static class TestTask {
private MutexLock lock = new MutexLock();
public void method1() {
lock.lock();
try {
//模拟方法需要执行100毫秒
Thread.sleep(100);
System.out.println("method1() execute!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void method2() {
lock.lock();
try {
System.out.println("method2() execute!");
} finally {
lock.unlock();
}
}
}
}
后记
这是《浅析Java并发编程》系列的最后一篇文章,写完该系列大概花费了我十天时间,当然了工作时间也包含在内,其实没那么久。相信通过本系列可以使你对Java并发编程有一个整体认识,不过这仅仅是基础,不要忘了本系列第一篇文章说过的一句话:“ 编写正确的并发程序是非常复杂的,即使对于很简单的问题 ”。
写作的过程其实也是对已有知识的巩固与梳理,希望大家有时间的话都能参与到写作之中,分享知识,快乐编码。最后,如果你对这些基础知识、设计模式等感兴趣的话,可以关注我在 GitHub 上创建的java-codes
项目(地址),那里的源码可能会对你有帮助,该项目持续更新。
参考
- Java Magic. Part 4: sun.misc.Unsafe
- 《Java并发编程实战》
- 【死磕Java并发】—–J.U.C之AQS:AQS简介
- 【JUC】JDK1.8源码分析之AbstractQueuedSynchronizer(二)
- Java Platform, Standard Edition 8 API Specification