并发系列之Thread源码解读

2019-07-30  本文已影响0人  阿伦故事2019

北京大学(中国)校训:“自由平等,民主科学。”


话说,所有不谈源码的开发讲解都是瞎扯淡。今天,阴雨绵绵,心中暑气全无。要不一起看看Thread类的源码,jdk的源码倒是很简单的,这个类中大量重要方法是native方法,在jvm中实现的,那可不是看Java代码了,有c基础的,那就没多大问题了,只要能够看到大概意思,我觉得对开发者来说足够用了,不至于有哪家公司招你来把jvm中的代码改下,哈哈。。。


今天需要查看Thread中的jvm源码,jdk中的方法与jvm源码方法是具有对应关系的,通俗理解为注册表,具体可在openjdk\jdk\src\share\native\java\lang\Thread.c,现摘出部分映射关系:

static JNINativeMethod methods[] = {
    {"start0",           "()V",        (void *)&JVM_StartThread},
    {"stop0",            "(" OBJ ")V", (void *)&JVM_StopThread},
    {"isAlive",          "()Z",        (void *)&JVM_IsThreadAlive},
    {"suspend0",         "()V",        (void *)&JVM_SuspendThread},
    {"resume0",          "()V",        (void *)&JVM_ResumeThread},
    {"setPriority0",     "(I)V",       (void *)&JVM_SetThreadPriority},
    {"yield",            "()V",        (void *)&JVM_Yield},
    {"sleep",            "(J)V",       (void *)&JVM_Sleep},
    {"currentThread",    "()" THD,     (void *)&JVM_CurrentThread},
    {"countStackFrames", "()I",        (void *)&JVM_CountStackFrames},
    {"interrupt0",       "()V",        (void *)&JVM_Interrupt},
    {"isInterrupted",    "(Z)Z",       (void *)&JVM_IsInterrupted},
    {"holdsLock",        "(" OBJ ")Z", (void *)&JVM_HoldsLock},
    {"getThreads",        "()[" THD,   (void *)&JVM_GetAllThreads},
    {"dumpThreads",      "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
    {"setNativeName",    "(" STR ")V", (void *)&JVM_SetNativeThreadName},
};

一 线程创建与初始化

下面的代码片段已经加了详细的注释,在此就不说了,直接上干货吧

//名称
    private volatile String name;
    //优先级
    private int priority;
    //守护线程标识
    private boolean daemon = false;
    //线程执行的目标对象
    private Runnable target;
    //线程组
    private ThreadGroup group;
    //当前线程的指定栈大小,默认值为0,设置似乎意义不大,具体栈分配由jvm决定
    private long stackSize;
    //线程序列号,为0
    private static long threadSeqNumber;
    //线程id:由threadSeqNumber++生成
    private long tid;
    //标识线程状态,默认是线程未启动
    /**
     * 线程状态有如下几种:NEW RUNNABLE WAITING TIMED_WAITING TERMINATED
     * NEW时对应的threadStatus为0;
     * */
    private int threadStatus = 0;
    //存储当前线程的局部变量
    ThreadLocal.ThreadLocalMap threadLocals = null;
    /**
     * 在创建子线程时,子线程会接收所有可继承的线程局部变量的初始值,以获得父线程所具有的值
     * 为子线程提供从父线程那里继承的值
     * */
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
    //为LockSupport提供的变量,具体可查看LockSupport源码
    volatile Object parkBlocker;
    //阻塞器锁,主要用于处理阻塞情况
    private volatile Interruptible blocker;
    //阻断锁
    private Object blockerLock = new Object();
    //最低优先级
    public final static int MIN_PRIORITY = 1;
    //默认优先级
    public final static int NORM_PRIORITY = 5;
    //最高优先级
    public final static int MAX_PRIORITY = 10;

    /**
     * Thread有多种构造器,这里只列出最全的构造方法
     * 所有构造器均调用init方法
     * */
    public Thread(ThreadGroup group, Runnable target, String name, long stackSize) {
        init(group, target, name, stackSize, null, true);
    }

    /**
     * 底层init方法,用于初始化thread对象
     * 参数已在上述属性中做了说明
     * */
    private void init(ThreadGroup g, Runnable target, String name, long stackSize,
                      AccessControlContext acc, boolean inheritThreadLocals) {
        if (name == null) {
            throw new NullPointerException("name cannot be null");
        }
        this.name = name;
        /**
         * 获取当前线程,即创建线程的线程
         * 这里体现了父子线程的关系
         * */
        Thread parent = currentThread();
        //获得系统的安全管理器
        SecurityManager security = System.getSecurityManager();
        if (g == null) {
            if (security != null) {
                g = security.getThreadGroup();
            }
            //设置线程组,如果子线程未指定,则取父线程的
            if (g == null) {
                g = parent.getThreadGroup();
            }
        }
        g.checkAccess();
        //线程组未启动线程个数++
        g.addUnstarted();
        //线程组
        this.group = g;
        //守护线程继承性,子线程的是否守护取决于父线程
        this.daemon = parent.isDaemon();
        //优先级继承性,子线程的优先级取决于父线程
        this.priority = parent.getPriority();
        //Runnable对象
        this.target = target;
        //设置优先级
        setPriority(priority);
        if (inheritThreadLocals && parent.inheritableThreadLocals != null)
            //为子线程提供从父线程那里继承的值
            this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
        this.stackSize = stackSize;
        //生成线程ID
        tid = nextThreadID();
    }
    //tid生成器
    private static synchronized long nextThreadID() {
        return ++threadSeqNumber;
    }
    public final ThreadGroup getThreadGroup() {
        return this.group;
    }
    public final boolean isDaemon() {
        return this.daemon;
    }
    public final int getPriority() {
        return this.priority;
    }
    public final void setPriority(int priority) {
        if (priority <= 10 && priority >= 1) {
            ThreadGroup var2;
            if ((var2 = this.getThreadGroup()) != null) {
                //这里需要注意:线程的优先级上限取决于所属线程组的优先级
                if (priority > var2.getMaxPriority()) {
                    priority = var2.getMaxPriority();
                }
                this.setPriority0(this.priority = priority);
            }
        } else {
            throw new IllegalArgumentException();
        }
    }
    /**
     * 线程执行的具体任务
     */
    public void run() {
        if (target != null) {
            target.run();
        }
    }
    /**
     * 线程真正退出前执行清理
     */
    private void exit() {
        if (group != null) {
            group = null;
        }
        target = null;
        threadLocals = null;
        inheritableThreadLocals = null;
        blocker = null;
    }
    //获取当前线程的native方法
    public static native Thread currentThread();
    //设置线程优先级的native方法
    private native void setPriority0(int var1);

在上述有两个native方法,即currentThread()和setPriority0(),其中currentThread是获取父线程,setPriority0是设置线程优先级,均在jvm.cpp中,点击查看;现摘出具体片段:
currentThread方法底层调用jvm.cpp中的JVM_CurrentThread函数:

JVM_ENTRY(jobject, JVM_CurrentThread(JNIEnv* env, jclass threadClass))
  JVMWrapper("JVM_CurrentThread");
  oop jthread = thread->threadObj();
  assert (thread != NULL, "no current thread!");
  return JNIHandles::make_local(env, jthread);
JVM_END

setPriority0方法底层调用jvm.cpp中的JVM_CurrentThread函数:

JVM_ENTRY(void, JVM_SetThreadPriority(JNIEnv* env, jobject jthread, jint prio))
  JVMWrapper("JVM_SetThreadPriority");
  // 确保C++线程和OS线程在操作之前不释放
  MutexLocker ml(Threads_lock);
  oop java_thread = JNIHandles::resolve_non_null(jthread);
  java_lang_Thread::set_priority(java_thread, (ThreadPriority)prio);
  JavaThread* thr = java_lang_Thread::thread(java_thread);
  if (thr != NULL) {                  
    // 线程尚未启动,当设置优先级才会启动
    Thread::set_priority(thr, (ThreadPriority)prio);
  }
JVM_END

二 线程启动start

start代码片段如下:

/**
     * 调用start()方法启动线程,执行线程的run方法
     */
    public synchronized void start() {
        /**
         * 线程状态校验,线程必须是0即新建态才能启动
         * 这也是为何一个线程连续两次调start会报错
         */
        if (threadStatus != 0) throw new IllegalThreadStateException();
        //通知线程组当前线程即将执行,同时线程组中未启动线程数-1
        group.add(this);
        boolean started = false;
        try {
            //使线程进入可执行(runnable)状态
            start0();
            started = true;
        } finally {
            try {
                if (!started) {
                    //启动失败后,修改线程组未启动线程数+1
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) { }
        }
    }
    /**
     * 设置线程启动的native方法
     * 底层会新启动一个线程,新线程才会调用传递过来的Runnable对象run方法
     * */
    private native void start0();

start0方法底层调用jvm.cpp中的JVM_StartThread函数:

JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
  JVMWrapper("JVM_StartThread");
  JavaThread *native_thread = NULL;

  //由于排序问题,引发异常时无法持有线程锁。示例:我们可能需要在构造异常时获取堆锁。
  bool throw_illegal_thread_state = false;

  //在线程start中发布jvmti事件之前,必须释放线程锁。
  {
    //确保C++线程和OS线程在操作之前没有被释放。
    MutexLocker mu(Threads_lock);

    //自JDK5以来,线程的threadstatus用于防止重新启动已启动的线程,所以通常会发现javathread是空的。
    //但是对于JNI附加的线程,在创建的线程对象(及其JavaThread集合)和对其ThreadStates的更新之间有一个小窗口,
    //因此我们必须检查这个窗口
    if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {
      throw_illegal_thread_state = true;
    } else {
      //我们还可以检查stillborn标志,看看这个线程是否已经停止,但是出于历史原因,我们让线程在它开始运行时检测它自己
      jlong size = java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
      //分配C++线程结构并创建本地线程,从Java中传递过来的stack size已经被声明,
      //但是构造函数采用size_t(无符号类型),因此避免传递负值
      size_t sz = size > 0 ? (size_t) size : 0;
      native_thread = new JavaThread(&thread_entry, sz);
      //此时可能由于内存不足而没有为javathread创建Osthread。
      //检查这种情况并抛出异常。最后,我们可能希望更改此项,以便仅在成功创建线程时获取锁,
      //然后我们还可以执行此检查并在JavaThread构造函数中抛出异常。
      if (native_thread->osthread() != NULL) {
        //注意:当前线程未在“准备”阶段使用
        native_thread->prepare(jthread);
      }
    }
  }
  if (throw_illegal_thread_state) {
    THROW(vmSymbols::java_lang_IllegalThreadStateException());
  }
  assert(native_thread != NULL, "Starting null thread?");
  if (native_thread->osthread() == NULL) {
    // No one should hold a reference to the 'native_thread'.
    delete native_thread;
    if (JvmtiExport::should_post_resource_exhausted()) {
      JvmtiExport::post_resource_exhausted(
        JVMTI_RESOURCE_EXHAUSTED_OOM_ERROR | JVMTI_RESOURCE_EXHAUSTED_THREADS,
        "unable to create new native thread");
    }
    THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(),
              "unable to create new native thread");
  }
  Thread::start(native_thread);

JVM_END

三 线程中断判断

/**
     * 判断线程是否已经中断,同时清除中断标识
     * static方法,
     */
    public static boolean interrupted() {
        return currentThread().isInterrupted(true);
    }
    /**
     * 判断线程是否已经中断,不清除中断标识
     * this代表当前调用此方法的线程对象
     */
    public boolean isInterrupted() {
        return this.isInterrupted(false);
    }
    /**
     * native方法判断线程是否中断
     */
    private native boolean isInterrupted(boolean ClearInterrupted);

isInterrupted方法底层调用jvm.cpp中的JVM_IsInterrupted函数:

JVM_QUICK_ENTRY(jboolean, JVM_IsInterrupted(JNIEnv* env, jobject jthread, jboolean clear_interrupted))
  JVMWrapper("JVM_IsInterrupted");

  //确保C++线程和OS线程在操作之前没有被释放
  oop java_thread = JNIHandles::resolve_non_null(jthread);
  MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
  //我们需要重新解析java_thread,因为在获取锁的过程中可能会发生GC
  JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
  if (thr == NULL) {
    return JNI_FALSE;
  } else {
    return (jboolean) Thread::is_interrupted(thr, clear_interrupted != 0);
  }
JVM_END

四 线程join

/**
     * 等待调用join的线程执行结束
     */
    public final synchronized void join(long var1) throws InterruptedException {
        long var3 = System.currentTimeMillis();
        long var5 = 0L;
        if (var1 < 0L) {
            throw new IllegalArgumentException("timeout value is negative");
        } else {
            //如果join时不设置超时,则会调用Object.wait的无超时等待
            if (var1 == 0L) {
                while(this.isAlive()) {
                    this.wait(0L);
                }
            } else {
                //join设置超时,则会调用Object.wait的超时等待
                while(this.isAlive()) {
                    long var7 = var1 - var5;
                    if (var7 <= 0L) {
                        break;
                    }    
                    this.wait(var7);
                    var5 = System.currentTimeMillis() - var3;
                }
            }

        }
    }
    /**
     * native方法判断线程存活
     */
    public final native boolean isAlive();

Object.wait在下面讲述,isAlive方法底层调用jvm.cpp中的JVM_IsThreadAlive函数:

JVM_ENTRY(jboolean, JVM_IsThreadAlive(JNIEnv* env, jobject jthread))
  JVMWrapper("JVM_IsThreadAlive");

  oop thread_oop = JNIHandles::resolve_non_null(jthread);
  return java_lang_Thread::is_alive(thread_oop);
JVM_END

五 线程sleep

/**
     * 线程休眠
     * @param var0 毫秒
     * @param var2 纳秒
     */
    public static void sleep(long var0, int var2) throws InterruptedException {
        if (var0 < 0L) {
            throw new IllegalArgumentException("timeout value is negative");
        } else if (var2 >= 0 && var2 <= 999999) {
            //纳秒四舍五入
            if (var2 >= 500000 || var2 != 0 && var0 == 0L) {
                ++var0;
            }
            sleep(var0);
        } else {
            throw new IllegalArgumentException("nanosecond timeout value out of range");
        }
    }
    /**
     * native方法线程休眠
     */
    public static native void sleep(long var0) throws InterruptedException;

sleep方法底层调用jvm.cpp中的JVM_Sleep函数:

JVM_ENTRY(void, JVM_Sleep(JNIEnv* env, jclass threadClass, jlong millis))
  JVMWrapper("JVM_Sleep");

  if (millis < 0) {
    THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
  }
  //线程中断则抛出异常
  if (Thread::is_interrupted (THREAD, true) && !HAS_PENDING_EXCEPTION) {
    THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
  }
  //保存当前线程状态并在末尾还原它,并将新线程状态设置为SLEEPING
  JavaThreadSleepState jtss(thread);

#ifndef USDT2
  HS_DTRACE_PROBE1(hotspot, thread__sleep__begin, millis);
#else /* USDT2 */
  HOTSPOT_THREAD_SLEEP_BEGIN(
                             millis);
#endif /* USDT2 */

  EventThreadSleep event;

  if (millis == 0) {
    //当convertsleeptoyield为on时,这与JVM_Sleep的经典VM实现相匹配。
    //对于类似的线程行为(win32)至关重要,即在某些GUI上下文中,对Solaris进行短时间睡眠是有益的。
    if (ConvertSleepToYield) {
      os::yield();
    } else {
      ThreadState old_state = thread->osthread()->get_state();
      thread->osthread()->set_state(SLEEPING);
      os::sleep(thread, MinSleepInterval, false);
      thread->osthread()->set_state(old_state);
    }
  } else {
    ThreadState old_state = thread->osthread()->get_state();
    thread->osthread()->set_state(SLEEPING);
    if (os::sleep(thread, millis, true) == OS_INTRPT) {
      //当休眠时,一个异步异常(例如,threaddeathexception)可能抛出了,但不需要覆盖它们。
      if (!HAS_PENDING_EXCEPTION) {
        if (event.should_commit()) {
          event.set_time(millis);
          event.commit();
        }
#ifndef USDT2
        HS_DTRACE_PROBE1(hotspot, thread__sleep__end,1);
#else /* USDT2 */
        HOTSPOT_THREAD_SLEEP_END(
                                 1);
#endif /* USDT2 */
       
        //THROW_MSG方法返回,意味着不能以正确地还原线程状态,因为那很可能是错的。
        THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
      }
    }
    thread->osthread()->set_state(old_state);
  }
  if (event.should_commit()) {
    event.set_time(millis);
    event.commit();
  }
#ifndef USDT2
  HS_DTRACE_PROBE1(hotspot, thread__sleep__end,0);
#else /* USDT2 */
  HOTSPOT_THREAD_SLEEP_END(
                           0);
#endif /* USDT2 */
JVM_END

六 线程yield

/**
     * native方法线程让度CPU执行权
     */
    public static native void yield();

yield方法底层调用jvm.cpp中的JVM_Yield函数:

JVM_ENTRY(void, JVM_Yield(JNIEnv *env, jclass threadClass))
  JVMWrapper("JVM_Yield");
  if (os::dont_yield()) return;
#ifndef USDT2
  HS_DTRACE_PROBE0(hotspot, thread__yield);
#else /* USDT2 */
  HOTSPOT_THREAD_YIELD();
#endif /* USDT2 */
  //当ConvertYieldToSleep为off(默认)时,这与传统的VM使用yield相匹配,对于类似的线程行为至关重要
  if (ConvertYieldToSleep) {//on
    //系统调用sleep
    os::sleep(thread, MinSleepInterval, false);
  } else {//off
    //系统调用yield
    os::yield();
  }
JVM_END

七 线程中断interrupt

/**
     * 线程中断
     */
    public void interrupt() {
        Object var1 = this.blockerLock;
        synchronized(this.blockerLock) {
            Interruptible var2 = this.blocker;
            if (var2 != null) {
                this.interrupt0();
                var2.interrupt(this);
                return;
            }
        }
        this.interrupt0();
    }
    /**
     * native方法线程中断
     */
    private native void interrupt0();

interrupt0方法底层调用jvm.cpp中的JVM_Interrupt函数:

JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
  JVMWrapper("JVM_Interrupt");

  //确保C++线程和OS线程在操作之前没有被释放
  oop java_thread = JNIHandles::resolve_non_null(jthread);
  MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
  //我们需要重新解析java_thread,因为在获取锁的过程中可能会发生GC
  JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
  if (thr != NULL) {
    Thread::interrupt(thr);
  }
JVM_END

七 Object的Wait/Notify/NotifyAll

/**
     * 线程等待
     * @param var1 毫秒
     * @param var3 纳秒
     */
    public final void wait(long var1, int var3) throws InterruptedException {
        if (var1 < 0L) {
            throw new IllegalArgumentException("timeout value is negative");
        } else if (var3 >= 0 && var3 <= 999999) {
            //纳秒>0,毫秒直接++
            if (var3 > 0) {
                ++var1;
            }
            //调用native方法
            this.wait(var1);
        } else {
            throw new IllegalArgumentException("nanosecond timeout value out of range");
        }
    }
    /**
     * native方法线程等待
     */
    public final native void wait(long var1) throws InterruptedException;
    /**
     * native方法线程单个唤醒
     */
    public final native void notify();
    /**
     * native方法线程唤醒等待池中所有线程
     */
    public final native void notifyAll();

Wait/Notify/NotifyAll在objectMonitor.cpp中,点击查看
Wait片段:

void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
   Thread * const Self = THREAD ;
   assert(Self->is_Java_thread(), "Must be Java thread!");
   JavaThread *jt = (JavaThread *)THREAD;

   DeferredInitialize () ;

   // Throw IMSX or IEX.
   CHECK_OWNER();

   //调用is_interrupted()判断并清除线程中断状态,如果中断状态为true,抛出中断异常并结束
   if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
     //post monitor waited event
     //注意这是过去式,已经等待完了
     if (JvmtiExport::should_post_monitor_waited()) {
        //注意:这里传递参数'false',这是因为由于线程中断,等待不会超时
        JvmtiExport::post_monitor_waited(jt, this, false);
     }
     TEVENT (Wait - Throw IEX) ;
     THROW(vmSymbols::java_lang_InterruptedException());
     return ;
   }
   TEVENT (Wait) ;

   assert (Self->_Stalled == 0, "invariant") ;
   Self->_Stalled = intptr_t(this) ;
   jt->set_current_waiting_monitor(this);

   // create a node to be put into the queue
   // Critically, after we reset() the event but prior to park(), we must check
   // for a pending interrupt.

   //创建一个node放入队列
   //关键是,在reset()之后,但在park()之前,必须检查是否有挂起的中断
   ObjectWaiter node(Self);
   node.TState = ObjectWaiter::TS_WAIT ;
   Self->_ParkEvent->reset() ;
   OrderAccess::fence();

   //在本例中等待队列是一个循环的双向链表,但它也可以是一个优先级队列或任何数据结构。
   //_WaitSetLock保护着等待队列.
   //通常,等待队列只能由监视器*except*的所有者访问,但在park()因中断超时而返回的情况下也是可以。
   //竞争非常小,所以使用一个自旋锁而不是重量级的阻塞锁。
   Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
   AddWaiter (&node) ;
   Thread::SpinRelease (&_WaitSetLock) ;

   if ((SyncFlags & 4) == 0) {
      _Responsible = NULL ;
   }
   intptr_t save = _recursions; // 记录旧的递归次数
   _waiters++;                  // waiters 自增
   _recursions = 0;             // 设置 recursion level to be 1
   exit (Self) ;                // 退出监视器
   guarantee (_owner != Self, "invariant") ;

   //一旦在上面的exit()调用中删除了ObjectMonitor的所有权,
   //另一个线程就可以进入ObjectMonitor,执行notify()和exit()对象监视器。
   //如果另一个线程的exit()调用选择此线程作为后继者,并且此线程在发布MONITOR_CONTENDED_EXIT时发生unpark()调用,
   //则我们使用RawMonitors运行事件风险处理,并使用unpark().
   //为了避免这个问题,我们重新发布事件,即使未使用原来的unpark(),
   //这也不会造成任何伤害,因为已经为此监视器选好了继任者。
   if (node._notified != 0 && _succ == Self) {
      node._event->unpark();
   }

   // The thread is on the WaitSet list - now park() it.
   // On MP systems it's conceivable that a brief spin before we park
   // could be profitable.
   //
   // TODO-FIXME: change the following logic to a loop of the form
   //   while (!timeout && !interrupted && _notified == 0) park()

   int ret = OS_OK ;
   int WasNotified = 0 ;
   { // State transition wrappers
     OSThread* osthread = Self->osthread();
     OSThreadWaitState osts(osthread, true);
     {
       ThreadBlockInVM tbivm(jt);
       // Thread is in thread_blocked state and oop access is unsafe.
       //线程处于阻塞状态,并且oop访问是不安全的
       jt->set_suspend_equivalent();

       if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
           // Intentionally empty 空处理
       } else
       if (node._notified == 0) {
         if (millis <= 0) {
            // 调用park()方法阻塞线程
            Self->_ParkEvent->park () ;
         } else {
            // 调用park()方法在超时时间内阻塞线程
            ret = Self->_ParkEvent->park (millis) ;
         }
       }

       // were we externally suspended while we were waiting?
       if (ExitSuspendEquivalent (jt)) {
          // TODO-FIXME: add -- if succ == Self then succ = null.
          jt->java_suspend_self();
       }

     } // Exit thread safepoint: transition _thread_blocked -> _thread_in_vm

     //当线程不在等待队列时,使用双重检查锁定避免获取_WaitSetLock
     if (node.TState == ObjectWaiter::TS_WAIT) {
         Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;
         if (node.TState == ObjectWaiter::TS_WAIT) {
            DequeueSpecificWaiter (&node) ;       // unlink from WaitSet
            assert(node._notified == 0, "invariant");
            node.TState = ObjectWaiter::TS_RUN ;
         }
         Thread::SpinRelease (&_WaitSetLock) ;
     }

     //从这个线程的角度来看,Node's TState是稳定的,
     //没有其他线程能够异步修改TState
     guarantee (node.TState != ObjectWaiter::TS_WAIT, "invariant") ;
     OrderAccess::loadload() ;
     if (_succ == Self) _succ = NULL ;
     WasNotified = node._notified ;

     // Reentry phase -- reacquire the monitor.
     // re-enter contended(竞争) monitor after object.wait().
     // retain OBJECT_WAIT state until re-enter successfully completes
     // Thread state is thread_in_vm and oop access is again safe,
     // although the raw address of the object may have changed.
     // (Don't cache naked oops over safepoints, of course).

     // post monitor waited event.
     //注意这是过去式,已经等待完了
     if (JvmtiExport::should_post_monitor_waited()) {
       JvmtiExport::post_monitor_waited(jt, this, ret == OS_TIMEOUT);
     }
     OrderAccess::fence() ;

     assert (Self->_Stalled != 0, "invariant") ;
     Self->_Stalled = 0 ;

     assert (_owner != Self, "invariant") ;
     ObjectWaiter::TStates v = node.TState ;
     if (v == ObjectWaiter::TS_RUN) {
         enter (Self) ;
     } else {
         guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ;
         ReenterI (Self, &node) ;
         node.wait_reenter_end(this);
     }

     // Self has reacquired the lock.
     // Lifecycle - the node representing Self must not appear on any queues.
     // Node is about to go out-of-scope, but even if it were immortal(长久的) we wouldn't
     // want residual(残留的) elements associated with this thread left on any lists.
     guarantee (node.TState == ObjectWaiter::TS_RUN, "invariant") ;
     assert    (_owner == Self, "invariant") ;
     assert    (_succ != Self , "invariant") ;
   } // OSThreadWaitState()

   jt->set_current_waiting_monitor(NULL);

   guarantee (_recursions == 0, "invariant") ;
   _recursions = save;     // restore the old recursion count
   _waiters--;             // decrement the number of waiters

   // Verify a few postconditions
   assert (_owner == Self       , "invariant") ;
   assert (_succ  != Self       , "invariant") ;
   assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;

   if (SyncFlags & 32) {
      OrderAccess::fence() ;
   }

   //检查是否有通知notify发生
   // 从park()方法返回后,判断是否是因为中断返回,再次调用
   // thread::is_interrupted(Self, true)判断并清除线程中断状态
   // 如果中断状态为true,抛出中断异常并结束。
   if (!WasNotified) {
     // no, it could be timeout or Thread.interrupt() or both
     // check for interrupt event, otherwise it is timeout
     if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
       TEVENT (Wait - throw IEX from epilog) ;
       THROW(vmSymbols::java_lang_InterruptedException());
     }
   }
   //注意:虚假唤醒将被视为超时;监视器通知优先于线程中断。
}

