object.wait()和notify()
1,简介
wait和notify是object的方法,也就是说所有对象都有这两个方法,这两个方法可以用来阻塞当前线程(同时放弃互斥锁)或者是 唤醒其他调用wait方法陷入阻塞的线程(不能唤醒那些因为抢占锁而阻塞的队列)
代码样例
public class NotifyAndWaitT {
public static void main(String[] args) throws InterruptedException {
Thread waitThread = new Thread(() -> {
synchronized (NotifyAndWaitT.class) {
System.out.println("wait get the lock");
try {
NotifyAndWaitT.class.wait();
System.out.println("wait wake up");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread notifyThread = new Thread(() -> {
try {
Thread.currentThread().sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (NotifyAndWaitT.class) {
System.out.println("notify get the lock");
try {
NotifyAndWaitT.class.notify();
Thread.currentThread().sleep(1000);
System.out.println("notify end---------");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
waitThread.start();
notifyThread.start();
waitThread.join();
notifyThread.join();
System.out.println("main end*******************");
}
}
2. wait 运行过程
能够执行wait和notify的前提是代码已经进入了synchronized包含的代码块中。
当然synchronized现在通过优化,增加了偏向锁,轻量级锁,但是在执行obj.wait()的时候都会膨胀成重量级锁ObjectMonitor (关于每个锁的关联数据可以看看),然后做一系列的操作。
具体看一下执行过程:
参考 object.wait的源码解析.md 可以看到代码执行的逻辑,下面会具体分析一下
首先是每个了解当前线程持有的锁(objectMonitor)拥有的数据结构, objectMonitor在jdk8中的数据结构.md
在这里面可以看到在objectMonitor中有一个地方存储的是 _WaitSet ,这个存放的就是因为obj.wait()进入阻塞的线程。
3. 具体从代码看 wait() 操作
看objectMonitor.cpp文件中的 ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS)
// will need to be replicated in complete_exit above
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
Thread * const Self = THREAD ;
assert(Self->is_Java_thread(), "Must be Java thread!");
JavaThread *jt = (JavaThread *)THREAD;
DeferredInitialize () ;
// Throw IMSX or IEX.
CHECK_OWNER();
// check for a pending interrupt 检查是否有中断信号,有的话就抛出异常
if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
// post monitor waited event. Note that this is past-tense, we are done waiting.
if (JvmtiExport::should_post_monitor_waited()) {
// Note: 'false' parameter is passed here because the
// wait was not timed out due to thread interrupt.
JvmtiExport::post_monitor_waited(jt, this, false);
}
TEVENT (Wait - Throw IEX) ;
THROW(vmSymbols::java_lang_InterruptedException());
return ;
}
TEVENT (Wait) ;
assert (Self->_Stalled == 0, "invariant") ;
Self->_Stalled = intptr_t(this) ;
jt->set_current_waiting_monitor(this);
// create a node to be put into the queue
// Critically, after we reset() the event but prior to park(), we must check
// for a pending interrupt.
//把当前线程包装成一个 objectwaiter对象,
ObjectWaiter node(Self);
node.TState = ObjectWaiter::TS_WAIT ;
Self->_ParkEvent->reset() ;
OrderAccess::fence(); // ST into Event; membar ; LD interrupted-flag
// Enter the waiting queue, which is a circular doubly linked list in this case
// but it could be a priority queue or any data structure.
// _WaitSetLock protects the wait queue. Normally the wait queue is accessed only
// by the the owner of the monitor *except* in the case where park()
// returns because of a timeout of interrupt. Contention is exceptionally rare
// so we use a simple spin-lock instead of a heavier-weight blocking lock.
//加锁,把objectwaiter 对象添加到等待的set中
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
AddWaiter (&node) ;
Thread::SpinRelease (&_WaitSetLock) ;
if ((SyncFlags & 4) == 0) {
_Responsible = NULL ;
}
intptr_t save = _recursions; // record the old recursion count
_waiters++; // increment the number of waiters
_recursions = 0; // set the recursion level to be 1
exit (Self) ; // exit the monitor
guarantee (_owner != Self, "invariant") ;
// As soon as the ObjectMonitor's ownership is dropped in the exit()
// call above, another thread can enter() the ObjectMonitor, do the
// notify(), and exit() the ObjectMonitor. If the other thread's
// exit() call chooses this thread as the successor and the unpark()
// call happens to occur while this thread is posting a
// MONITOR_CONTENDED_EXIT event, then we run the risk of the event
// handler using RawMonitors and consuming the unpark().
//
// To avoid the problem, we re-post the event. This does no harm
// even if the original unpark() was not consumed because we are the
// chosen successor for this monitor.
//在这里防止的情况是当前线程刚刚执行完monitor的释放,
//有一个线程进来了,执行完后退出了,又唤醒了当前线程,
//把当前线程从等待队列中给拿出来了,
//但是当前线程实际上还是没有执行park操作
//这样的话再执行下面的park操作的话,
//阻塞了也没有机会唤醒了,不在那个等待条件的队列里面了
// 那么就对当前线程再执行一次unpark,在下面执行park的时候就直接略过了,不会出现问题
if (node._notified != 0 && _succ == Self) {
node._event->unpark();
}
// The thread is on the WaitSet list - now park() it.
// On MP systems it's conceivable that a brief spin before we park
// could be profitable.
//
// TODO-FIXME: change the following logic to a loop of the form
// while (!timeout && !interrupted && _notified == 0) park()
int ret = OS_OK ;
int WasNotified = 0 ;
{ // State transition wrappers
OSThread* osthread = Self->osthread();
OSThreadWaitState osts(osthread, true);
{
ThreadBlockInVM tbivm(jt);
// Thread is in thread_blocked state and oop access is unsafe.
jt->set_suspend_equivalent();
//执行park操作,把当前线程挂起
if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
// Intentionally empty
} else
if (node._notified == 0) {
if (millis <= 0) {
Self->_ParkEvent->park () ;
} else {
ret = Self->_ParkEvent->park (millis) ;
}
}
// were we externally suspended while we were waiting?
if (ExitSuspendEquivalent (jt)) {
// TODO-FIXME: add -- if succ == Self then succ = null.
jt->java_suspend_self();
}
} // Exit thread safepoint: transition _thread_blocked -> _thread_in_vm
// Node may be on the WaitSet, the EntryList (or cxq), or in transition
// from the WaitSet to the EntryList.
// See if we need to remove Node from the WaitSet.
// We use double-checked locking to avoid grabbing _WaitSetLock
// if the thread is not on the wait queue.
//
// Note that we don't need a fence before the fetch of TState.
// In the worst case we'll fetch a old-stale value of TS_WAIT previously
// written by the is thread. (perhaps the fetch might even be satisfied
// by a look-aside into the processor's own store buffer, although given
// the length of the code path between the prior ST and this load that's
// highly unlikely). If the following LD fetches a stale TS_WAIT value
// then we'll acquire the lock and then re-fetch a fresh TState value.
// That is, we fail toward safety.
if (node.TState == ObjectWaiter::TS_WAIT) {
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;
if (node.TState == ObjectWaiter::TS_WAIT) {
DequeueSpecificWaiter (&node) ; // unlink from WaitSet
assert(node._notified == 0, "invariant");
node.TState = ObjectWaiter::TS_RUN ;
}
Thread::SpinRelease (&_WaitSetLock) ;
}
// The thread is now either on off-list (TS_RUN),
// on the EntryList (TS_ENTER), or on the cxq (TS_CXQ).
// The Node's TState variable is stable from the perspective of this thread.
// No other threads will asynchronously modify TState.
guarantee (node.TState != ObjectWaiter::TS_WAIT, "invariant") ;
OrderAccess::loadload() ;
if (_succ == Self) _succ = NULL ;
WasNotified = node._notified ;
// post monitor waited event. Note that this is past-tense, we are done waiting.
if (JvmtiExport::should_post_monitor_waited()) {
JvmtiExport::post_monitor_waited(jt, this, ret == OS_TIMEOUT);
}
OrderAccess::fence() ;
assert (Self->_Stalled != 0, "invariant") ;
Self->_Stalled = 0 ;
assert (_owner != Self, "invariant") ;
ObjectWaiter::TStates v = node.TState ;
if (v == ObjectWaiter::TS_RUN) {
enter (Self) ;
} else {
guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ;
ReenterI (Self, &node) ;
node.wait_reenter_end(this);
}
// want residual elements associated with this thread left on any lists.
guarantee (node.TState == ObjectWaiter::TS_RUN, "invariant") ;
assert (_owner == Self, "invariant") ;
assert (_succ != Self , "invariant") ;
} // OSThreadWaitState()
jt->set_current_waiting_monitor(NULL);
guarantee (_recursions == 0, "invariant") ;
_recursions = save; // restore the old recursion count
_waiters--; // decrement the number of waiters
// Verify a few postconditions
assert (_owner == Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
if (SyncFlags & 32) {
OrderAccess::fence() ;
}
// check if the notification happened
//如果线程不是通过notify唤醒的,那就只能是通过 interrupt唤醒的,就抛出异常。
//这也是object.wait()方法可以抛出异常的原因
if (!WasNotified) {
// no, it could be timeout or Thread.interrupt() or both
// check for interrupt event, otherwise it is timeout
if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
TEVENT (Wait - throw IEX from epilog) ;
THROW(vmSymbols::java_lang_InterruptedException());
}
}
}
1.检查是否有中断,如果有中断的话就抛出中断(返回)
2.把当前线程包装成一个 objectWaiter 对象
3.通过自旋锁把 objectWaiter 添加到 ObjectMonitor 的 _WaitSet 当中
4.释放 objectMonitor 对象
5.如果当前线程处于被 notifyed 状态(针对这种情况出现的可能是下面的解释),对当前线程执行一次 unpark()操作
//在这里防止的情况是当前线程刚刚执行完monitor的释放,
//有一个线程进来了,执行完后退出了,又唤醒了当前线程,
//把当前线程从等待队列中给拿出来了,
//但是当前线程实际上还是没有执行park操作
//这样的话再执行下面的park操作的话,
//阻塞了也没有机会唤醒了,不在那个等待条件的队列里面了
// 那么就对当前线程再执行一次unpark,在下面 第 6 步 执行 park的时候就直接略过了,不会阻塞,也就不会出现问题
6.执行park操作把当前线程阻塞,调用的是 操作系统的wait 操作的 api
6.1. 如果出现5的情况则当前线程并不会阻塞,而是直接走过去
7.如果线程醒来:有两种情况
7.1. 其他线程执行了notify()操作,对应底层有unpark()操作,则该线程恢复运行
7.2. 其他线程执行了interrupt()也会对当前线程有unpark()操作,该线程也会恢复运行
8.醒来好检查interrupt 标志位是否标示有异常,如果有,则会抛出 interruptedException 异常。
最后一条是线程在执行 obj.wait()有抛出interruptException异常的原因。
4,notify的原理
notify()操作将处于waitSet中的一个节点移动到
EntryList当中,也就是移动到对锁进行竞争而产生阻塞的队列当中。再等待其他锁的唤醒就ok了。
当然notify有一些唤醒策略。