AQS中LockSupport与Synchronized中par

2022-05-10  本文已影响0人  justonemoretry

1.简介

使用LockSupport的线程会与一个许可关联,其实就像是一个二元信号量(意思就是只有一个许可证可以使用),如果这个许可没有被占用,那么当前线程可以获得许可并继续执行,如果许可以已经被占用,则当前线程就会被阻塞,然后等待许可的获取。注意:许可默认是被占用的!
下面是释放许可-获取许可的一个例子,能正常输出b。

public static void main(String[] args) {
        Thread thread = Thread.currentThread();
        LockSupport.unpark(thread);//释放许可
        LockSupport.park();// 获取许可
        System.out.println("b");
    }

1.1 park方法分析

除非许可证可用,否则出于线程调度目的禁用当前线程。 如果许可证可用,则该许可证被消耗,呼叫立即返回;否则,出于线程调度目的,当前线程将被禁用,并处于休眠状态,直到发生以下三种情况之一:

 /**
     * Disables the current thread for thread scheduling purposes unless the
     * permit is available.
     *
     * <p>If the permit is available then it is consumed and the call returns
     * immediately; otherwise
     * the current thread becomes disabled for thread scheduling
     * purposes and lies dormant until one of three things happens:
     *
     * <ul>
     * <li>Some other thread invokes {@link #unpark unpark} with the
     * current thread as the target; or
     *
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     *
     * <li>The call spuriously (that is, for no reason) returns.
     * </ul>
     *
     * <p>This method does <em>not</em> report which of these caused the
     * method to return. Callers should re-check the conditions which caused
     * the thread to park in the first place. Callers may also determine,
     * for example, the interrupt status of the thread upon return.
     *
     * @param blocker the synchronization object responsible for this
     *        thread parking
     * @since 1.6
     */
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        // 记录当前线程阻塞的原因,lock调用的blocker是对应的同步器
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        // 线程唤醒后,去掉阻塞原因
        setBlocker(t, null);
    }

1.2 unpark方法分析

使给定线程的许可证可用(如果尚未可用)。如果线程在 park上被阻塞,那么它将解除阻塞。否则,它对 park 的下一次呼叫保证不会被阻塞。如果给定的线程尚未启动,则不能保证此操作有任何效果。

2.JVM源码分析

2.1 park方法源码解析

在 LockSupport 的底层主要是调用 Unsafa 类的 park, unpark 方法实现如下:

UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
  UnsafeWrapper("Unsafe_Park");
  EventThreadPark event;
#ifndef USDT2
  HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time);
#else /* USDT2 */
   HOTSPOT_THREAD_PARK_BEGIN(
                             (uintptr_t) thread->parker(), (int) isAbsolute, time);
#endif /* USDT2 */
  JavaThreadParkedState jtps(thread, time != 0);
  // 这里调用线程持有的parker实例中的park方法
  thread->parker()->park(isAbsolute != 0, time);
#ifndef USDT2
  HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker());
#else /* USDT2 */
  HOTSPOT_THREAD_PARK_END(
                          (uintptr_t) thread->parker());
#endif /* USDT2 */
  if (event.should_commit()) {
    oop obj = thread->current_park_blocker();
    event.set_klass((obj != NULL) ? obj->klass() : NULL);
    event.set_timeout(time);
    event.set_address((obj != NULL) ? (TYPE_ADDRESS) cast_from_oop<uintptr_t>(obj) : 0);
    event.commit();
  }
UNSAFE_END

UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread))
  UnsafeWrapper("Unsafe_Unpark");
  Parker* p = NULL;
  if (jthread != NULL) {
    oop java_thread = JNIHandles::resolve_non_null(jthread);
    if (java_thread != NULL) {
      jlong lp = java_lang_Thread::park_event(java_thread);
      if (lp != 0) {
        // This cast is OK even though the jlong might have been read
        // non-atomically on 32bit systems, since there, one word will
        // always be zero anyway and the value set is always the same
        p = (Parker*)addr_from_java(lp);
      } else {
        // Grab lock if apparently null or using older version of library
        MutexLocker mu(Threads_lock);
        java_thread = JNIHandles::resolve_non_null(jthread);
        if (java_thread != NULL) {
          JavaThread* thr = java_lang_Thread::thread(java_thread);
          if (thr != NULL) {
            p = thr->parker();
            if (p != NULL) { // Bind to Java thread for next time.
              java_lang_Thread::set_park_event(java_thread, addr_to_java(p));
            }
          }
        }
      }
    }
  }
  if (p != NULL) {
#ifndef USDT2
    HS_DTRACE_PROBE1(hotspot, thread__unpark, p);
#else /* USDT2 */
    HOTSPOT_THREAD_UNPARK(
                          (uintptr_t) p);
#endif /* USDT2 */ 调用unpark方法
    p->unpark();
  }