Notify片段:

void ObjectMonitor::notify(TRAPS) {
  CHECK_OWNER();
  if (_WaitSet == NULL) {
     TEVENT (Empty-Notify) ;
     return ;
  }
  DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);

  int Policy = Knob_MoveNotifyee ;

  Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
  ObjectWaiter * iterator = DequeueWaiter() ;
  if (iterator != NULL) {
     TEVENT (Notify1 - Transfer) ;
     guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
     guarantee (iterator->_notified == 0, "invariant") ;
     if (Policy != 4) {
        iterator->TState = ObjectWaiter::TS_ENTER ;
     }
     iterator->_notified = 1 ;

     ObjectWaiter * List = _EntryList ;
     if (List != NULL) {
        assert (List->_prev == NULL, "invariant") ;
        assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
        assert (List != iterator, "invariant") ;
     }

     if (Policy == 0) {       // prepend(预追加) to EntryList
         if (List == NULL) {
             iterator->_next = iterator->_prev = NULL ;
             _EntryList = iterator ;
         } else {
             List->_prev = iterator ;
             iterator->_next = List ;
             iterator->_prev = NULL ;
             _EntryList = iterator ;
        }
     } else
     if (Policy == 1) {      // append(真正追加) to EntryList
         if (List == NULL) {
             iterator->_next = iterator->_prev = NULL ;
             _EntryList = iterator ;
         } else {
            //考虑:当前获取EntryList的tail需要遍历整个链表
            //将tail访问转换为CDLL而不是使用当前的DLL,从而使访问时间固定。
            ObjectWaiter * Tail ;
            for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
            assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
            Tail->_next = iterator ;
            iterator->_prev = Tail ;
            iterator->_next = NULL ;
        }
     } else
     if (Policy == 2) {      // prepend to cxq
         // prepend(预追加) to cxq
         if (List == NULL) {
             iterator->_next = iterator->_prev = NULL ;
             _EntryList = iterator ;
         } else {
            iterator->TState = ObjectWaiter::TS_CXQ ;
            for (;;) {
                ObjectWaiter * Front = _cxq ;
                iterator->_next = Front ;
                if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
                    break ;
                }
            }
         }
     } else
     if (Policy == 3) {      // append(真正追加) to cxq
        iterator->TState = ObjectWaiter::TS_CXQ ;
        for (;;) {
            ObjectWaiter * Tail ;
            Tail = _cxq ;
            if (Tail == NULL) {
                iterator->_next = NULL ;
                if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
                   break ;
                }
            } else {
                while (Tail->_next != NULL) Tail = Tail->_next ;
                Tail->_next = iterator ;
                iterator->_prev = Tail ;
                iterator->_next = NULL ;
                break ;
            }
        }
     } else {
        ParkEvent * ev = iterator->_event ;
        iterator->TState = ObjectWaiter::TS_RUN ;
        OrderAccess::fence() ;
        ev->unpark() ;
     }

     if (Policy < 4) {
       iterator->wait_reenter_begin(this);
     }

     // _WaitSetLock protects the wait queue, not the EntryList.  We could
     // move the add-to-EntryList operation, above, outside the critical section
     // protected by _WaitSetLock.  In practice that's not useful.  With the
     // exception of  wait() timeouts and interrupts the monitor owner
     // is the only thread that grabs _WaitSetLock.  There's almost no contention
     // on _WaitSetLock so it's not profitable to reduce the length of the
     // critical section.
  }
  Thread::SpinRelease (&_WaitSetLock) ;
  if (iterator != NULL && ObjectMonitor::_sync_Notifications != NULL) {
     ObjectMonitor::_sync_Notifications->inc() ;
  }
}

