object.wait()和notify()

2017-11-28  本文已影响0人  夜月行者

1,简介

wait和notify是object的方法,也就是说所有对象都有这两个方法,这两个方法可以用来阻塞当前线程(同时放弃互斥锁)或者是 唤醒其他调用wait方法陷入阻塞的线程(不能唤醒那些因为抢占锁而阻塞的队列)

代码样例


public class NotifyAndWaitT {

   public static void main(String[] args) throws InterruptedException {


       Thread waitThread = new Thread(() -> {

           synchronized (NotifyAndWaitT.class) {
               System.out.println("wait get the lock");
               try {
                   NotifyAndWaitT.class.wait();
                   System.out.println("wait wake up");
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
       });

       Thread notifyThread = new Thread(() -> {
           try {
               Thread.currentThread().sleep(100);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }

           synchronized (NotifyAndWaitT.class) {
               System.out.println("notify get the lock");
               try {
                   NotifyAndWaitT.class.notify();
                   Thread.currentThread().sleep(1000);
                   System.out.println("notify end---------");
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
       });

       waitThread.start();
       notifyThread.start();
       waitThread.join();
       notifyThread.join();
       System.out.println("main end*******************");
   }


}

2. wait 运行过程

能够执行wait和notify的前提是代码已经进入了synchronized包含的代码块中。

当然synchronized现在通过优化,增加了偏向锁,轻量级锁,但是在执行obj.wait()的时候都会膨胀成重量级锁ObjectMonitor (关于每个锁的关联数据可以看看),然后做一系列的操作。

具体看一下执行过程:

参考 object.wait的源码解析.md 可以看到代码执行的逻辑,下面会具体分析一下

首先是每个了解当前线程持有的锁(objectMonitor)拥有的数据结构, objectMonitor在jdk8中的数据结构.md

在这里面可以看到在objectMonitor中有一个地方存储的是 _WaitSet ,这个存放的就是因为obj.wait()进入阻塞的线程。

3. 具体从代码看 wait() 操作

看objectMonitor.cpp文件中的 ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS)


// will need to be replicated in complete_exit above    

void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {    

  Thread * const Self = THREAD ;    

  assert(Self->is_Java_thread(), "Must be Java thread!");    

  JavaThread *jt = (JavaThread *)THREAD;    



  DeferredInitialize () ;    



  // Throw IMSX or IEX.    

  CHECK_OWNER();    



  // check for a pending interrupt    检查是否有中断信号,有的话就抛出异常

  if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {    

    // post monitor waited event.  Note that this is past-tense, we are done waiting.    

    if (JvmtiExport::should_post_monitor_waited()) {    

       // Note: 'false' parameter is passed here because the    

       // wait was not timed out due to thread interrupt.    

       JvmtiExport::post_monitor_waited(jt, this, false);    

    }    

    TEVENT (Wait - Throw IEX) ;    

    THROW(vmSymbols::java_lang_InterruptedException());    

    return ;    

  }    

  TEVENT (Wait) ;    



  assert (Self->_Stalled == 0, "invariant") ;    

  Self->_Stalled = intptr_t(this) ;    

  jt->set_current_waiting_monitor(this);    



  // create a node to be put into the queue    

  // Critically, after we reset() the event but prior to park(), we must check    

  // for a pending interrupt.    

//把当前线程包装成一个 objectwaiter对象,

  ObjectWaiter node(Self);    

  node.TState = ObjectWaiter::TS_WAIT ;    

  Self->_ParkEvent->reset() ;    

  OrderAccess::fence();          // ST into Event; membar ; LD interrupted-flag    



  // Enter the waiting queue, which is a circular doubly linked list in this case    

  // but it could be a priority queue or any data structure.    

  // _WaitSetLock protects the wait queue.  Normally the wait queue is accessed only    

  // by the the owner of the monitor *except* in the case where park()    

  // returns because of a timeout of interrupt.  Contention is exceptionally rare    

  // so we use a simple spin-lock instead of a heavier-weight blocking lock.    

//加锁,把objectwaiter 对象添加到等待的set中



  Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;    

  AddWaiter (&node) ;    

  Thread::SpinRelease (&_WaitSetLock) ;    



  if ((SyncFlags & 4) == 0) {    

     _Responsible = NULL ;    

  }    

  intptr_t save = _recursions; // record the old recursion count    

  _waiters++;                  // increment the number of waiters    

  _recursions = 0;             // set the recursion level to be 1    

  exit (Self) ;                    // exit the monitor    

  guarantee (_owner != Self, "invariant") ;    



  // As soon as the ObjectMonitor's ownership is dropped in the exit()    

  // call above, another thread can enter() the ObjectMonitor, do the    

  // notify(), and exit() the ObjectMonitor. If the other thread's    

  // exit() call chooses this thread as the successor and the unpark()    

  // call happens to occur while this thread is posting a    

  // MONITOR_CONTENDED_EXIT event, then we run the risk of the event    

  // handler using RawMonitors and consuming the unpark().    

  //    

  // To avoid the problem, we re-post the event. This does no harm    

  // even if the original unpark() was not consumed because we are the    

  // chosen successor for this monitor.    

//在这里防止的情况是当前线程刚刚执行完monitor的释放,

//有一个线程进来了,执行完后退出了,又唤醒了当前线程,

//把当前线程从等待队列中给拿出来了,

//但是当前线程实际上还是没有执行park操作

//这样的话再执行下面的park操作的话,

//阻塞了也没有机会唤醒了,不在那个等待条件的队列里面了

// 那么就对当前线程再执行一次unpark,在下面执行park的时候就直接略过了,不会出现问题

  if (node._notified != 0 && _succ == Self) {    

     node._event->unpark();    

  }    



  // The thread is on the WaitSet list - now park() it.    

  // On MP systems it's conceivable that a brief spin before we park    

  // could be profitable.    

  //    

  // TODO-FIXME: change the following logic to a loop of the form    

  //   while (!timeout && !interrupted && _notified == 0) park()    



  int ret = OS_OK ;    

  int WasNotified = 0 ;    

  { // State transition wrappers    

    OSThread* osthread = Self->osthread();    

    OSThreadWaitState osts(osthread, true);    

    {    

      ThreadBlockInVM tbivm(jt);    

      // Thread is in thread_blocked state and oop access is unsafe.    

      jt->set_suspend_equivalent();    

//执行park操作,把当前线程挂起



      if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {    

          // Intentionally empty    

      } else    

      if (node._notified == 0) {    

        if (millis <= 0) {    

           Self->_ParkEvent->park () ;    

        } else {    

           ret = Self->_ParkEvent->park (millis) ;    

        }    

      }    



      // were we externally suspended while we were waiting?    

      if (ExitSuspendEquivalent (jt)) {    

         // TODO-FIXME: add -- if succ == Self then succ = null.    

         jt->java_suspend_self();    

      }    



    } // Exit thread safepoint: transition _thread_blocked -> _thread_in_vm    





    // Node may be on the WaitSet, the EntryList (or cxq), or in transition    

    // from the WaitSet to the EntryList.    

    // See if we need to remove Node from the WaitSet.    

    // We use double-checked locking to avoid grabbing _WaitSetLock    

    // if the thread is not on the wait queue.    

    //    

    // Note that we don't need a fence before the fetch of TState.    

    // In the worst case we'll fetch a old-stale value of TS_WAIT previously    

    // written by the is thread. (perhaps the fetch might even be satisfied    

    // by a look-aside into the processor's own store buffer, although given    

    // the length of the code path between the prior ST and this load that's    

    // highly unlikely).  If the following LD fetches a stale TS_WAIT value    

    // then we'll acquire the lock and then re-fetch a fresh TState value.    

    // That is, we fail toward safety.    



    if (node.TState == ObjectWaiter::TS_WAIT) {    

        Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;    

        if (node.TState == ObjectWaiter::TS_WAIT) {    

           DequeueSpecificWaiter (&node) ;       // unlink from WaitSet    

           assert(node._notified == 0, "invariant");    

           node.TState = ObjectWaiter::TS_RUN ;    

        }    

        Thread::SpinRelease (&_WaitSetLock) ;    

    }    



    // The thread is now either on off-list (TS_RUN),    

    // on the EntryList (TS_ENTER), or on the cxq (TS_CXQ).    

    // The Node's TState variable is stable from the perspective of this thread.    

    // No other threads will asynchronously modify TState.    

    guarantee (node.TState != ObjectWaiter::TS_WAIT, "invariant") ;    

    OrderAccess::loadload() ;    

    if (_succ == Self) _succ = NULL ;    

    WasNotified = node._notified ;    



 



    // post monitor waited event. Note that this is past-tense, we are done waiting.    

    if (JvmtiExport::should_post_monitor_waited()) {    

      JvmtiExport::post_monitor_waited(jt, this, ret == OS_TIMEOUT);    

    }    

    OrderAccess::fence() ;    



    assert (Self->_Stalled != 0, "invariant") ;    

    Self->_Stalled = 0 ;    



    assert (_owner != Self, "invariant") ;    

    ObjectWaiter::TStates v = node.TState ;    

    if (v == ObjectWaiter::TS_RUN) {    

        enter (Self) ;    

    } else {    

        guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ;    

        ReenterI (Self, &node) ;    

        node.wait_reenter_end(this);    

    }    

    // want residual elements associated with this thread left on any lists.    

    guarantee (node.TState == ObjectWaiter::TS_RUN, "invariant") ;    

    assert    (_owner == Self, "invariant") ;    

    assert    (_succ != Self , "invariant") ;    

  } // OSThreadWaitState()    



  jt->set_current_waiting_monitor(NULL);    



  guarantee (_recursions == 0, "invariant") ;    

  _recursions = save;     // restore the old recursion count    

  _waiters--;             // decrement the number of waiters    



  // Verify a few postconditions    

  assert (_owner == Self       , "invariant") ;    

  assert (_succ  != Self       , "invariant") ;    

  assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;    



  if (SyncFlags & 32) {    

     OrderAccess::fence() ;    

  }    



  // check if the notification happened    

//如果线程不是通过notify唤醒的,那就只能是通过 interrupt唤醒的,就抛出异常。

//这也是object.wait()方法可以抛出异常的原因

  if (!WasNotified) {    

    // no, it could be timeout or Thread.interrupt() or both    

    // check for interrupt event, otherwise it is timeout    

    if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {    

      TEVENT (Wait - throw IEX from epilog) ;    

      THROW(vmSymbols::java_lang_InterruptedException());    

    }    

  }    

}    









1.检查是否有中断,如果有中断的话就抛出中断(返回)

2.把当前线程包装成一个 objectWaiter 对象

3.通过自旋锁把 objectWaiter 添加到 ObjectMonitor 的 _WaitSet 当中

4.释放 objectMonitor 对象

5.如果当前线程处于被 notifyed 状态(针对这种情况出现的可能是下面的解释),对当前线程执行一次 unpark()操作


//在这里防止的情况是当前线程刚刚执行完monitor的释放,

//有一个线程进来了,执行完后退出了,又唤醒了当前线程,

//把当前线程从等待队列中给拿出来了,

//但是当前线程实际上还是没有执行park操作

//这样的话再执行下面的park操作的话,

//阻塞了也没有机会唤醒了,不在那个等待条件的队列里面了

// 那么就对当前线程再执行一次unpark,在下面 第 6 步  执行 park的时候就直接略过了,不会阻塞,也就不会出现问题

6.执行park操作把当前线程阻塞,调用的是 操作系统的wait 操作的 api

  6.1. 如果出现5的情况则当前线程并不会阻塞,而是直接走过去

7.如果线程醒来:有两种情况

  7.1. 其他线程执行了notify()操作,对应底层有unpark()操作,则该线程恢复运行

  7.2. 其他线程执行了interrupt()也会对当前线程有unpark()操作,该线程也会恢复运行

8.醒来好检查interrupt 标志位是否标示有异常,如果有,则会抛出 interruptedException 异常。

最后一条是线程在执行 obj.wait()有抛出interruptException异常的原因。

4,notify的原理

notify()操作将处于waitSet中的一个节点移动到

EntryList当中,也就是移动到对锁进行竞争而产生阻塞的队列当中。再等待其他锁的唤醒就ok了。

当然notify有一些唤醒策略。

上一篇下一篇

猜你喜欢

热点阅读