UNSAFE_END

每个线程对象都有一个 Parker 实例

// JSR166 per-thread parker
private:
  Parker*    _parker;
public:
  Parker*     parker() { return _parker; }

Parker类的定义

class Parker : public os::PlatformParker {
private:
 // 许可
  volatile int _counter ;
  Parker * FreeNext ;
  JavaThread * AssociatedWith ; // Current association

public:
  Parker() : PlatformParker() {
    _counter       = 0 ;
    FreeNext       = NULL ;
    AssociatedWith = NULL ;
  }
protected:
  ~Parker() { ShouldNotReachHere(); }
public:
  // For simplicity of interface with Java, all forms of park (indefinite,
  // relative, and absolute) are multiplexed into one call.
  // 无限期、相对时间、绝对时间在一个方法中
  void park(bool isAbsolute, jlong time);
  void unpark();

  // Lifecycle operators
  static Parker * Allocate (JavaThread * t) ;
  static void Release (Parker * e) ;
private:
  static Parker * volatile FreeList ;
  static volatile int ListLock ;

};

linux系统中PlatformParker的实现

class PlatformParker : public CHeapObj<mtInternal> {
  protected:
    enum {
        REL_INDEX = 0,
        ABS_INDEX = 1
    };
    int _cur_index;  // which cond is in use: -1, 0, 1
    // 互斥量
    pthread_mutex_t _mutex [1] ;
    // 条件队列
    pthread_cond_t  _cond  [2] ; // one for relative times and one for abs.

  public:       // TODO-FIXME: make dtor private
    ~PlatformParker() { guarantee (0, "invariant") ; }

  public:
    PlatformParker() {
      int status;
      status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr());
      assert_status(status == 0, status, "cond_init rel");
      status = pthread_cond_init (&_cond[ABS_INDEX], NULL);
      assert_status(status == 0, status, "cond_init abs");
      status = pthread_mutex_init (_mutex, NULL);
      assert_status(status == 0, status, "mutex_init");
      _cur_index = -1; // mark as unused
    }
};

Parker在不同的操作系统中有不同的实现,下面是在oss_linux系统中的实现过程

void Parker::park(bool isAbsolute, jlong time) {
  // Ideally we'd do something useful while spinning, such
  // as calling unpackTime().

  // Optional fast-path check:
  // Return immediately if a permit is available.
  // We depend on Atomic::xchg() having full barrier semantics
  // since we are doing a lock-free update to _counter.
  // 原子交换,_counter为1则直接返回,不用休眠
  if (Atomic::xchg(0, &_counter) > 0) return;

  Thread* thread = Thread::current();
  assert(thread->is_Java_thread(), "Must be JavaThread");
  JavaThread *jt = (JavaThread *)thread;

  // Optional optimization -- avoid state transitions if there's an interrupt pending.
  // Check interrupt before trying to wait
 // 线程处于中断状态,直接返回 
  if (Thread::is_interrupted(thread, false)) {
    return;
  }

  // Next, demultiplex/decode time arguments
  timespec absTime;
  if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
    return;
  }
  if (time > 0) {
    // 将时间转为ns形式的绝对时间存到absTime中
    unpackTime(&absTime, isAbsolute, time);
  }


  // Enter safepoint region
  // Beware of deadlocks such as 6317397.
  // The per-thread Parker:: mutex is a classic leaf-lock.
  // In particular a thread must never block on the Threads_lock while
  // holding the Parker:: mutex.  If safepoints are pending both the
  // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
  ThreadBlockInVM tbivm(jt);

  // Don't wait if cannot get lock since interference arises from
  // unblocking.  Also. check interrupt before trying wait
  // 再次校验线程是否中断,试着获取锁,获取锁失败退出
  // 这里锁是归属于当前线程的,获锁失败是由于unblocking操作引起的,这时候不用休眠
  // 可以去做唤醒后该做的事。
  if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
    return;
  }
  
  // 走到这说明获锁已经成功了
  int status ;
  // 许可大于0不用再等待
  if (_counter > 0)  { // no wait needed
    _counter = 0;
    status = pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
    // Paranoia to ensure our locked and lock-free paths interact
    // correctly with each other and Java-level accesses.
    // 添加内存屏障,保证_counter的修改对其它线程可见
    OrderAccess::fence();
    return;
  }

