掌握Android和Java线程原理下
作者:子者不语
转载:https://juejin.im/post/6884979459196190728
线程并发优化
知道了如何解决线程安全问题,接下来就要考虑性能问题了,毕竟多线程的使用,就是为了提高性能,如果使用多线程不能将性能发挥出来,就是很大的浪费了。在并发的优化上,无锁的性能肯定是最好的,但是很多时候我们又不得不加锁,在加锁的方案中,有忙阻塞等待如自旋锁,以及休眠等待,这两种加锁的方式,并不存在哪一种性能更好,需要根据并发的数据进行选择。下面就详细说一下这三种方案如何进行优化。
无锁
我们已经知道协程和本地线程存储可以实现无锁的多线程,还有其他优化方案吗?有,锁消除和偏向锁。
锁消除
我们可以根据经验和业务分析判断是否会有产生临界区脏数据的可能,如果没有这个可能,则可以消除锁。业务中的锁我们可以自己消除,但是虚拟机或者Java内部库中的锁我们就没法消除了,什么是Java内部的锁呢?比如我们常用的StringBuffer.append()函数中就有同步代码块。内部库的锁可以通过编译时消除,比如JVM的逃逸分析技术,在编译代码的过程中,会判断对象是否逃逸,如果没有逃逸,逃逸指的时对象可能会被其他线程使用到,就说明没有线程安全的可能性,于是会消除锁。
偏向锁
偏向锁并不是不加锁,而是只加一次锁,只要一个线程获得了偏向锁,即使当这个线程退出临界区后,这个锁依然会“偏向”这个线程,当这个线程再次要进入临界区是,就可以直接进入临界区,不需要重新加锁的过程。
这里介绍一下JVM的偏向锁的实现,但我们需要先了解一下Java类的对象头的知识点。当一个Java类会被解析成class对象,并加载在内存中后,这个对象由三部分组成:对象头、实例数据和对齐填充,对象头又由markWord和Klass指针组成。
- Mark Word:对象的 Mark Word 部分占 4 个字节(32位系统),包含一些的标记位,比如轻量级锁、偏向锁标记等等
- Klass Pointer:Class 对象指针占是 4个字节(32位系统),指向的位置是对象对应的 Class 对象的内存地址
- 实例数据:这里面包括了对象的所有成员变量
- 对齐填充:最后一部分是对齐填充的字节,按 8 个字节填充
可以看到markwork实际上大部分数据都是用来记录锁相关的信息的,比如锁状态,偏向锁等等。
了解了markwork,我们接着了解什么是偏向锁,偏向锁中由一个ThreadId字段,这个字段如果是空的,某个线程第一次获取锁的时候,就将自身的ThreadId写入到锁的ThreadId字段内,将markwork内的是否偏是向锁的状态位置1,这样下次获取锁的时候,直接检查ThreadId是否和自身线程Id一致,如果一致,则认为当前线程已经获取了锁,因此不需再次获取锁。
忙等待阻塞锁
自旋锁
忙等待锁就是自旋锁,它通过循环不断的获取锁,如果在自旋的途中,获取锁成功,则进入临界区,不成功就一直自旋。关于自旋锁的实现,前面CAS和TAS都有实现过。自旋锁的优点是不需要让线程陷入休眠,避免了线程切换事件,但是如果自旋过久,会浪费CPU的资源,这个时候让线程陷入休眠是一种更好的选择。所以自旋次数的选择就比较重要了,在JDK1.6之前,Synchronized再获取锁时,会先自旋10次,如果不成功就会升级成重量级锁,让线程陷入休眠。
自适应自旋锁
JDK1.6中引入了自适应的自旋锁。自适应意味着自旋的时间不再固定了,而是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。如果在同一个锁对象上,自旋等待刚刚成功获得过锁,并且持有锁的线程正在运行中,那么虚拟机就会认为这次自旋也很有可能再次成功,进而它将允许自旋等待持续相对更长的时间,比如100个循环。另外,如果对于某个锁,自旋很少成功获得过,那在以后要获取这个锁时将可能省略掉自旋过程,以避免浪费处理器资源。有了自适应自旋,随着程序运行和性能监控信息的不断完善,虚拟机对程序锁的状况预测就会越来越准确,虚拟机就会变得越来越“聪明”了。
休眠阻塞锁
休眠阻塞锁,顾名思义,就是在线程获取锁后,让线程陷入休眠,它一般被称为重量级锁。这种锁需要管程的介入,线程的休眠和唤醒也会比较耗费性能,既然都已经是重量级锁了,还有优化的空间吗?还是有的。
细化锁的粒度
我们可以通过减少同步的代码块数量来优化锁的性能,比如将Synchronize锁住整个方法改成只锁住方法内可能会产生线程安全的代码。
粗化锁的粒度
粗化锁的粒度在某些场景也能优化锁的性能,比如某个方法内有好几个锁,我们可以将这些锁都可以一个锁,来减少加锁和释放锁的损耗。JVM虚拟机也会通过粗化锁的粒度来优化锁性能的,比如StringBuffer.append()方法内部是由同步代码快的,如果我们多次连续调用append方法,JVM会将这些append方法内部的锁消除,并在连续append方法间加一把锁。
增加锁的数量
如果大量的并发线程都用同一把锁,那么所有的线程始终同时只有一个线程能访问临界区,其他的线程都在等待,这样也会造成性能的浪费。我们可以通过增加锁的数量,将临界区不同的区域分别加锁,这样就可以让更多的线程对临界区进行访问。
锁优化案例
Synchronized
在JDK1.6版本上,HotSpot虚拟机开发团队花费了很大的的精力去实现和优化各种锁优化技术。Synchronized就是优化的重点之一,Synchronized会先使用偏向锁加锁,如果访问临界区的线程超过了一个,就会升级成轻量级锁,轻量级锁通过互斥来实现加锁的过程,只要多个线程没有产生竞争条件,就可以通过互斥进行加锁,当有多个线程同时竞争锁时,互斥就没用了,所以Synchronized会从轻量级锁升级成重量级锁,Synchronized的重量级锁在获取锁的过程中,也会先通过自旋的方式获取锁,如果自旋失败,最后才采用管程,将线程陷入休眠。
可以看到,Synchronized是从偏向锁,到轻量级锁,然后到自旋锁,最后休眠阻塞锁这样一个不断升级的过程。
ConcurrentHashMap
ConcurrentHashMap在jdk1.7之前采用分断锁来对锁进行了优化,分断锁是通过增加锁的数量来达到优化的目的。我们看一下它的实现。
1.7版本的ConcurrentHashMap主要由两部分组成:Segments和HashEntry。Segments存方HashEntry,HashEntry存放我们的Key,Value。他们的关系实现如下:
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
final int segmentMask; //segments的掩码值
final int segmentShift; //segments的偏移量
final Segment<K,V>[] segments;
……
}
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
transient volatile int count;
transient int modCount;
transient int threshold;
transient volatile HashEntry<K,V>[] table;
final float loadFactor; //扩容负载因子
……
}
我们接着看ConcurrentHashMap的put和get方法是如何保证线程安全的。
put方法的实现
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
//获取key的hash值
int hash = hash(key);
//hash值右移segmentShift位与段掩码进行位运算,定位segment
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
ConcurrentHashMap的put方法获主要是获取获取key值的哈希函数,然后根据hash获取Segment段,接着调用Segment的put方法,它的实现如下。
V put(K key, int hash, V value, boolean onlyIfAbsent) {
//对segment加锁
lock();
try {
int c = count;
if (c++ > threshold) //如果超过再散列的阈值
rehash(); //执行再散列,table 数组的长度将扩充一倍
HashEntry<K,V>[] tab = table;
//把散列码值与 table 数组的长度减 1 的值相“与”
//得到该散列码对应的 table 数组的下标值
int index = hash & (tab.length - 1);
//找到散列码对应的具体的那个桶
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue;
if (e != null) { //如果键值对以经存在
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value; // 设置 value 值
}
else { //键值对不存在
oldValue = null;
++modCount; //添加新节点到链表中,modCont 要加 1
// 创建新节点,并添加到链表的头部
tab[index] = new HashEntry<K,V>(key, hash, first, value);
count = c; //写 count 变量
}
return oldValue;
} finally {
unlock(); //解锁
}
}
get方法的实现
在接着看get方法的实现
public V get(Object key) {
Segment<K,V> s;
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
//先定位Segment,再定位HashEntry
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
由于get并不会导致线程安全问题,所以直接从Segment取HashEntry就行了,并不用加锁。
在JDK1.8中,已经放弃了分段锁的方式,Segment数组也没有了。所有的HashEntry都存放在Node数组中,并且采用CAS+Synchronize的加锁方式,在put方法中,会先判断所存放的Node的位置是否有值,即是否会产生HASH冲突,如果没值,直接采用CAS加锁,存放HashEntry,如果有,则采用Synchronize加锁后再进行存放逻辑。有兴趣的可以去看JDK1.8中ConcurrentHashMap的实现,这里就不说了。
Synchronize实现原理
通过前面的学习,我们已经对多线程的基础知识掌握充分了,靠着这些基础知识,可以挑战更高难度的Boss了,那么接下来就看一下Synchronize在JVM和Art虚拟机中的实现源码。
JVM中的实现原理
我们平常在使用Synchronize进行加锁时,主要有两个方式,一种是锁住整个方法,将Synchronize字段加载方法名上,第二种是锁住一段代码,将Synchronize用在代码中间。
public void f() {
synchronized (this) {
this.hashCode();
}
}
synchronized public void f() {
this.hashCode();
}
当Synchronize修饰方法时,会在方法的访问标志中添加ACC_SYNCHRONIZED
public synchronized void f();
descriptor: ()V
flags: ACC_PUBLIC, ACC_SYNCHRONIZED // 访问标志
Code:
stack=1, locals=1, args_size=1
0: aload_0
1: invokevirtual #2 // Method java/lang/Object.hashCode:()I
4: pop
5: return
LineNumberTable:
line 3: 0
line 4: 5
LocalVariableTable:
Start Length Slot Name Signature
0 6 0 this LMain;
Synchronize放在代码段中时,会在代码的字节码指令被锁住的代码段前后加入monitorenter和monitorexit标志。
public void f();
Code:
0: aload_0
1: dup
2: astore_1
3: monitorenter // synchronized 入口
4: aload_0
5: invokevirtual #4; //Method java/lang/Object.hashCode:()I
8: pop
9: aload_1
10: monitorexit // synchronized 正常出口
11: goto 19
14: astore_2
15: aload_1
16: monitorexit // synchronized 异常出口
17: aload_2
18: athrow
19: return
不管是在方法的访问访问标志的设置ACC_SYNCHRONIZED,还是在方法字节码指令的前后加入monitorenter和monitorexit,其实都是为了告诉解释器,这段代码需要进入管程。管程在前面的知识中提到过,我们可以把它理解成是专门帮我们管理线程并发的程序。
加锁流程
我们先看看Hotspot虚拟机是如何进入管程的。
//文件->\src\share\vm\interpreter\bytecodes.cpp
void Bytecodes::initialize() {
if (_is_initialized) return;
assert(number_of_codes <= 256, "too many bytecodes");
// Java bytecodes
// bytecode bytecode name format wide f. result tp stk traps
……
def(_iload , "iload" , "bi" , "wbii" , T_INT , 1, false);
……
def(_istore , "istore" , "bi" , "wbii" , T_VOID , -1, false);
……
def(_iastore , "iastore" , "b" , NULL , T_VOID , -3, true );
……
def(_iadd , "iadd" , "b" , NULL , T_INT , -1, false);
……
def(_monitorenter , "monitorenter" , "b" , NULL , T_VOID , -1, true );
def(_monitorexit , "monitorexit" , "b" , NULL , T_VOID , -1, true );
// platform specific JVM bytecodes
pd_initialize();
……
// initialization successful
_is_initialized = true;
}
Hotspot虚拟机在字节码初始化时,会将所有字节码对应的方法封装定义好,可以看到JVM虚拟机最多支持256个字节码。接着看一下解析函数中时如何处理monitorenter的
//文件->\src\share\vm\interpreter\bytecodeInterpreter.cpp
void BytecodeInterpreter::run(interpreterState istate) {
……
switch (opcode)
{
……
CASE(_iload):
CASE(_fload):
SET_STACK_SLOT(LOCALS_SLOT(pc[1]), 0);
UPDATE_PC_AND_TOS_AND_CONTINUE(2, 1);
……
CASE(_istore):
CASE(_fstore):
SET_LOCALS_SLOT(STACK_SLOT(-1), pc[1]);
UPDATE_PC_AND_TOS_AND_CONTINUE(2, -1);
……
CASE(_return): {
// Allow a safepoint before returning to frame manager.
SAFEPOINT;
goto handle_return;
}
……
/* monitorenter and monitorexit for locking/unlocking an object */
CASE(_monitorenter): {
//1,获取对象头,这个oop就是前面提到过的包含了markwork和klass的对象头
oop lockee = STACK_OBJECT(-1);
// derefing's lockee ought to provoke implicit null check
CHECK_NULL(lockee);
//线程栈中最后一个锁,这里是一个屏障
BasicObjectLock* limit = istate->monitor_base();
// 2,在私有线程栈找到一个最近并且空闲的锁,BasicObjectLock是锁的基类
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
BasicObjectLock* entry = NULL;
//如果获取的空闲所不是最后一个锁,说明锁可用
while (most_recent != limit ) {
//如果获取的锁中的对象头和当前的对象头一致,说明这个锁被分配给了这个对象,most_recent记录就不用加1,如果不是,说明这个锁没被使用most_recent加1
if (most_recent->obj() == NULL) entry = most_recent;
else if (most_recent->obj() == lockee) break;
most_recent++;
}
if (entry != NULL) {
//3,将对象头赋值给获取到的锁
entry->set_obj(lockee);
//4,创建一个无锁的markword头
markOop displaced = lockee->mark()->set_unlocked();
entry->lock()->set_displaced_header(displaced);
//5,通过cas将无锁的markword赋值给lockee对象头
if (Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
// 判断是否是锁重入,如果是重入,则不需要再次加锁
if (THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
entry->lock()->set_displaced_header(NULL);
} else {
//6,执行加锁逻辑
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
}
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
} else {
istate->set_msg(more_monitors);
UPDATE_PC_AND_RETURN(0); // Re-execute
}
}
CASE(_monitorexit): {
//1,获取对象头
oop lockee = STACK_OBJECT(-1);
CHECK_NULL(lockee);
BasicObjectLock* limit = istate->monitor_base();
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
while (most_recent != limit ) {
if ((most_recent)->obj() == lockee) {
BasicLock* lock = most_recent->lock();
//2,获取对象头中的mardword头
markOop header = lock->displaced_header();
most_recent->set_obj(NULL);
// If it isn't recursive we either must swap old header or call the runtime
if (header != NULL) {
if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), lock) != lock) {
// restore object for the slow case
most_recent->set_obj(lockee);
//3,执行释放锁的逻辑
CALL_VM(InterpreterRuntime::monitorexit(THREAD, most_recent), handle_exception);
}
}
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
}
most_recent++;
}
// Need to throw illegal monitor state exception
CALL_VM(InterpreterRuntime::throw_illegal_monitor_state_exception(THREAD), handle_exception);
ShouldNotReachHere();
}
……
return;
}
可以看到虚拟机的解释器解析字节码指令的本质,是一个很长的switch函数,解析到的所有的字节码,如load,add,return,monitorenter等等都有对应的处理逻辑。我们先看monitorenter的处理逻辑,它主要做了这几件事情:
- 获取对象头,一个对象头由markword(数据)和klass(方法指针)组成
- 获取一个可用锁,这里用到了享元模式来复用锁达到优化的目的
- 获取对象头中的markword,并通过cas操作初始化锁状态
- 调用InterpreterRuntime::monitorenter进行加锁操作
接着看InterpreterRuntime::monitorenter函数
//文件-->\src\share\vm\interpreter\interpreterRuntime.cpp
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
if (PrintBiasedLockingStatistics) {
Atomic::inc(BiasedLocking::slow_path_entry_count_addr());
}
Handle h_obj(thread, elem->obj());
assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
"must be NULL or an object");
//判断虚拟机是否开启了偏向锁
if (UseBiasedLocking) {
// 偏向锁加锁
ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
} else {
//自旋锁或重量级锁加锁
ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
}
assert(Universe::heap()->is_in_reserved_or_null(elem->obj()),
"must be NULL or an object");
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
IRT_END
monitorenter主要根据虚拟机是否开启偏向锁来进行偏向锁加锁,如果没开启,则进行自旋锁或重量级锁加锁。先看偏向锁的加锁流程,它的实现在fast_enter函数。
偏向锁加锁流程
//文件-->\src\share\vm\runtime\synchronizer.cpp
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
//判断是否开启了偏向锁
if (UseBiasedLocking) {
//安全检查
if (!SafepointSynchronize::is_at_safepoint()) {
//偏向锁测序或者重偏向逻辑
BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
return;
}
} else {
assert(!attempt_rebias, "can not rebias toward VM thread");
BiasedLocking::revoke_at_safepoint(obj);
}
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}
//如果没有开启偏向锁,还是会走重量级锁的加锁流程
slow_enter (obj, lock, THREAD) ;
}
fast_enter的关键流程在revoke_and_rebias函数中实现,函数中的逻辑主要如下:
- 判断markwork是否为偏向锁状态,也就是偏向锁标志位是否为 1,如果为是偏向锁状态,进入下一步检测,如果不是,直接通过CAS进行偏向锁加锁,加锁成功后就可进入临界区执行临界区的字节码;
- 如果是偏向锁状态,则检测markwork中ThreadId,如果指向当前线程,则可以直接进入临界区;如果为空,则进入步骤3;如果指向其它线程,进入步骤4;
- 通过CAS设置markwork中ThreadId为当前线程ID,如果执行CAS成功,表示偏向锁加锁成功,进入临界区,否则进入步骤4;
- 如果执行CAS失败,表示当前存在多个线程竞争锁,撤销偏向锁,执行slow_enter流程。
BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {
// We can revoke the biases of anonymously-biased objects
// efficiently enough that we should not cause these revocations to
// update the heuristics because doing so may cause unwanted bulk
// revocations (which are expensive) to occur.
markOop mark = obj->mark();
if (mark->is_biased_anonymously() && !attempt_rebias) {
//匿名偏向状态,即ThreadId为0以及偏向标志关闭,则需要撤销偏向锁。
markOop biased_value = mark;
markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) {
//返回BIAS_REVOKED标志后,fast_enter函数中会接着走slow_enter逻辑
return BIAS_REVOKED;
}
} else if (mark->has_bias_pattern()) {
Klass* k = obj->klass();
markOop prototype_header = k->prototype_header();
if (!prototype_header->has_bias_pattern()) {
//如果关闭偏向锁模式,则需要撤销偏向锁
markOop biased_value = mark;
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(prototype_header, obj->mark_addr(), mark);
assert(!(*(obj->mark_addr()))->has_bias_pattern(), "even if we raced, should still be revoked");
return BIAS_REVOKED;
} else if (prototype_header->bias_epoch() != mark->bias_epoch()) {
//偏向锁过期
if (attempt_rebias) {
assert(THREAD->is_Java_thread(), "");
markOop biased_value = mark;
//如果attempt_rebias开启,重新设置过期时间
markOop rebiased_prototype = markOopDesc::encode((JavaThread*) THREAD, mark->age(), prototype_header->bias_epoch());
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(rebiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) {
return BIAS_REVOKED_AND_REBIASED;
}
} else {
//如果attempt_rebias关闭,则撤销偏向锁
markOop biased_value = mark;
markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) {
return BIAS_REVOKED;
}
}
}
}
//更新撤销偏向锁计数,并返回偏向锁撤销次数和偏向次数
HeuristicsResult heuristics = update_heuristics(obj(), attempt_rebias);
if (heuristics == HR_NOT_BIASED) {
return NOT_BIASED;
} else if (heuristics == HR_SINGLE_REVOKE) {
//如果要撤销或者重偏向偏向锁的线程是当前线程,则直接撤销当前线程线程的偏向锁
Klass *k = obj->klass();
markOop prototype_header = k->prototype_header();
if (mark->biased_locker() == THREAD &&
prototype_header->bias_epoch() == mark->bias_epoch()) {
ResourceMark rm;
if (TraceBiasedLocking) {
tty->print_cr("Revoking bias by walking my own stack:");
}
//撤销偏向锁或者重偏向
BiasedLocking::Condition cond = revoke_bias(obj(), false, false, (JavaThread*) THREAD);
((JavaThread*) THREAD)->set_cached_monitor_info(NULL);
assert(cond == BIAS_REVOKED, "why not?");
return cond;
} else {
//如果不是当前线程,将方法提交到虚拟机的线程栈中执行
VM_RevokeBias revoke(&obj, (JavaThread*) THREAD);
VMThread::execute(&revoke);
return revoke.status_code();
}
}
assert((heuristics == HR_BULK_REVOKE) ||
(heuristics == HR_BULK_REBIAS), "?");
//当撤销偏向锁的次数达到阈值,则表示这个对象不适合偏向锁,于是对所有使用了这个对象的线程进行批量撤销或批量重偏
VM_BulkRevokeBias bulk_revoke(&obj, (JavaThread*) THREAD,
(heuristics == HR_BULK_REBIAS),
attempt_rebias);
VMThread::execute(&bulk_revoke);
return bulk_revoke.status_code();
}
接着看revoke_bias函数,是如何撤销或者重偏向锁的
static BiasedLocking::Condition revoke_bias(oop obj, bool allow_rebias, bool is_bulk, JavaThread* requesting_thread) {
markOop mark = obj->mark();
……
uint age = mark->age();
markOop biased_prototype = markOopDesc::biased_locking_prototype()->set_age(age);
markOop unbiased_prototype = markOopDesc::prototype()->set_age(age);
JavaThread* biased_thread = mark->biased_locker();
if (biased_thread == NULL) {
// 匿名偏向
if (!allow_rebias) {
obj->set_mark(unbiased_prototype);
}
if (TraceBiasedLocking && (Verbose || !is_bulk)) {
tty->print_cr(" Revoked bias of anonymously-biased object");
}
return BiasedLocking::BIAS_REVOKED;
}
// 判断线程是否存活
bool thread_is_alive = false;
if (requesting_thread == biased_thread) {
thread_is_alive = true;
} else {
for (JavaThread* cur_thread = Threads::first(); cur_thread != NULL; cur_thread = cur_thread->next()) {
if (cur_thread == biased_thread) {
thread_is_alive = true;
break;
}
}
}
//如果线程不存活,则将markword设置为匿名偏向锁或者无锁状态
if (!thread_is_alive) {
if (allow_rebias) {
obj->set_mark(biased_prototype);
} else {
obj->set_mark(unbiased_prototype);
}
if (TraceBiasedLocking && (Verbose || !is_bulk)) {
tty->print_cr(" Revoked bias of object biased toward dead thread");
}
return BiasedLocking::BIAS_REVOKED;
}
// 线程还存活则遍历线程栈中所有的Lock Record
GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(biased_thread);
BasicLock* highest_lock = NULL;
for (int i = 0; i < cached_monitor_info->length(); i++) {
MonitorInfo* mon_info = cached_monitor_info->at(i);
// 如果能找到对应的Lock Record说明偏向的线程还在执行同步代码块中的代码
if (mon_info->owner() == obj) {
if (TraceBiasedLocking && Verbose) {
tty->print_cr(" mon_info->owner (" PTR_FORMAT ") == obj (" PTR_FORMAT ")",
(void *) mon_info->owner(),
(void *) obj);
}
// 需要升级为轻量级锁,直接修改偏向线程栈中的Lock Record
markOop mark = markOopDesc::encode((BasicLock*) NULL);
highest_lock = mon_info->lock();
highest_lock->set_displaced_header(mark);
}
}
if (highest_lock != NULL) {
// 修改第一个Lock Record为无锁状态,然后将obj的mark word设置为指向该Lock Record的指针
highest_lock->set_displaced_header(unbiased_prototype);
// Reset object header to point to displaced mark
obj->set_mark(markOopDesc::encode(highest_lock));
assert(!obj->mark()->has_bias_pattern(), "illegal mark state: stack lock used bias bit");
if (TraceBiasedLocking && (Verbose || !is_bulk)) {
tty->print_cr(" Revoked bias of currently-locked object");
}
} else {
// 走到这里说明偏向线程已经不在同步块中了
if (TraceBiasedLocking && (Verbose || !is_bulk)) {
tty->print_cr(" Revoked bias of currently-unlocked object");
}
if (allow_rebias) {
//设置为匿名偏向状态
obj->set_mark(biased_prototype);
} else {
// 将mark word设置为无锁状态
obj->set_mark(unbiased_prototype);
}
}
return BiasedLocking::BIAS_REVOKED;
}
了解了偏向锁的加锁流程,再接着看自旋锁和重量级锁的加锁流程slow_enter。
轻量级锁加锁流程
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
markOop mark = obj->mark();
assert(!mark->has_bias_pattern(), "should not see bias pattern here");
//是否为无锁状态
if (mark->is_neutral()) {
//如果是无锁状态,通过cas加轻量级锁,cas成功则表示加锁成功
lock->set_displaced_header(mark);
if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
TEVENT (slow_enter: release stacklock) ;
return ;
}
} else
//如果是有锁状态,判断是否是同一把锁,如果是,则直接进入临界区
if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
assert(lock != mark->locker(), "must not re-lock the same lock");
assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
lock->set_displaced_header(NULL);
return;
}
lock->set_displaced_header(markOopDesc::unused_mark());
//如果上面两种状态都不满足,说明出现了锁的竞争情况,轻量级锁需要膨胀成重量级锁
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}
slow_enter会判断是否是无锁,如果是,则通过CAS进行轻量级锁加锁则,如果有锁,则判断是否是同意把锁,如果是,也可以直接进入临界区,如果不是,轻量级锁需要调用flate函数膨胀成重量级锁,膨胀成重量级锁后,执行enter方法。我们先看看膨胀过程。
轻量级锁膨胀过程
我们接着看inflate是如何进行膨胀的
ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
for (;;) {
const markOop mark = object->mark() ;
assert (!mark->has_bias_pattern(), "invariant") ;
// 判断是否有monitor,这个monitor就是管程对象,如果已经有了管程,说明已经是重量级锁了,如果是重量级锁则退出膨胀
if (mark->has_monitor()) {
ObjectMonitor * inf = mark->monitor() ;
assert (inf->header()->is_neutral(), "invariant");
assert (inf->object() == object, "invariant") ;
assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is invalid");
return inf ;
}
//其他线程正在进入膨胀过程,即自旋锁升级重量级锁的过程,当前线程需要进行等待
if (mark == markOopDesc::INFLATING()) {
TEVENT (Inflate: spin while INFLATING) ;
ReadStableMark(object) ;
continue ;
}
//判断是否是轻量级锁
if (mark->has_locker()) {
//获取一个可用管程,ObjectMonitor就是JVM的管程对象
ObjectMonitor * m = omAlloc (Self) ;
m->Recycle();
m->_Responsible = NULL ;
m->OwnerIsThread = 0 ;
m->_recursions = 0 ;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // Consider: maintain by type/class
markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;
//CAS操作标识Mark Word正在膨胀
if (cmp != mark) {
omRelease (Self, m, true) ;
continue ;
}
//CAS成功,初始化管程ObjectMonitor的信息
markOop dmw = mark->displaced_mark_helper() ;
assert (dmw->is_neutral(), "invariant") ;
m->set_header(dmw) ;
m->set_owner(mark->locker());
guarantee (object->mark() == markOopDesc::INFLATING(), "invariant") ;
object->release_set_mark(markOopDesc::encode(m));
if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ;
return m ;
}
……
}
}
在flate膨胀过程中,我们获取了重量级锁ObjectMonitor,这个就是JVM的管程对象,并且调用管程的enter方法,开始进入管程模型。
重量级锁加锁流程
接着看看ObjectMonitor管程的enter方法里面做了什么事情。
void ATTR ObjectMonitor::enter(TRAPS) {
// The following code is ordered to check the most common cases first
// and to reduce RTS->RTO cache line upgrades on SPARC and IA32 processors.
Thread * const Self = THREAD ;
void * cur ;
……
//尝试自旋获取锁
if (Knob_SpinEarly && TrySpin (Self) > 0) {
assert (_owner == Self , "invariant") ;
assert (_recursions == 0 , "invariant") ;
assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
Self->_Stalled = 0 ;
return ;
}
……
for (;;) {
……
//自旋失败则进入EnterI流程。
EnterI (THREAD) ;
……
}
……
}
ObjectMonitor的enter方法中,会先通过自旋获取锁,如果自旋超过一定的次数,则自旋失败,进入EnterI流程。
void ATTR ObjectMonitor::EnterI (TRAPS) {
Thread * Self = THREAD ;
//尝试获取锁
if (TryLock (Self) > 0) {
return ;
}
//再次尝试自旋
if (TrySpin (Self) > 0) {
return ;
}
//将当前线程封装成ObjectWaiter对象
ObjectWaiter node(Self) ;
Self->_ParkEvent->reset() ;
node._prev = (ObjectWaiter *) 0xBAD ;
node.TState = ObjectWaiter::TS_CXQ ;
ObjectWaiter * nxt ;
for (;;) {
//通过循环确保ObjectWaiter插入了队列
nod
e._next = nxt = _cxq ;
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
// 继续挣扎一下,尝试获取锁
if (TryLock (Self) > 0) {
return ;
}
}
if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) {
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}
TEVENT (Inflated enter - Contention) ;
int nWakeups = 0 ;
int RecheckInterval = 1 ;
for (;;) {
if (TryLock (Self) > 0) break ;
assert (_owner != Self, "invariant") ;
if ((SyncFlags & 2) && _Responsible == NULL) {
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}
// park self
if (_Responsible == Self || (SyncFlags & 1)) {
//延时挂起当前线程
Self->_ParkEvent->park ((jlong) RecheckInterval) ;
RecheckInterval *= 8 ;
if (RecheckInterval > 1000) RecheckInterval = 1000 ;
} else {
TEVENT (Inflated enter - park UNTIMED) ;
//挂起当前线程
Self->_ParkEvent->park() ;
}
//依然在尝试获取锁
if (TryLock(Self) > 0) break ;
……
}
……
return ;
}
EnterI主要做了这两事情
- 将当前线程封装成ObjectWaiter,并放入等待队列
- 调用ParkEvent的park方法,ParkEvent是Thread对象的内部类,park方法会将当前线程进行挂起。
可以看到,EnterI的方法中不断的在调用tryLock尝试获取锁,主要原因也是因为将线程挂起,然后再唤醒的性能开销是比较大的,能不挂起线程就最好不挂起线程。
加锁的流程讲完了,接着来看一下释放锁的流程。
释放锁流程
释放锁的入口逻辑如下:
CASE(_monitorexit): {
//1,获取对象头
oop lockee = STACK_OBJECT(-1);
CHECK_NULL(lockee);
BasicObjectLock* limit = istate->monitor_base();
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
while (most_recent != limit ) {
if ((most_recent)->obj() == lockee) {
BasicLock* lock = most_recent->lock();
//2,获取对象头中的markword头
markOop header = lock->displaced_header();
most_recent->set_obj(NULL);
if (header != NULL) {
if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), lock) != lock) {
// restore object for the slow case
most_recent->set_obj(lockee);
//3,执行释放锁的逻辑
CALL_VM(InterpreterRuntime::monitorexit(THREAD, most_recent), handle_exception);
}
}
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
}
most_recent++;
}
// Need to throw illegal monitor state exception
CALL_VM(InterpreterRuntime::throw_illegal_monitor_state_exception(THREAD), handle_exception);
ShouldNotReachHere();
}
_monitorexit的入口逻辑主要调用InterpreterRuntime::monitorexit函数执行释放锁的逻辑,接着往下看
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorexit(JavaThread* thread, BasicObjectLock* elem))
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
Handle h_obj(thread, elem->obj());
assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
"must be NULL or an object");
if (elem == NULL || h_obj()->is_unlocked()) {
THROW(vmSymbols::java_lang_IllegalMonitorStateException());
}
ObjectSynchronizer::slow_exit(h_obj(), elem->lock(), thread);
// Free entry. This must be done here, since a pending exception might be installed on
// exit. If it is not cleared, the exception handling code will try to unlock the monitor again.
elem->set_obj(NULL);
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
IRT_END
这里调用了slow_exit函数
void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) {
fast_exit (object, lock, THREAD) ;
}
void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
assert(!object->mark()->has_bias_pattern(), "should not see bias pattern here");
// if displaced header is null, the previous enter is recursive enter, no-op
markOop dhw = lock->displaced_header();
markOop mark ;
if (dhw == NULL) {
//重入锁,直接释放
mark = object->mark() ;
assert (!mark->is_neutral(), "invariant") ;
if (mark->has_locker() && mark != markOopDesc::INFLATING()) {
assert(THREAD->is_lock_owned((address)mark->locker()), "invariant") ;
}
if (mark->has_monitor()) {
ObjectMonitor * m = mark->monitor() ;
assert(((oop)(m->object()))->mark() == mark, "invariant") ;
assert(m->is_entered(THREAD), "invariant") ;
}
return ;
}
mark = object->mark() ;
if (mark == (markOop) lock) {
//轻量级锁,直接释放
assert (dhw->is_neutral(), "invariant") ;
if ((markOop) Atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) {
TEVENT (fast_exit: release stacklock) ;
return;
}
}
//重量级锁释放
ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ;
}
slow_exit函数里面又调用了fast_exit,这一块的调用逻辑我觉得JVM的团队需要优化一下,方法名给人很大的歧义性。fast_exit主要做了三件事
- 如果是重入锁,则直接return
- 如果是轻量级锁,则CAS重写mardword信息,释放锁
- 如果是重量级锁,通过infalte获得Monitor,然后调用Monitor的exit方法。
重量级锁释放流程
void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
Thread * Self = THREAD ;
……
for (;;) {
assert (THREAD == _owner, "invariant") ;
……
ObjectWaiter * w = NULL ;
int QMode = Knob_QMode ;
if (QMode == 2 && _cxq != NULL) {
w = _cxq ;
assert (w != NULL, "invariant") ;
assert (w->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
ExitEpilog (Self, w) ;
return ;
}
if (QMode == 3 && _cxq != NULL) {
w = _cxq ;
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
assert (w != NULL , "invariant") ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
// Append the RATs to the EntryList
// TODO: organize EntryList as a CDLL so we can locate the tail in constant-time.
ObjectWaiter * Tail ;
for (Tail = _EntryList ; Tail != NULL && Tail->_next != NULL ; Tail = Tail->_next) ;
if (Tail == NULL) {
_EntryList = w ;
} else {
Tail->_next = w ;
w->_prev = Tail ;
}
// Fall thru into code that tries to wake a successor from EntryList
}
if (QMode == 4 && _cxq != NULL) {
w = _cxq ;
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
assert (w != NULL , "invariant") ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
// Prepend the RATs to the EntryList
if (_EntryList != NULL) {
q->_next = _EntryList ;
_EntryList->_prev = q ;
}
_EntryList = w ;
// Fall thru into code that tries to wake a successor from EntryList
}
w = _EntryList ;
if (w != NULL) {
assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
ExitEpilog (Self, w) ;
return ;
}
if (QMode == 1) {
// QMode == 1 : drain cxq to EntryList, reversing order
// We also reverse the order of the list.
ObjectWaiter * s = NULL ;
ObjectWaiter * t = w ;
ObjectWaiter * u = NULL ;
while (t != NULL) {
guarantee (t->TState == ObjectWaiter::TS_CXQ, "invariant") ;
t->TState = ObjectWaiter::TS_ENTER ;
u = t->_next ;
t->_prev = u ;
t->_next = s ;
s = t;
t = u ;
}
_EntryList = s ;
assert (s != NULL, "invariant") ;
} else {
// QMode == 0 or QMode == 2
_EntryList = w ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
}
if (_succ != NULL) continue;
w = _EntryList ;
if (w != NULL) {
guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
ExitEpilog (Self, w) ;
return ;
}
}
}
exit函数主要的逻辑是根据QMode,也就是优先级的模式,从调用EntryList中取出一个线程,并调用ExitEpilog函数进行唤醒工作。
void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
assert (_owner == Self, "invariant") ;
_succ = Knob_SuccEnabled ? Wakee->_thread : NULL ;
ParkEvent * Trigger = Wakee->_event ;
Wakee = NULL ;
// Drop the lock
OrderAccess::release_store_ptr (&_owner, NULL) ;
OrderAccess::fence() ; // ST _owner vs LD in unpark()
if (SafepointSynchronize::do_call_back()) {
TEVENT (unpark before SAFEPOINT) ;
}
//将线程唤醒
Trigger->unpark() ;
}
到这里,HotSpot版本JVM中的Synchronize实现原理已经讲完了。接下来我们再看一下Art虚拟机Synchronize的实现原理,由于他们有很多类似的地方,所以Art中的Synchronize实现逻辑不会不会非常详细的去讲,我们看一下大概流程即可。
Art中的实现原理
Art中Synchronize入口和出口都在interpreter_switch_impl解释器的ExecuteSwitchImpl函数中。
/art/runtime/interpreter/interpreter_switch_impl.cc
进入管程
JValue ExecuteSwitchImpl(Thread* self, const DexFile::CodeItem* code_item,
ShadowFrame& shadow_frame, JValue result_register,
bool interpret_one_instruction) {
constexpr bool do_assignability_check = do_access_check;
self->VerifyStack();
uint32_t dex_pc = shadow_frame.GetDexPC();
const auto* const instrumentation = Runtime::Current()->GetInstrumentation();
const uint16_t* const insns = code_item->insns_;
const Instruction* inst = Instruction::At(insns + dex_pc);
uint16_t inst_data;
ArtMethod* method = shadow_frame.GetMethod();
jit::Jit* jit = Runtime::Current()->GetJit();
do {
dex_pc = inst->GetDexPc(insns);
shadow_frame.SetDexPC(dex_pc);
TraceExecution(shadow_frame, inst, dex_pc);
inst_data = inst->Fetch16(0);
switch (inst->Opcode(inst_data)) {
……
//Synchronize入口
case Instruction::MONITOR_ENTER: {
PREAMBLE();
ObjPtr<mirror::Object> obj = shadow_frame.GetVRegReference(inst->VRegA_11x(inst_data));
if (UNLIKELY(obj == nullptr)) {
ThrowNullPointerExceptionFromInterpreter();
HANDLE_PENDING_EXCEPTION();
} else {
DoMonitorEnter<do_assignability_check>(self, &shadow_frame, obj);
POSSIBLY_HANDLE_PENDING_EXCEPTION(self->IsExceptionPending(), Next_1xx);
}
break;
}
//Synchronize出口
case Instruction::MONITOR_EXIT: {
PREAMBLE();
ObjPtr<mirror::Object> obj = shadow_frame.GetVRegReference(inst->VRegA_11x(inst_data));
if (UNLIKELY(obj == nullptr)) {
ThrowNullPointerExceptionFromInterpreter();
HANDLE_PENDING_EXCEPTION();
} else {
DoMonitorExit<do_assignability_check>(self, &shadow_frame, obj);
POSSIBLY_HANDLE_PENDING_EXCEPTION(self->IsExceptionPending(), Next_1xx);
}
break;
}
……
}
} while (!interpret_one_instruction);
// Record where we stopped.
shadow_frame.SetDexPC(inst->GetDexPc(insns));
return result_register;
} // NOLINT(readability/fn_siz
先看Art是如何加锁的,它的处理逻辑再DoMonitorEnter函数中
加锁流程
/art/runtime/interpreter/interpreter_common.h
static inline void DoMonitorEnter(Thread* self, ShadowFrame* frame, ObjPtr<mirror::Object> ref)
NO_THREAD_SAFETY_ANALYSIS
REQUIRES(!Roles::uninterruptible_) {
StackHandleScope<1> hs(self);
Handle<mirror::Object> h_ref(hs.NewHandle(ref));
h_ref->MonitorEnter(self);
if (kMonitorCounting && frame->GetMethod()->MustCountLocks()) {
frame->GetLockCountData().AddMonitor(self, h_ref.Get());
}
}
里面最终调用了MonitorEnter函数。
mirror::Object* Monitor::MonitorEnter(Thread* self, mirror::Object* obj, bool trylock) {
DCHECK(self != nullptr);
DCHECK(obj != nullptr);
self->AssertThreadSuspensionIsAllowable();
obj = FakeLock(obj);
uint32_t thread_id = self->GetThreadId();
size_t contention_count = 0;
StackHandleScope<1> hs(self);
Handle<mirror::Object> h_obj(hs.NewHandle(obj));
while (true) {
//获取LockWord,类似JVM中的MarkWord
LockWord lock_word = h_obj->GetLockWord(false);
switch (lock_word.GetState()) {
case LockWord::kUnlocked: {
//无锁状态,则进行轻量级锁的加锁过程
LockWord thin_locked(LockWord::FromThinLockId(thread_id, 0, lock_word.GCState()));
if (h_obj->CasLockWordWeakAcquire(lock_word, thin_locked)) {
AtraceMonitorLock(self, h_obj.Get(), false /* is_wait */);
return h_obj.Get(); // Success!
}
continue; // Go again.
}
case LockWord::kThinLocked: {
//轻量级锁
uint32_t owner_thread_id = lock_word.ThinLockOwner();
// 判断lock_word的threadid是否相等
if (owner_thread_id == thread_id){
uint32_t new_count = lock_word.ThinLockCount() + 1;
if (LIKELY(new_count <= LockWord::kThinLockMaxCount)) {
//如果相等,并且小于轻量级锁的最大数量,则直接进入临界区
LockWord thin_locked(LockWord::FromThinLockId(thread_id,
new_count,
lock_word.GCState()));
if (!kUseReadBarrier) {
h_obj->SetLockWord(thin_locked, false /* volatile */);
AtraceMonitorLock(self, h_obj.Get(), false /* is_wait */);
return h_obj.Get(); // Success!
} else {
// Use CAS to preserve the read barrier state.
if (h_obj->CasLockWordWeakRelaxed(lock_word, thin_locked)) {
AtraceMonitorLock(self, h_obj.Get(), false /* is_wait */);
return h_obj.Get(); // Success!
}
}
continue; // Go again.
} else {
// 如果超出轻量级锁的数量,则进行重量级锁的加锁流程
InflateThinLocked(self, h_obj, lock_word, 0);
}
} else {
//lock_word的threadid和当前线程的threadid不等,升级成重量级锁并加锁
if (trylock) {
return nullptr;
}
// Contention.
contention_count++;
Runtime* runtime = Runtime::Current();
if (contention_count <= runtime->GetMaxSpinsBeforeThinLockInflation()) {
sched_yield();
} else {
contention_count = 0;
// No ordering required for initial lockword read. Install rereads it anyway.
InflateThinLocked(self, h_obj, lock_word, 0);
}
}
continue; // Start from the beginning.
}
case LockWord::kFatLocked: {
//重量级锁
QuasiAtomic::ThreadFenceAcquire();
//获取monitor
Monitor* mon = lock_word.FatLockMonitor()
//尝试获取锁
if (trylock) {
return mon->TryLock(self) ? h_obj.Get() : nullptr;
} else {
mon->Lock(self);
return h_obj.Get(); // Success!
}
}
default: {
LOG(FATAL) << "Invalid monitor state " << lock_word.GetState();
UNREACHABLE();
}
}
}
}
MonitorEnter函数中做的事情主要如下:
- 如果是无锁,通过CAS加轻量级锁,调用continue,进入步骤2。
- 如果是轻量级锁,判断LockWord的threadId和当前线程的ThreadId是否相等,如果相等,并且轻量级锁没有超过最大限制的情况下,return退出循环,进入临界区。如果不相等,则通过InflateThinLocked升级成重量级锁,升级的过程主要是创建和初始化Monitor的过程,调用continue,进入不走3。
- 如果是重量级锁,通过调用Monitor的lock函数获取锁
接着看lock获取锁的过程
void Monitor::Lock(Thread* self) {
MutexLock mu(self, monitor_lock_);
while (true) {
//尝试获取锁
if (TryLockLocked(self)) {
return;
}
// Contended.
const bool log_contention = (lock_profiling_threshold_ != 0);
uint64_t wait_start_ms = log_contention ? MilliTime() : 0;
ArtMethod* owners_method = locking_method_;
uint32_t owners_dex_pc = locking_dex_pc_;
// Do this before releasing the lock so that we don't get deflated.
size_t num_waiters = num_waiters_;
++num_waiters_;
monitor_lock_.Unlock(self); // Let go of locks in order.
self->SetMonitorEnterObject(GetObject());
{
ScopedThreadSuspension tsc(self, kBlocked); // Change to blocked and give up mutator_lock_.
uint32_t original_owner_thread_id = 0u;
{
// Reacquire monitor_lock_ without mutator_lock_ for Wait.
MutexLock mu2(self, monitor_lock_);
if (owner_ != nullptr) { // Did the owner_ give the lock up?
original_owner_thread_id = owner_->GetThreadId();
//调用Wait函数,将线程休眠
monitor_contenders_.Wait(self);
}
……
}
}
self->SetMonitorEnterObject(nullptr);
monitor_lock_.Lock(self); // Reacquire locks in order.
--num_waiters_;
}
}
Lock函数主要在自旋中通过TryLockLocked获取锁,获取不到的情况下调用Wait将线程休眠。接着看一下Wait是如何休眠线程的。
void Monitor::Wait(Thread* self, int64_t ms, int32_t ns,
bool interruptShouldThrow, ThreadState why) {
……
//将线程添加到等待队列
AppendToWaitSet(self);
bool was_interrupted = false;
{
self->SetWaitMonitor(this);
// Release the monitor lock.
monitor_contenders_.Signal(self);
monitor_lock_.Unlock(self);
// Handle the case where the thread was interrupted before we called wait().
if (self->IsInterruptedLocked()) {
was_interrupted = true;
} else {
if (why == kWaiting) {
// 调用线程的wait方法将线程休眠
self->GetWaitConditionVariable()->Wait(self);
} else {
DCHECK(why == kTimedWaiting || why == kSleeping) << why;
self->GetWaitConditionVariable()->TimedWait(self, ms, ns);
}
was_interrupted = self->IsInterruptedLocked();
}
}
……
}
Wait函数主要做了两件事情
- 将线程加入等待队列
- 调用线程的Wait方法,将线程休眠。
到这里我们已经清楚Art的Synchronize加锁的流程了,接下来再看一下释放锁的流程。
释放锁的流程
释放锁的逻辑处理函数MonitorExit也在Monitor这个对象中。
bool Monitor::MonitorExit(Thread* self, mirror::Object* obj) {
DCHECK(self != nullptr);
DCHECK(obj != nullptr);
self->AssertThreadSuspensionIsAllowable();
obj = FakeUnlock(obj);
StackHandleScope<1> hs(self);
Handle<mirror::Object> h_obj(hs.NewHandle(obj));
while (true) {
LockWord lock_word = obj->GetLockWord(true);
switch (lock_word.GetState()) {
case LockWord::kHashCode:
// Fall-through.
case LockWord::kUnlocked:
//无锁状态,不需要释放锁
FailedUnlock(h_obj.Get(), self->GetThreadId(), 0u, nullptr);
return false; // Failure.
case LockWord::kThinLocked: {
//轻量级锁
uint32_t thread_id = self->GetThreadId();
uint32_t owner_thread_id = lock_word.ThinLockOwner();
if (owner_thread_id != thread_id) {
FailedUnlock(h_obj.Get(), thread_id, owner_thread_id, nullptr);
return false; // Failure.
} else {
// We own the lock, decrease the recursion count.
LockWord new_lw = LockWord::Default();
if (lock_word.ThinLockCount() != 0) {
uint32_t new_count = lock_word.ThinLockCount() - 1;
new_lw = LockWord::FromThinLockId(thread_id, new_count, lock_word.GCState());
} else {
new_lw = LockWord::FromDefault(lock_word.GCState());
}
if (!kUseReadBarrier) {
DCHECK_EQ(new_lw.ReadBarrierState(), 0U);
// TODO: This really only needs memory_order_release, but we currently have
// no way to specify that. In fact there seem to be no legitimate uses of SetLockWord
// with a final argument of true. This slows down x86 and ARMv7, but probably not v8.
h_obj->SetLockWord(new_lw, true);
AtraceMonitorUnlock();
// Success!
return true;
} else {
// Use CAS to preserve the read barrier state.
if (h_obj->CasLockWordWeakRelease(lock_word, new_lw)) {
AtraceMonitorUnlock();
// Success!
return true;
}
}
continue; // Go again.
}
}
case LockWord::kFatLocked: {
//重量级锁
Monitor* mon = lock_word.FatLockMonitor();
return mon->Unlock(self);
}
default: {
LOG(FATAL) << "Invalid monitor state " << lock_word.GetState();
return false;
}
}
}
}
无锁和轻量级锁的释放流程都很简单,我们直接看重量级锁的释放过程Unlock函数的实现。
bool Monitor::Unlock(Thread* self) {
DCHECK(self != nullptr);
uint32_t owner_thread_id = 0u;
{
MutexLock mu(self, monitor_lock_);
Thread* owner = owner_;
if (owner != nullptr) {
owner_thread_id = owner->GetThreadId();
}
if (owner == self) {
// We own the monitor, so nobody else can be in here.
AtraceMonitorUnlock();
if (lock_count_ == 0) {
owner_ = nullptr;
locking_method_ = nullptr;
locking_dex_pc_ = 0;
// Wake a contender.
monitor_contenders_.Signal(self);
} else {
--lock_count_;
}
return true;
}
}
// We don't own this, so we're not allowed to unlock it.
// The JNI spec says that we should throw IllegalMonitorStateException in this case.
FailedUnlock(GetObject(), self->GetThreadId(), owner_thread_id, this);
return false;
}
复制代码
unlock方法主要调用了Signal函数,来对等待队列中的线程进行缓存。Signal最终会走到管程Monitor的Notify方法中
void Monitor::Notify(Thread* self) {
DCHECK(self != NULL);
MutexLock mu(self, monitor_lock_);
// Make sure that we hold the lock.
if (owner_ != self) {
ThrowIllegalMonitorStateExceptionF("object not locked by thread before notify()");
return;
}
// Signal the first waiting thread in the wait set.
while (wait_set_ != NULL) {
Thread* thread = wait_set_;
wait_set_ = thread->GetWaitNext();
thread->SetWaitNext(nullptr);
// Check to see if the thread is still waiting.
MutexLock mu(self, *thread->GetWaitMutex());
if (thread->GetWaitMonitor() != nullptr) {
thread->GetWaitConditionVariable()->Signal(self);
return;
}
}
}
可以看到,这里Notify函数中会通过GetWaitNext获取当前线程的下一个在等待的线程,然后通过Signal进行唤醒,所以Art中的synchronize的唤醒线程是按照顺序唤醒的。
结尾
线程的原理在这里就讲完,但是线程的知识点远不止这么一点,如果要非常全面非常深入的讲,可能需要讲一本书的内容。但是这篇文章讲的都是线程最基础的知识点,只要深入的掌握线程的基础,不管是在什么样的场景下使用线程,都会更加游刃有余。
最后
小编在这将自己收集的一份《Android核心知识汇总》分享给大家,希望能对大家有所帮助。请点击GitHub获取。 喜欢本文的话,不妨顺手给我点个小赞、评论区留言或者转发支持一下呗~