Synchronize实现原理
JVM中的实现原理
我们平常在使用Synchronize进行加锁时,主要有两个方式,一种是锁住整个方法,将Synchronize字段加载方法名上,第二种是锁住一段代码,将Synchronize用在代码中间。
public void f() {
synchronized (this) {
this.hashCode();
}
}
synchronized public void f() {
this.hashCode();
}
当Synchronize修饰方法时,会在方法的访问标志中添加ACC_SYNCHRONIZED
public synchronized void f();
descriptor: ()V
flags: ACC_PUBLIC, ACC_SYNCHRONIZED // 访问标志
Code:
stack=1, locals=1, args_size=1
0: aload_0
1: invokevirtual #2 // Method java/lang/Object.hashCode:()I
4: pop
5: return
LineNumberTable:
line 3: 0
line 4: 5
LocalVariableTable:
Start Length Slot Name Signature
0 6 0 this LMain;
Synchronize放在代码段中时,会在代码的字节码指令被锁住的代码段前后加入monitorenter和monitorexit标志。
public void f();
Code:
0: aload_0
1: dup
2: astore_1
3: monitorenter // synchronized 入口
4: aload_0
5: invokevirtual #4; //Method java/lang/Object.hashCode:()I
8: pop
9: aload_1
10: monitorexit // synchronized 正常出口
11: goto 19
14: astore_2
15: aload_1
16: monitorexit // synchronized 异常出口
17: aload_2
18: athrow
19: return
不管是在方法的访问访问标志的设置ACC_SYNCHRONIZED,还是在方法字节码指令的前后加入monitorenter和monitorexit,其实都是为了告诉解释器,这段代码需要进入管程。
加锁流程
我们先看看Hotspot虚拟机是如何进入管程的。
//文件->\src\share\vm\interpreter\bytecodes.cpp
void Bytecodes::initialize() {
if (_is_initialized) return;
assert(number_of_codes <= 256, "too many bytecodes");
// Java bytecodes
// bytecode bytecode name format wide f. result tp stk traps
……
def(_iload , "iload" , "bi" , "wbii" , T_INT , 1, false);
……
def(_istore , "istore" , "bi" , "wbii" , T_VOID , -1, false);
……
def(_iastore , "iastore" , "b" , NULL , T_VOID , -3, true );
……
def(_iadd , "iadd" , "b" , NULL , T_INT , -1, false);
……
def(_monitorenter , "monitorenter" , "b" , NULL , T_VOID , -1, true );
def(_monitorexit , "monitorexit" , "b" , NULL , T_VOID , -1, true );
// platform specific JVM bytecodes
pd_initialize();
……
// initialization successful
_is_initialized = true;
}
Hotspot虚拟机在字节码初始化时,会将所有字节码对应的方法封装定义好,可以看到JVM虚拟机最多支持256个字节码。接着看一下解析函数中时如何处理monitorenter的
//文件->\src\share\vm\interpreter\bytecodeInterpreter.cpp
void BytecodeInterpreter::run(interpreterState istate) {
……
switch (opcode)
{
……
CASE(_iload):
CASE(_fload):
SET_STACK_SLOT(LOCALS_SLOT(pc[1]), 0);
UPDATE_PC_AND_TOS_AND_CONTINUE(2, 1);
……
CASE(_istore):
CASE(_fstore):
SET_LOCALS_SLOT(STACK_SLOT(-1), pc[1]);
UPDATE_PC_AND_TOS_AND_CONTINUE(2, -1);
……
CASE(_return): {
// Allow a safepoint before returning to frame manager.
SAFEPOINT;
goto handle_return;
}
……
/* monitorenter and monitorexit for locking/unlocking an object */
CASE(_monitorenter): {
//1,获取对象头,这个oop就是前面提到过的包含了markwork和klass的对象头
oop lockee = STACK_OBJECT(-1);
// derefing's lockee ought to provoke implicit null check
CHECK_NULL(lockee);
//线程栈中最后一个锁,这里是一个屏障
BasicObjectLock* limit = istate->monitor_base();
// 2,在私有线程栈找到一个最近并且空闲的锁,BasicObjectLock是锁的基类
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
BasicObjectLock* entry = NULL;
//如果获取的空闲所不是最后一个锁,说明锁可用
while (most_recent != limit ) {
//如果获取的锁中的对象头和当前的对象头一致,说明这个锁被分配给了这个对象,most_recent记录就不用加1,如果不是,说明这个锁没被使用most_recent加1
if (most_recent->obj() == NULL) entry = most_recent;
else if (most_recent->obj() == lockee) break;
most_recent++;
}
if (entry != NULL) {
//3,将对象头赋值给获取到的锁
entry->set_obj(lockee);
//4,创建一个无锁的markword头
markOop displaced = lockee->mark()->set_unlocked();
entry->lock()->set_displaced_header(displaced);
//5,通过cas将无锁的markword赋值给lockee对象头
if (Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
// 判断是否是锁重入,如果是重入,则不需要再次加锁
if (THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
entry->lock()->set_displaced_header(NULL);
} else {
//6,执行加锁逻辑
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
}
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
} else {
istate->set_msg(more_monitors);
UPDATE_PC_AND_RETURN(0); // Re-execute
}
}
CASE(_monitorexit): {
//1,获取对象头
oop lockee = STACK_OBJECT(-1);
CHECK_NULL(lockee);
BasicObjectLock* limit = istate->monitor_base();
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
while (most_recent != limit ) {
if ((most_recent)->obj() == lockee) {
BasicLock* lock = most_recent->lock();
//2,获取对象头中的mardword头
markOop header = lock->displaced_header();
most_recent->set_obj(NULL);
// If it isn't recursive we either must swap old header or call the runtime
if (header != NULL) {
if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), lock) != lock) {
// restore object for the slow case
most_recent->set_obj(lockee);
//3,执行释放锁的逻辑
CALL_VM(InterpreterRuntime::monitorexit(THREAD, most_recent), handle_exception);
}
}
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
}
most_recent++;
}
// Need to throw illegal monitor state exception
CALL_VM(InterpreterRuntime::throw_illegal_monitor_state_exception(THREAD), handle_exception);
ShouldNotReachHere();
}
……
return;
}
可以看到虚拟机的解释器解析字节码指令的本质,是一个很长的switch函数,解析到的所有的字节码,如load,add,return,monitorenter等等都有对应的处理逻辑。我们先看monitorenter的处理逻辑,它主要做了这几件事情:
接着看InterpreterRuntime::monitorenter函数
//文件-->\src\share\vm\interpreter\interpreterRuntime.cpp
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
if (PrintBiasedLockingStatistics) {
Atomic::inc(BiasedLocking::slow_path_entry_count_addr());
}
Handle h_obj(thread, elem->obj());
assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
"must be NULL or an object");
//判断虚拟机是否开启了偏向锁
if (UseBiasedLocking) {
// 偏向锁加锁
ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
} else {
//自旋锁或重量级锁加锁
ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
}
assert(Universe::heap()->is_in_reserved_or_null(elem->obj()),
"must be NULL or an object");
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
IRT_END
monitorenter主要根据虚拟机是否开启偏向锁来进行偏向锁加锁,如果没开启,则进行自旋锁或重量级锁加锁。先看偏向锁的加锁流程,它的实现在fast_enter函数。
偏向锁加锁流程
//文件-->\src\share\vm\runtime\synchronizer.cpp
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
//判断是否开启了偏向锁
if (UseBiasedLocking) {
//安全检查
if (!SafepointSynchronize::is_at_safepoint()) {
//偏向锁测序或者重偏向逻辑
BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
return;
}
} else {
assert(!attempt_rebias, "can not rebias toward VM thread");
BiasedLocking::revoke_at_safepoint(obj);
}
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}
//如果没有开启偏向锁,还是会走重量级锁的加锁流程
slow_enter (obj, lock, THREAD) ;
}
fast_enter的关键流程在revoke_and_rebias函数中实现,函数中的逻辑主要如下:
- 判断markwork是否为偏向锁状态,也就是偏向锁标志位是否为 1,如果为是偏向锁状态,进入下一步检测,如果不是,直接通过CAS进行偏向锁加锁,加锁成功后就可进入临界区执行临界区的字节码;
- 如果是偏向锁状态,则检测markwork中ThreadId,如果指向当前线程,则可以直接进入临界区;如果为空,则进入步骤3;如果指向其它线程,进入步骤4;
- 通过CAS设置markwork中ThreadId为当前线程ID,如果执行CAS成功,表示偏向锁加锁成功,进入临界区,否则进入步骤4;
- 如果执行CAS失败,表示当前存在多个线程竞争锁,撤销偏向锁,执行slow_enter流程。
BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {
// We can revoke the biases of anonymously-biased objects
// efficiently enough that we should not cause these revocations to
// update the heuristics because doing so may cause unwanted bulk
// revocations (which are expensive) to occur.
markOop mark = obj->mark();
if (mark->is_biased_anonymously() && !attempt_rebias) {
//匿名偏向状态,即ThreadId为0以及偏向标志关闭,则需要撤销偏向锁。
markOop biased_value = mark;
markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) {
//返回BIAS_REVOKED标志后,fast_enter函数中会接着走slow_enter逻辑
return BIAS_REVOKED;
}
} else if (mark->has_bias_pattern()) {
Klass* k = obj->klass();
markOop prototype_header = k->prototype_header();
if (!prototype_header->has_bias_pattern()) {
//如果关闭偏向锁模式,则需要撤销偏向锁
markOop biased_value = mark;
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(prototype_header, obj->mark_addr(), mark);
assert(!(*(obj->mark_addr()))->has_bias_pattern(), "even if we raced, should still be revoked");
return BIAS_REVOKED;
} else if (prototype_header->bias_epoch() != mark->bias_epoch()) {
//偏向锁过期
if (attempt_rebias) {
assert(THREAD->is_Java_thread(), "");
markOop biased_value = mark;
//如果attempt_rebias开启,重新设置过期时间
markOop rebiased_prototype = markOopDesc::encode((JavaThread*) THREAD, mark->age(), prototype_header->bias_epoch());
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(rebiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) {
return BIAS_REVOKED_AND_REBIASED;
}
} else {
//如果attempt_rebias关闭,则撤销偏向锁
markOop biased_value = mark;
markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) {
return BIAS_REVOKED;
}
}
}
}
//更新撤销偏向锁计数,并返回偏向锁撤销次数和偏向次数
HeuristicsResult heuristics = update_heuristics(obj(), attempt_rebias);
if (heuristics == HR_NOT_BIASED) {
return NOT_BIASED;
} else if (heuristics == HR_SINGLE_REVOKE) {
//如果要撤销或者重偏向偏向锁的线程是当前线程,则直接撤销当前线程线程的偏向锁
Klass *k = obj->klass();
markOop prototype_header = k->prototype_header();
if (mark->biased_locker() == THREAD &&
prototype_header->bias_epoch() == mark->bias_epoch()) {
ResourceMark rm;
if (TraceBiasedLocking) {
tty->print_cr("Revoking bias by walking my own stack:");
}
//撤销偏向锁或者重偏向
BiasedLocking::Condition cond = revoke_bias(obj(), false, false, (JavaThread*) THREAD);
((JavaThread*) THREAD)->set_cached_monitor_info(NULL);
assert(cond == BIAS_REVOKED, "why not?");
return cond;
} else {
//如果不是当前线程,将方法提交到虚拟机的线程栈中执行
VM_RevokeBias revoke(&obj, (JavaThread*) THREAD);
VMThread::execute(&revoke);
return revoke.status_code();
}
}
assert((heuristics == HR_BULK_REVOKE) ||
(heuristics == HR_BULK_REBIAS), "?");
//当撤销偏向锁的次数达到阈值,则表示这个对象不适合偏向锁,于是对所有使用了这个对象的线程进行批量撤销或批量重偏
VM_BulkRevokeBias bulk_revoke(&obj, (JavaThread*) THREAD,
(heuristics == HR_BULK_REBIAS),
attempt_rebias);
VMThread::execute(&bulk_revoke);
return bulk_revoke.status_code();
}
接着看revoke_bias函数,是如何撤销或者重偏向锁的
static BiasedLocking::Condition revoke_bias(oop obj, bool allow_rebias, bool is_bulk, JavaThread* requesting_thread) {
markOop mark = obj->mark();
……
uint age = mark->age();
markOop biased_prototype = markOopDesc::biased_locking_prototype()->set_age(age);
markOop unbiased_prototype = markOopDesc::prototype()->set_age(age);
JavaThread* biased_thread = mark->biased_locker();
if (biased_thread == NULL) {
// 匿名偏向
if (!allow_rebias) {
obj->set_mark(unbiased_prototype);
}
if (TraceBiasedLocking && (Verbose || !is_bulk)) {
tty->print_cr(" Revoked bias of anonymously-biased object");
}
return BiasedLocking::BIAS_REVOKED;
}
// 判断线程是否存活
bool thread_is_alive = false;
if (requesting_thread == biased_thread) {
thread_is_alive = true;
} else {
for (JavaThread* cur_thread = Threads::first(); cur_thread != NULL; cur_thread = cur_thread->next()) {
if (cur_thread == biased_thread) {
thread_is_alive = true;
break;
}
}
}
//如果线程不存活,则将markword设置为匿名偏向锁或者无锁状态
if (!thread_is_alive) {
if (allow_rebias) {
obj->set_mark(biased_prototype);
} else {
obj->set_mark(unbiased_prototype);
}
if (TraceBiasedLocking && (Verbose || !is_bulk)) {
tty->print_cr(" Revoked bias of object biased toward dead thread");
}
return BiasedLocking::BIAS_REVOKED;
}
// 线程还存活则遍历线程栈中所有的Lock Record
GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(biased_thread);
BasicLock* highest_lock = NULL;
for (int i = 0; i < cached_monitor_info->length(); i++) {
MonitorInfo* mon_info = cached_monitor_info->at(i);
// 如果能找到对应的Lock Record说明偏向的线程还在执行同步代码块中的代码
if (mon_info->owner() == obj) {
if (TraceBiasedLocking && Verbose) {
tty->print_cr(" mon_info->owner (" PTR_FORMAT ") == obj (" PTR_FORMAT ")",
(void *) mon_info->owner(),
(void *) obj);
}
// 需要升级为轻量级锁,直接修改偏向线程栈中的Lock Record
markOop mark = markOopDesc::encode((BasicLock*) NULL);
highest_lock = mon_info->lock();
highest_lock->set_displaced_header(mark);
}
}
if (highest_lock != NULL) {
// 修改第一个Lock Record为无锁状态,然后将obj的mark word设置为指向该Lock Record的指针
highest_lock->set_displaced_header(unbiased_prototype);
// Reset object header to point to displaced mark
obj->set_mark(markOopDesc::encode(highest_lock));
assert(!obj->mark()->has_bias_pattern(), "illegal mark state: stack lock used bias bit");
if (TraceBiasedLocking && (Verbose || !is_bulk)) {
tty->print_cr(" Revoked bias of currently-locked object");
}
} else {
// 走到这里说明偏向线程已经不在同步块中了
if (TraceBiasedLocking && (Verbose || !is_bulk)) {
tty->print_cr(" Revoked bias of currently-unlocked object");
}
if (allow_rebias) {
//设置为匿名偏向状态
obj->set_mark(biased_prototype);
} else {
// 将mark word设置为无锁状态
obj->set_mark(unbiased_prototype);
}
}
return BiasedLocking::BIAS_REVOKED;
}
了解了偏向锁的加锁流程,再接着看自旋锁和重量级锁的加锁流程slow_enter。
轻量级锁加锁流程
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
markOop mark = obj->mark();
assert(!mark->has_bias_pattern(), "should not see bias pattern here");
//是否为无锁状态
if (mark->is_neutral()) {
//如果是无锁状态,通过cas加轻量级锁,cas成功则表示加锁成功
lock->set_displaced_header(mark);
if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
TEVENT (slow_enter: release stacklock) ;
return ;
}
} else
//如果是有锁状态,判断是否是同一把锁,如果是,则直接进入临界区
if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
assert(lock != mark->locker(), "must not re-lock the same lock");
assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
lock->set_displaced_header(NULL);
return;
}
lock->set_displaced_header(markOopDesc::unused_mark());
//如果上面两种状态都不满足,说明出现了锁的竞争情况,轻量级锁需要膨胀成重量级锁
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}
slow_enter会判断是否是无锁,如果是,则通过CAS进行轻量级锁加锁则,如果有锁,则判断是否是同意把锁,如果是,也可以直接进入临界区,如果不是,轻量级锁需要调用flate函数膨胀成重量级锁,膨胀成重量级锁后,执行enter方法。我们先看看膨胀过程。
轻量级锁膨胀过程
我们接着看inflate是如何进行膨胀的
ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
for (;;) {
const markOop mark = object->mark() ;
assert (!mark->has_bias_pattern(), "invariant") ;
// 判断是否有monitor,这个monitor就是管程对象,如果已经有了管程,说明已经是重量级锁了,如果是重量级锁则退出膨胀
if (mark->has_monitor()) {
ObjectMonitor * inf = mark->monitor() ;
assert (inf->header()->is_neutral(), "invariant");
assert (inf->object() == object, "invariant") ;
assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is invalid");
return inf ;
}
//其他线程正在进入膨胀过程,即自旋锁升级重量级锁的过程,当前线程需要进行等待
if (mark == markOopDesc::INFLATING()) {
TEVENT (Inflate: spin while INFLATING) ;
ReadStableMark(object) ;
continue ;
}
//判断是否是轻量级锁
if (mark->has_locker()) {
//获取一个可用管程,ObjectMonitor就是JVM的管程对象
ObjectMonitor * m = omAlloc (Self) ;
m->Recycle();
m->_Responsible = NULL ;
m->OwnerIsThread = 0 ;
m->_recursions = 0 ;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // Consider: maintain by type/class
markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;
//CAS操作标识Mark Word正在膨胀
if (cmp != mark) {
omRelease (Self, m, true) ;
continue ;
}
//CAS成功,初始化管程ObjectMonitor的信息
markOop dmw = mark->displaced_mark_helper() ;
assert (dmw->is_neutral(), "invariant") ;
m->set_header(dmw) ;
m->set_owner(mark->locker());
guarantee (object->mark() == markOopDesc::INFLATING(), "invariant") ;
object->release_set_mark(markOopDesc::encode(m));
if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ;
return m ;
}
……
}
}
在flate膨胀过程中,我们获取了重量级锁ObjectMonitor,这个就是JVM的管程对象,并且调用管程的enter方法,开始进入管程模型。
重量级锁加锁流程
接着看看ObjectMonitor管程的enter方法里面做了什么事情。
void ATTR ObjectMonitor::enter(TRAPS) {
// The following code is ordered to check the most common cases first
// and to reduce RTS->RTO cache line upgrades on SPARC and IA32 processors.
Thread * const Self = THREAD ;
void * cur ;
……
//尝试自旋获取锁
if (Knob_SpinEarly && TrySpin (Self) > 0) {
assert (_owner == Self , "invariant") ;
assert (_recursions == 0 , "invariant") ;
assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
Self->_Stalled = 0 ;
return ;
}
……
for (;;) {
……
//自旋失败则进入EnterI流程。
EnterI (THREAD) ;
……
}
……
}
ObjectMonitor的enter方法中,会先通过自旋获取锁,如果自旋超过一定的次数,则自旋失败,进入EnterI流程。
void ATTR ObjectMonitor::EnterI (TRAPS) {
Thread * Self = THREAD ;
//尝试获取锁
if (TryLock (Self) > 0) {
return ;
}
//再次尝试自旋
if (TrySpin (Self) > 0) {
return ;
}
//将当前线程封装成ObjectWaiter对象
ObjectWaiter node(Self) ;
Self->_ParkEvent->reset() ;
node._prev = (ObjectWaiter *) 0xBAD ;
node.TState = ObjectWaiter::TS_CXQ ;
ObjectWaiter * nxt ;
for (;;) {
//通过循环确保ObjectWaiter插入了队列
nod
e._next = nxt = _cxq ;
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
// 继续挣扎一下,尝试获取锁
if (TryLock (Self) > 0) {
return ;
}
}
if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) {
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}
TEVENT (Inflated enter - Contention) ;
int nWakeups = 0 ;
int RecheckInterval = 1 ;
for (;;) {
if (TryLock (Self) > 0) break ;
assert (_owner != Self, "invariant") ;
if ((SyncFlags & 2) && _Responsible == NULL) {
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}
// park self
if (_Responsible == Self || (SyncFlags & 1)) {
//延时挂起当前线程
Self->_ParkEvent->park ((jlong) RecheckInterval) ;
RecheckInterval *= 8 ;
if (RecheckInterval > 1000) RecheckInterval = 1000 ;
} else {
TEVENT (Inflated enter - park UNTIMED) ;
//挂起当前线程
Self->_ParkEvent->park() ;
}
//依然在尝试获取锁
if (TryLock(Self) > 0) break ;
……
}
……
return ;
}
EnterI主要做了这两事情
- 将当前线程封装成ObjectWaiter,并放入等待队列
- 调用ParkEvent的park方法,ParkEvent是Thread对象的内部类,park方法会将当前线程进行挂起。
可以看到,EnterI的方法中不断的在调用tryLock尝试获取锁,主要原因也是因为将线程挂起,然后再唤醒的性能开销是比较大的,能不挂起线程就最好不挂起线程。
释放锁流程
释放锁的入口逻辑如下:
CASE(_monitorexit): {
//1,获取对象头
oop lockee = STACK_OBJECT(-1);
CHECK_NULL(lockee);
BasicObjectLock* limit = istate->monitor_base();
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
while (most_recent != limit ) {
if ((most_recent)->obj() == lockee) {
BasicLock* lock = most_recent->lock();
//2,获取对象头中的markword头
markOop header = lock->displaced_header();
most_recent->set_obj(NULL);
if (header != NULL) {
if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), lock) != lock) {
// restore object for the slow case
most_recent->set_obj(lockee);
//3,执行释放锁的逻辑
CALL_VM(InterpreterRuntime::monitorexit(THREAD, most_recent), handle_exception);
}
}
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
}
most_recent++;
}
// Need to throw illegal monitor state exception
CALL_VM(InterpreterRuntime::throw_illegal_monitor_state_exception(THREAD), handle_exception);
ShouldNotReachHere();
}
_monitorexit的入口逻辑主要调用InterpreterRuntime::monitorexit函数执行释放锁的逻辑,接着往下看
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorexit(JavaThread* thread, BasicObjectLock* elem))
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
Handle h_obj(thread, elem->obj());
assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
"must be NULL or an object");
if (elem == NULL || h_obj()->is_unlocked()) {
THROW(vmSymbols::java_lang_IllegalMonitorStateException());
}
ObjectSynchronizer::slow_exit(h_obj(), elem->lock(), thread);
// Free entry. This must be done here, since a pending exception might be installed on
// exit. If it is not cleared, the exception handling code will try to unlock the monitor again.
elem->set_obj(NULL);
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
IRT_END
这里调用了slow_exit函数
void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) {
fast_exit (object, lock, THREAD) ;
}
void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
assert(!object->mark()->has_bias_pattern(), "should not see bias pattern here");
// if displaced header is null, the previous enter is recursive enter, no-op
markOop dhw = lock->displaced_header();
markOop mark ;
if (dhw == NULL) {
//重入锁,直接释放
mark = object->mark() ;
assert (!mark->is_neutral(), "invariant") ;
if (mark->has_locker() && mark != markOopDesc::INFLATING()) {
assert(THREAD->is_lock_owned((address)mark->locker()), "invariant") ;
}
if (mark->has_monitor()) {
ObjectMonitor * m = mark->monitor() ;
assert(((oop)(m->object()))->mark() == mark, "invariant") ;
assert(m->is_entered(THREAD), "invariant") ;
}
return ;
}
mark = object->mark() ;
if (mark == (markOop) lock) {
//轻量级锁,直接释放
assert (dhw->is_neutral(), "invariant") ;
if ((markOop) Atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) {
TEVENT (fast_exit: release stacklock) ;
return;
}
}
//重量级锁释放
ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ;
}
slow_exit函数里面又调用了fast_exit,这一块的调用逻辑我觉得JVM的团队需要优化一下,方法名给人很大的歧义性。fast_exit主要做了三件事
- 如果是重入锁,则直接return
- 如果是轻量级锁,则CAS重写mardword信息,释放锁
- 如果是重量级锁,通过infalte获得Monitor,然后调用Monitor的exit方法。
重量级锁释放流程
void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
Thread * Self = THREAD ;
……
for (;;) {
assert (THREAD == _owner, "invariant") ;
……
ObjectWaiter * w = NULL ;
int QMode = Knob_QMode ;
if (QMode == 2 && _cxq != NULL) {
w = _cxq ;
assert (w != NULL, "invariant") ;
assert (w->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
ExitEpilog (Self, w) ;
return ;
}
if (QMode == 3 && _cxq != NULL) {
w = _cxq ;
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
assert (w != NULL , "invariant") ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
// Append the RATs to the EntryList
// TODO: organize EntryList as a CDLL so we can locate the tail in constant-time.
ObjectWaiter * Tail ;
for (Tail = _EntryList ; Tail != NULL && Tail->_next != NULL ; Tail = Tail->_next) ;
if (Tail == NULL) {
_EntryList = w ;
} else {
Tail->_next = w ;
w->_prev = Tail ;
}
// Fall thru into code that tries to wake a successor from EntryList
}
if (QMode == 4 && _cxq != NULL) {
w = _cxq ;
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
assert (w != NULL , "invariant") ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
// Prepend the RATs to the EntryList
if (_EntryList != NULL) {
q->_next = _EntryList ;
_EntryList->_prev = q ;
}
_EntryList = w ;
// Fall thru into code that tries to wake a successor from EntryList
}
w = _EntryList ;
if (w != NULL) {
assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
ExitEpilog (Self, w) ;
return ;
}
if (QMode == 1) {
// QMode == 1 : drain cxq to EntryList, reversing order
// We also reverse the order of the list.
ObjectWaiter * s = NULL ;
ObjectWaiter * t = w ;
ObjectWaiter * u = NULL ;
while (t != NULL) {
guarantee (t->TState == ObjectWaiter::TS_CXQ, "invariant") ;
t->TState = ObjectWaiter::TS_ENTER ;
u = t->_next ;
t->_prev = u ;
t->_next = s ;
s = t;
t = u ;
}
_EntryList = s ;
assert (s != NULL, "invariant") ;
} else {
// QMode == 0 or QMode == 2
_EntryList = w ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
}
if (_succ != NULL) continue;
w = _EntryList ;
if (w != NULL) {
guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
ExitEpilog (Self, w) ;
return ;
}
}
}
exit函数主要的逻辑是根据QMode,也就是优先级的模式,从调用EntryList中取出一个线程,并调用ExitEpilog函数进行唤醒工作。
void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
assert (_owner == Self, "invariant") ;
_succ = Knob_SuccEnabled ? Wakee->_thread : NULL ;
ParkEvent * Trigger = Wakee->_event ;
Wakee = NULL ;
// Drop the lock
OrderAccess::release_store_ptr (&_owner, NULL) ;
OrderAccess::fence() ; // ST _owner vs LD in unpark()
if (SafepointSynchronize::do_call_back()) {
TEVENT (unpark before SAFEPOINT) ;
}
//将线程唤醒
Trigger->unpark() ;
}
Art中的实现原理
Art中Synchronize入口和出口都在interpreter_switch_impl解释器的ExecuteSwitchImpl函数中。
/art/runtime/interpreter/interpreter_switch_impl.cc
进入管程
JValue ExecuteSwitchImpl(Thread* self, const DexFile::CodeItem* code_item,
ShadowFrame& shadow_frame, JValue result_register,
bool interpret_one_instruction) {
constexpr bool do_assignability_check = do_access_check;
self->VerifyStack();
uint32_t dex_pc = shadow_frame.GetDexPC();
const auto* const instrumentation = Runtime::Current()->GetInstrumentation();
const uint16_t* const insns = code_item->insns_;
const Instruction* inst = Instruction::At(insns + dex_pc);
uint16_t inst_data;
ArtMethod* method = shadow_frame.GetMethod();
jit::Jit* jit = Runtime::Current()->GetJit();
do {
dex_pc = inst->GetDexPc(insns);
shadow_frame.SetDexPC(dex_pc);
TraceExecution(shadow_frame, inst, dex_pc);
inst_data = inst->Fetch16(0);
switch (inst->Opcode(inst_data)) {
……
//Synchronize入口
case Instruction::MONITOR_ENTER: {
PREAMBLE();
ObjPtr<mirror::Object> obj = shadow_frame.GetVRegReference(inst->VRegA_11x(inst_data));
if (UNLIKELY(obj == nullptr)) {
ThrowNullPointerExceptionFromInterpreter();
HANDLE_PENDING_EXCEPTION();
} else {
DoMonitorEnter<do_assignability_check>(self, &shadow_frame, obj);
POSSIBLY_HANDLE_PENDING_EXCEPTION(self->IsExceptionPending(), Next_1xx);
}
break;
}
//Synchronize出口
case Instruction::MONITOR_EXIT: {
PREAMBLE();
ObjPtr<mirror::Object> obj = shadow_frame.GetVRegReference(inst->VRegA_11x(inst_data));
if (UNLIKELY(obj == nullptr)) {
ThrowNullPointerExceptionFromInterpreter();
HANDLE_PENDING_EXCEPTION();
} else {
DoMonitorExit<do_assignability_check>(self, &shadow_frame, obj);
POSSIBLY_HANDLE_PENDING_EXCEPTION(self->IsExceptionPending(), Next_1xx);
}
break;
}
……
}
} while (!interpret_one_instruction);
// Record where we stopped.
shadow_frame.SetDexPC(inst->GetDexPc(insns));
return result_register;
} // NOLINT(readability/fn_siz
先看Art是如何加锁的,它的处理逻辑再DoMonitorEnter函数中
加锁流程
/art/runtime/interpreter/interpreter_common.h
static inline void DoMonitorEnter(Thread* self, ShadowFrame* frame, ObjPtr<mirror::Object> ref)
NO_THREAD_SAFETY_ANALYSIS
REQUIRES(!Roles::uninterruptible_) {
StackHandleScope<1> hs(self);
Handle<mirror::Object> h_ref(hs.NewHandle(ref));
h_ref->MonitorEnter(self);
if (kMonitorCounting && frame->GetMethod()->MustCountLocks()) {
frame->GetLockCountData().AddMonitor(self, h_ref.Get());
}
}
里面最终调用了MonitorEnter函数。
/art/runtime/monitor.cc
mirror::Object* Monitor::MonitorEnter(Thread* self, mirror::Object* obj, bool trylock) {
DCHECK(self != nullptr);
DCHECK(obj != nullptr);
self->AssertThreadSuspensionIsAllowable();
obj = FakeLock(obj);
uint32_t thread_id = self->GetThreadId();
size_t contention_count = 0;
StackHandleScope<1> hs(self);
Handle<mirror::Object> h_obj(hs.NewHandle(obj));
while (true) {
//获取LockWord,类似JVM中的MarkWord
LockWord lock_word = h_obj->GetLockWord(false);
switch (lock_word.GetState()) {
case LockWord::kUnlocked: {
//无锁状态,则进行轻量级锁的加锁过程
LockWord thin_locked(LockWord::FromThinLockId(thread_id, 0, lock_word.GCState()));
if (h_obj->CasLockWordWeakAcquire(lock_word, thin_locked)) {
AtraceMonitorLock(self, h_obj.Get(), false /* is_wait */);
return h_obj.Get(); // Success!
}
continue; // Go again.
}
case LockWord::kThinLocked: {
//轻量级锁
uint32_t owner_thread_id = lock_word.ThinLockOwner();
// 判断lock_word的threadid是否相等
if (owner_thread_id == thread_id){
uint32_t new_count = lock_word.ThinLockCount() + 1;
if (LIKELY(new_count <= LockWord::kThinLockMaxCount)) {
//如果相等,并且小于轻量级锁的最大数量,则直接进入临界区
LockWord thin_locked(LockWord::FromThinLockId(thread_id,
new_count,
lock_word.GCState()));
if (!kUseReadBarrier) {
h_obj->SetLockWord(thin_locked, false /* volatile */);
AtraceMonitorLock(self, h_obj.Get(), false /* is_wait */);
return h_obj.Get(); // Success!
} else {
// Use CAS to preserve the read barrier state.
if (h_obj->CasLockWordWeakRelaxed(lock_word, thin_locked)) {
AtraceMonitorLock(self, h_obj.Get(), false /* is_wait */);
return h_obj.Get(); // Success!
}
}
continue; // Go again.
} else {
// 如果超出轻量级锁的数量,则进行重量级锁的加锁流程
InflateThinLocked(self, h_obj, lock_word, 0);
}
} else {
//lock_word的threadid和当前线程的threadid不等,升级成重量级锁并加锁
if (trylock) {
return nullptr;
}
// Contention.
contention_count++;
Runtime* runtime = Runtime::Current();
if (contention_count <= runtime->GetMaxSpinsBeforeThinLockInflation()) {
sched_yield();
} else {
contention_count = 0;
// No ordering required for initial lockword read. Install rereads it anyway.
InflateThinLocked(self, h_obj, lock_word, 0);
}
}
continue; // Start from the beginning.
}
case LockWord::kFatLocked: {
//重量级锁
QuasiAtomic::ThreadFenceAcquire();
//获取monitor
Monitor* mon = lock_word.FatLockMonitor()
//尝试获取锁
if (trylock) {
return mon->TryLock(self) ? h_obj.Get() : nullptr;
} else {
mon->Lock(self);
return h_obj.Get(); // Success!
}
}
default: {
LOG(FATAL) << "Invalid monitor state " << lock_word.GetState();
UNREACHABLE();
}
}
}
}
MonitorEnter函数中做的事情主要如下:
- 如果是无锁,通过CAS加轻量级锁,调用continue,进入步骤2。
- 如果是轻量级锁,判断LockWord的threadId和当前线程的ThreadId是否相等,如果相等,并且轻量级锁没有超过最大限制的情况下,return退出循环,进入临界区。如果不相等,则通过InflateThinLocked升级成重量级锁,升级的过程主要是创建和初始化Monitor的过程,调用continue,进入不走3。
- 如果是重量级锁,通过调用Monitor的lock函数获取锁
接着看lock获取锁的过程
/art/runtime/monitor.cc
void Monitor::Lock(Thread* self) {
MutexLock mu(self, monitor_lock_);
while (true) {
//尝试获取锁
if (TryLockLocked(self)) {
return;
}
// Contended.
const bool log_contention = (lock_profiling_threshold_ != 0);
uint64_t wait_start_ms = log_contention ? MilliTime() : 0;
ArtMethod* owners_method = locking_method_;
uint32_t owners_dex_pc = locking_dex_pc_;
// Do this before releasing the lock so that we don't get deflated.
size_t num_waiters = num_waiters_;
++num_waiters_;
monitor_lock_.Unlock(self); // Let go of locks in order.
self->SetMonitorEnterObject(GetObject());
{
ScopedThreadSuspension tsc(self, kBlocked); // Change to blocked and give up mutator_lock_.
uint32_t original_owner_thread_id = 0u;
{
// Reacquire monitor_lock_ without mutator_lock_ for Wait.
MutexLock mu2(self, monitor_lock_);
if (owner_ != nullptr) { // Did the owner_ give the lock up?
original_owner_thread_id = owner_->GetThreadId();
//调用Wait函数,将线程休眠
monitor_contenders_.Wait(self);
}
……
}
}
self->SetMonitorEnterObject(nullptr);
monitor_lock_.Lock(self); // Reacquire locks in order.
--num_waiters_;
}
}
Lock函数主要在自旋中通过TryLockLocked获取锁,获取不到的情况下调用Wait将线程休眠。接着看一下Wait是如何休眠线程的。
/art/runtime/monitor.cc
void Monitor::Wait(Thread* self, int64_t ms, int32_t ns,
bool interruptShouldThrow, ThreadState why) {
……
//将线程添加到等待队列
AppendToWaitSet(self);
bool was_interrupted = false;
{
self->SetWaitMonitor(this);
// Release the monitor lock.
monitor_contenders_.Signal(self);
monitor_lock_.Unlock(self);
// Handle the case where the thread was interrupted before we called wait().
if (self->IsInterruptedLocked()) {
was_interrupted = true;
} else {
if (why == kWaiting) {
// 调用线程的wait方法将线程休眠
self->GetWaitConditionVariable()->Wait(self);
} else {
DCHECK(why == kTimedWaiting || why == kSleeping) << why;
self->GetWaitConditionVariable()->TimedWait(self, ms, ns);
}
was_interrupted = self->IsInterruptedLocked();
}
}
……
}
Wait函数主要做了两件事情
- 将线程加入等待队列
- 调用线程的Wait方法,将线程休眠。
到这里我们已经清楚Art的Synchronize加锁的流程了,接下来再看一下释放锁的流程。
释放锁的流程
释放锁的逻辑处理函数MonitorExit也在Monitor这个对象中。
/art/runtime/monitor.cc
bool Monitor::MonitorExit(Thread* self, mirror::Object* obj) {
DCHECK(self != nullptr);
DCHECK(obj != nullptr);
self->AssertThreadSuspensionIsAllowable();
obj = FakeUnlock(obj);
StackHandleScope<1> hs(self);
Handle<mirror::Object> h_obj(hs.NewHandle(obj));
while (true) {
LockWord lock_word = obj->GetLockWord(true);
switch (lock_word.GetState()) {
case LockWord::kHashCode:
// Fall-through.
case LockWord::kUnlocked:
//无锁状态,不需要释放锁
FailedUnlock(h_obj.Get(), self->GetThreadId(), 0u, nullptr);
return false; // Failure.
case LockWord::kThinLocked: {
//轻量级锁
uint32_t thread_id = self->GetThreadId();
uint32_t owner_thread_id = lock_word.ThinLockOwner();
if (owner_thread_id != thread_id) {
FailedUnlock(h_obj.Get(), thread_id, owner_thread_id, nullptr);
return false; // Failure.
} else {
// We own the lock, decrease the recursion count.
LockWord new_lw = LockWord::Default();
if (lock_word.ThinLockCount() != 0) {
uint32_t new_count = lock_word.ThinLockCount() - 1;
new_lw = LockWord::FromThinLockId(thread_id, new_count, lock_word.GCState());
} else {
new_lw = LockWord::FromDefault(lock_word.GCState());
}
if (!kUseReadBarrier) {
DCHECK_EQ(new_lw.ReadBarrierState(), 0U);
// TODO: This really only needs memory_order_release, but we currently have
// no way to specify that. In fact there seem to be no legitimate uses of SetLockWord
// with a final argument of true. This slows down x86 and ARMv7, but probably not v8.
h_obj->SetLockWord(new_lw, true);
AtraceMonitorUnlock();
// Success!
return true;
} else {
// Use CAS to preserve the read barrier state.
if (h_obj->CasLockWordWeakRelease(lock_word, new_lw)) {
AtraceMonitorUnlock();
// Success!
return true;
}
}
continue; // Go again.
}
}
case LockWord::kFatLocked: {
//重量级锁
Monitor* mon = lock_word.FatLockMonitor();
return mon->Unlock(self);
}
default: {
LOG(FATAL) << "Invalid monitor state " << lock_word.GetState();
return false;
}
}
}
}
无锁和轻量级锁的释放流程都很简单,我们直接看重量级锁的释放过程Unlock函数的实现。
bool Monitor::Unlock(Thread* self) {
DCHECK(self != nullptr);
uint32_t owner_thread_id = 0u;
{
MutexLock mu(self, monitor_lock_);
Thread* owner = owner_;
if (owner != nullptr) {
owner_thread_id = owner->GetThreadId();
}
if (owner == self) {
// We own the monitor, so nobody else can be in here.
AtraceMonitorUnlock();
if (lock_count_ == 0) {
owner_ = nullptr;
locking_method_ = nullptr;
locking_dex_pc_ = 0;
// Wake a contender.
monitor_contenders_.Signal(self);
} else {
--lock_count_;
}
return true;
}
}
// We don't own this, so we're not allowed to unlock it.
// The JNI spec says that we should throw IllegalMonitorStateException in this case.
FailedUnlock(GetObject(), self->GetThreadId(), owner_thread_id, this);
return false;
}
unlock方法主要调用了Signal函数,来对等待队列中的线程进行缓存。Signal最终会走到管程Monitor的Notify方法中
void Monitor::Notify(Thread* self) {
DCHECK(self != NULL);
MutexLock mu(self, monitor_lock_);
// Make sure that we hold the lock.
if (owner_ != self) {
ThrowIllegalMonitorStateExceptionF("object not locked by thread before notify()");
return;
}
// Signal the first waiting thread in the wait set.
while (wait_set_ != NULL) {
Thread* thread = wait_set_;
wait_set_ = thread->GetWaitNext();
thread->SetWaitNext(nullptr);
// Check to see if the thread is still waiting.
MutexLock mu(self, *thread->GetWaitMutex());
if (thread->GetWaitMonitor() != nullptr) {
thread->GetWaitConditionVariable()->Signal(self);
return;
}
}
}
可以看到,这里Notify函数中会通过GetWaitNext获取当前线程的下一个在等待的线程,然后通过Signal进行唤醒,所以Art中的synchronize的唤醒线程是按照顺序唤醒的。