#ifdef ASSERT
  // Don't catch signals while blocked; let the running threads have the signals.
  // (This allows a debugger to break into the running thread.)
  sigset_t oldsigs;
  sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();
  pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif
  // 将java线程拥有的操作系统线程设置为convar_wait状态,等待
// 某个条件发生
  OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
 // 设置_suspend_equivalent为true,不懂要干嘛
  jt->set_suspend_equivalent();
  // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()

  assert(_cur_index == -1, "invariant");
  if (time == 0) {
    _cur_index = REL_INDEX; // arbitrary choice when not timed
    // 没有时间设置代表永久休眠,条件队列wait操作会先释放互斥锁再休眠,
// 线程被唤醒时,要先拿到互斥锁才能向下执行,status代表获锁状态
    status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
  } else {
    _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
   // 带超时时间的cond_wait,到了超时时间条件没有发生也会唤醒
    status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
    // 失败的情况下,删除对应的条件队列,重新初始化
    if (status != 0 && WorkAroundNPTLTimedWaitHang) {
      pthread_cond_destroy (&_cond[_cur_index]) ;
      pthread_cond_init    (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
    }
  }
  _cur_index = -1;
  assert_status(status == 0 || status == EINTR ||
                status == ETIME || status == ETIMEDOUT,
                status, "cond_timedwait");

#ifdef ASSERT
  pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
#endif
 
 // 被唤醒后,许可被消耗,释放互斥锁
  _counter = 0 ;
  status = pthread_mutex_unlock(_mutex) ;
  assert_status(status == 0, status, "invariant") ;
  // Paranoia to ensure our locked and lock-free paths interact
  // correctly with each other and Java-level accesses.
  // 这里插入内存屏障,确保_counter的写入操作对其它线程可见
   OrderAccess::fence();

  // 如果等待的时候有外部挂起,重新挂起
  // If externally suspended while waiting, re-suspend
  if (jt->handle_special_suspend_equivalent_condition()) {
    jt->java_suspend_self();
  }
}

从park的实现可以看到:

  1. 无论是什么情况返回,park方法本身都不会告知调用方返回的原因,所以调用的时候一般都会去判断返回的场景,根据场景做不同的处理
  2. 线程的等待与挂起、唤醒等等就是使用的POSIX的线程API
  3. park的许可通过原子变量_count实现,当被消耗时,_count为0,只要拥有许可,就会立即返回
    order fence在linux中的实现
 