NotifyAll片段:

void ObjectMonitor::notifyAll(TRAPS) {
  CHECK_OWNER();
  ObjectWaiter* iterator;
  if (_WaitSet == NULL) {
      TEVENT (Empty-NotifyAll) ;
      return ;
  }
  DTRACE_MONITOR_PROBE(notifyAll, this, object(), THREAD);

  int Policy = Knob_MoveNotifyee ;
  int Tally = 0 ;
  Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notifyall") ;

  for (;;) {
     iterator = DequeueWaiter () ;
     if (iterator == NULL) break ;
     TEVENT (NotifyAll - Transfer1) ;
     ++Tally ;

     // Disposition - what might we do with iterator ?
     // a.  add it directly to the EntryList - either tail or head.
     // b.  push it onto the front of the _cxq.
     // For now we use (a).

     guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
     guarantee (iterator->_notified == 0, "invariant") ;
     iterator->_notified = 1 ;
     if (Policy != 4) {
        iterator->TState = ObjectWaiter::TS_ENTER ;
     }

     ObjectWaiter * List = _EntryList ;
     if (List != NULL) {
        assert (List->_prev == NULL, "invariant") ;
        assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
        assert (List != iterator, "invariant") ;
     }

     if (Policy == 0) {       // prepend to EntryList
         if (List == NULL) {
             iterator->_next = iterator->_prev = NULL ;
             _EntryList = iterator ;
         } else {
             List->_prev = iterator ;
             iterator->_next = List ;
             iterator->_prev = NULL ;
             _EntryList = iterator ;
        }
     } else
     if (Policy == 1) {      // append to EntryList
         if (List == NULL) {
             iterator->_next = iterator->_prev = NULL ;
             _EntryList = iterator ;
         } else {
            // CONSIDER:  finding the tail currently requires a linear-time walk of
            // the EntryList.  We can make tail access constant-time by converting to
            // a CDLL instead of using our current DLL.
            ObjectWaiter * Tail ;
            for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
            assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
            Tail->_next = iterator ;
            iterator->_prev = Tail ;
            iterator->_next = NULL ;
        }
     } else
     if (Policy == 2) {      // prepend to cxq
         // prepend to cxq
         iterator->TState = ObjectWaiter::TS_CXQ ;
         for (;;) {
             ObjectWaiter * Front = _cxq ;
             iterator->_next = Front ;
             if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
                 break ;
             }
         }
     } else
     if (Policy == 3) {      // append to cxq
        iterator->TState = ObjectWaiter::TS_CXQ ;
        for (;;) {
            ObjectWaiter * Tail ;
            Tail = _cxq ;
            if (Tail == NULL) {
                iterator->_next = NULL ;
                if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
                   break ;
                }
            } else {
                while (Tail->_next != NULL) Tail = Tail->_next ;
                Tail->_next = iterator ;
                iterator->_prev = Tail ;
                iterator->_next = NULL ;
                break ;
            }
        }
     } else {
        ParkEvent * ev = iterator->_event ;
        iterator->TState = ObjectWaiter::TS_RUN ;
        OrderAccess::fence() ;
        ev->unpark() ;
     }

     if (Policy < 4) {
       iterator->wait_reenter_begin(this);
     }

     // _WaitSetLock protects the wait queue, not the EntryList.  We could
     // move the add-to-EntryList operation, above, outside the critical section
     // protected by _WaitSetLock.  In practice that's not useful.  With the
     // exception of  wait() timeouts and interrupts the monitor owner
     // is the only thread that grabs _WaitSetLock.  There's almost no contention
     // on _WaitSetLock so it's not profitable to reduce the length of the
     // critical section.
  }

  Thread::SpinRelease (&_WaitSetLock) ;

  if (Tally != 0 && ObjectMonitor::_sync_Notifications != NULL) {
     ObjectMonitor::_sync_Notifications->inc(Tally) ;
  }
}

特此声明:
分享文章有完整的知识架构图,将从以下几个方面系统展开:
1 基础(Linux/Spring boot/并发)
2 性能调优(jvm/tomcat/mysql)
3 高并发分布式
4 微服务体系
如果您觉得文章不错,请关注阿伦故事,您的支持是我坚持的莫大动力,在此受小弟一拜!


每篇福利:

评论区打出车型.jpg
上一篇下一篇

猜你喜欢

热点阅读