inline void OrderAccess::fence() {
  if (os::is_MP()) {
#ifdef AMD64
  // 没有使用mfence,因为mfence有时候性能差于使用 locked addl
    __asm__ volatile ("lock; addl $0,0(%%rsp)" : : : "cc", "memory");
#else    __asm__ volatile ("lock; addl $0,0(%%esp)" : : : "cc", "memory");
#endif  }
}

2.2 unpark源码

unpark的过程就比较简单了,如下:

void Parker::unpark() {
  int s, status ;
  // 互斥变量加锁
  status = pthread_mutex_lock(_mutex);
  assert (status == 0, "invariant") ;
  s = _counter;
  // 发放许可
  _counter = 1;
  // 小于1代表许可被消耗,线程可能被park
  if (s < 1) {
    // thread might be parked
    if (_cur_index != -1) {
      // thread is definitely parked
     // 如果之前_counter小于1,那么根据_cur_index来判断是否线程已经阻塞了。
// 如果阻塞根据WorkAroundNPTLTimedWaitHang来选择调用pthread_cond_signal(唤醒线程)和pthread_mutex_unlock(释放_mutex锁)的顺序,
// 当然为了避免NPTL-FUTEX 阻塞在 pthread_cond_timedwait上,这里默认开启为1。
      if (WorkAroundNPTLTimedWaitHang) {
        // 先唤醒等待线程,再释放互斥锁,防止线程超时醒来,直接抢锁,和唤醒线程的操作有竞争
        status = pthread_cond_signal (&_cond[_cur_index]);
        assert (status == 0, "invariant");
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant");
      } else {
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant");
        status = pthread_cond_signal (&_cond[_cur_index]);
        assert (status == 0, "invariant");
      }
    } else {
      pthread_mutex_unlock(_mutex);
      assert (status == 0, "invariant") ;
    }
  } else {
   // 许可还在,说明没有线程挂起,直接解锁
    pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
  }
}

3 synchronized中park的实现

synchronized中是靠parkEvent类来实现park和unpark方法来实现。parkEvent类的定义,以及获取释放方式如下,parkEvent和parker不同,parker是线程直接持有,parkEvent是在一个列表中去查找,查找首个节点,为空闲则和当前线程绑定,不为空则新建,使用完后再归还到链表中:

ParkEvent * ParkEvent::Allocate (Thread * t) {
  ParkEvent * ev ;
 
  //获取锁
  Thread::SpinAcquire(&ListLock, "ParkEventFreeListAllocate");
  {
    //将FreeList链表头从链表中移除
    ev = FreeList;
    if (ev != NULL) {
      FreeList = ev->FreeNext;
    }
  }
  //释放锁
  Thread::SpinRelease(&ListLock);
 
  if (ev != NULL) {
    //如果找到一个空闲的ParkEvent
    guarantee (ev->AssociatedWith == NULL, "invariant") ;
  } else {
    //如果没有找到,则创建一个
    ev = new ParkEvent () ;
    guarantee ((intptr_t(ev) & 0xFF) == 0, "invariant") ;
  }
  //将_Event置为0
  ev->reset() ;                     // courtesy to caller
  //保存关联的线程
  ev->AssociatedWith = t ;          // Associate ev with t
  ev->FreeNext       = NULL ;
  return ev ;
}
 
void ParkEvent::Release (ParkEvent * ev) {
  if (ev == NULL) return ;
  guarantee (ev->FreeNext == NULL      , "invariant") ;
  ev->AssociatedWith = NULL ;
  //获取锁
  Thread::SpinAcquire(&ListLock, "ParkEventFreeListRelease");
  {  
    //归还到FreeList链表中
    ev->FreeNext = FreeList;
    FreeList = ev;
  }
  //释放锁
  Thread::SpinRelease(&ListLock);
}
 
ParkEvent() : PlatformEvent() {
       AssociatedWith = NULL ;
       FreeNext       = NULL ;
       ListNext       = NULL ;
       ListPrev       = NULL ;
       OnList         = 0 ;
       TState         = 0 ;
       Notified       = 0 ;
       IsWaiting      = 0 ;
    }
 
PlatformEvent() {
      int status;
      //初始化_cond和_mutex
      status = pthread_cond_init (_cond, os::Linux::condAttr());
      assert_status(status == 0, status, "cond_init");
      status = pthread_mutex_init (_mutex, NULL);
      assert_status(status == 0, status, "mutex_init");
      _Event   = 0 ;
      _nParked = 0 ;
      _Assoc   = NULL ;
    }
 
 void reset() { _Event = 0 ; }

park方法

int os::PlatformEvent::TryPark() {
  for (;;) {
    const int v = _Event ;
    //_Event只能是0或者1
    guarantee ((v == 0) || (v == 1), "invariant") ;
    //将_Event原子的置为0
    if (Atomic::cmpxchg (0, &_Event, v) == v) return v  ;
  }
}
 
void os::PlatformEvent::park() {       // AKA "down()"
  int v ;
  for (;;) {
      v = _Event ;
      //将其原子的减1
      if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
  }
  guarantee (v >= 0, "invariant") ;
  if (v == 0) {
     //获取锁
     int status = pthread_mutex_lock(_mutex);
     assert_status(status == 0, status, "mutex_lock");
     guarantee (_nParked == 0, "invariant") ;
     //已park线程计数加1
     ++ _nParked ;
     //_Event已经原子的减1,变成-1了
     while (_Event < 0) {
        //无期限等待
        status = pthread_cond_wait(_cond, _mutex);
        if (status == ETIME) { status = EINTR; }
        assert_status(status == 0 || status == EINTR, status, "cond_wait");
     }
     //被唤醒了
     //计数减1
     -- _nParked ;
    //重置成0
    _Event = 0 ;
    //释放锁
     status = pthread_mutex_unlock(_mutex);
     assert_status(status == 0, status, "mutex_unlock");
    //让修改立即生效
    OrderAccess::fence();
  }
  guarantee (_Event >= 0, "invariant") ;
}
 
int os::PlatformEvent::park(jlong millis) {
  guarantee (_nParked == 0, "invariant") ;
 
  int v ;
  for (;;) {
      v = _Event ;
      //将其原子的减1
      if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
  }
  guarantee (v >= 0, "invariant") ;
  if (v != 0) return OS_OK ;
 
  //计算等待的时间
  struct timespec abst;
  compute_abstime(&abst, millis);
 
  int ret = OS_TIMEOUT;
  //获取锁
  int status = pthread_mutex_lock(_mutex);
  assert_status(status == 0, status, "mutex_lock");
  guarantee (_nParked == 0, "invariant") ;
  //计数加1
  ++_nParked ;
 
  while (_Event < 0) {
    //让线程休眠,底层是pthread_cond_timedwait
    status = os::Linux::safe_cond_timedwait(_cond, _mutex, &abst);
    //WorkAroundNPTLTimedWaitHang的值默认是1
    if (status != 0 && WorkAroundNPTLTimedWaitHang) {
      pthread_cond_destroy (_cond);
      pthread_cond_init (_cond, os::Linux::condAttr()) ;
    }
    //被中断后就返回EINTR,正常被唤醒就返回0,另外两个是等待超时
    assert_status(status == 0 || status == EINTR ||
                  status == ETIME || status == ETIMEDOUT,
                  status, "cond_timedwait");
    //FilterSpuriousWakeups默认是true              
    if (!FilterSpuriousWakeups) break ;                 // previous semantics
    //如果超时了则退出循环
    if (status == ETIME || status == ETIMEDOUT) break ;
  }
  --_nParked ;
  if (_Event >= 0) {
     ret = OS_OK;
  }
  _Event = 0 ;
  //解锁
  status = pthread_mutex_unlock(_mutex);
  assert_status(status == 0, status, "mutex_unlock");
  assert (_nParked == 0, "invariant") ;
  //让修改立即生效
  OrderAccess::fence();
  return ret;
}

unpark方法
unpark用于唤醒某个被park方法阻塞的线程,其实现如下:

void os::PlatformEvent::unpark() {
  //将其原子的置为1,如果原来就是1,说明已经unpark过了,直接返回
  if (Atomic::xchg(1, &_Event) >= 0) return;
 
  //获取锁
  int status = pthread_mutex_lock(_mutex);
  assert_status(status == 0, status, "mutex_lock");
  int AnyWaiters = _nParked;
  assert(AnyWaiters == 0 || AnyWaiters == 1, "invariant");
  //WorkAroundNPTLTimedWaitHang默认是true
  if (AnyWaiters != 0 && WorkAroundNPTLTimedWaitHang) {
    AnyWaiters = 0;
    //发信号唤醒该线程,被唤醒后将_nParked置为0
    pthread_cond_signal(_cond);
  }
  //释放锁
  status = pthread_mutex_unlock(_mutex);
  assert_status(status == 0, status, "mutex_unlock");
  if (AnyWaiters != 0) {
    //pthread_cond_signal不要求获取锁,此处再次唤醒
    status = pthread_cond_signal(_cond);
    assert_status(status == 0, status, "cond_signal");
  }
}

4 总结

lockSupport中park和unpark都是依赖于Parker类来实现,linux中parker类的实现主要是维护了一个许可_counter变量,利用posix(Portable Operating System Interface of UNIX)thread api去实现线程的休眠与唤醒。
Synchronized中依赖于ParkerEvent类,其中linux的实现,主要是维护_Event的值,然后同样是用pthread_mutex_lock进行加锁,pthread_cond_wait进行线程休眠,按注释说,后续会用parkerEvent替代parker类。

参考链接

LockSupport 以及 park、unpark 方法源码分析
LockSupport的park和unpark的原理
Hotspot Parker和ParkEvent 源码解析

上一篇下一篇

猜你喜欢

热点阅读