AQS中LockSupport与Synchronized中par
1.简介
使用LockSupport的线程会与一个许可关联,其实就像是一个二元信号量(意思就是只有一个许可证可以使用),如果这个许可没有被占用,那么当前线程可以获得许可并继续执行,如果许可以已经被占用,则当前线程就会被阻塞,然后等待许可的获取。注意:许可默认是被占用的!
下面是释放许可-获取许可的一个例子,能正常输出b。
public static void main(String[] args) {
Thread thread = Thread.currentThread();
LockSupport.unpark(thread);//释放许可
LockSupport.park();// 获取许可
System.out.println("b");
}
1.1 park方法分析
除非许可证可用,否则出于线程调度目的禁用当前线程。 如果许可证可用,则该许可证被消耗,呼叫立即返回;否则,出于线程调度目的,当前线程将被禁用,并处于休眠状态,直到发生以下三种情况之一:
- 其他线程以当前线程为目标调用 unpark
- 其他线程中断当前线程
- 调用不合逻辑地返回
此方法不报告导致方法返回的原因。调用方应首先重新检查导致线程停止的条件。调用方还可以在返回时确定线程的中断状态。
/**
* Disables the current thread for thread scheduling purposes unless the
* permit is available.
*
* <p>If the permit is available then it is consumed and the call returns
* immediately; otherwise
* the current thread becomes disabled for thread scheduling
* purposes and lies dormant until one of three things happens:
*
* <ul>
* <li>Some other thread invokes {@link #unpark unpark} with the
* current thread as the target; or
*
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread; or
*
* <li>The call spuriously (that is, for no reason) returns.
* </ul>
*
* <p>This method does <em>not</em> report which of these caused the
* method to return. Callers should re-check the conditions which caused
* the thread to park in the first place. Callers may also determine,
* for example, the interrupt status of the thread upon return.
*
* @param blocker the synchronization object responsible for this
* thread parking
* @since 1.6
*/
public static void park(Object blocker) {
Thread t = Thread.currentThread();
// 记录当前线程阻塞的原因,lock调用的blocker是对应的同步器
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
// 线程唤醒后,去掉阻塞原因
setBlocker(t, null);
}
1.2 unpark方法分析
使给定线程的许可证可用(如果尚未可用)。如果线程在 park上被阻塞,那么它将解除阻塞。否则,它对 park 的下一次呼叫保证不会被阻塞。如果给定的线程尚未启动,则不能保证此操作有任何效果。
2.JVM源码分析
2.1 park方法源码解析
在 LockSupport 的底层主要是调用 Unsafa 类的 park, unpark 方法实现如下:
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
UnsafeWrapper("Unsafe_Park");
EventThreadPark event;
#ifndef USDT2
HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time);
#else /* USDT2 */
HOTSPOT_THREAD_PARK_BEGIN(
(uintptr_t) thread->parker(), (int) isAbsolute, time);
#endif /* USDT2 */
JavaThreadParkedState jtps(thread, time != 0);
// 这里调用线程持有的parker实例中的park方法
thread->parker()->park(isAbsolute != 0, time);
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker());
#else /* USDT2 */
HOTSPOT_THREAD_PARK_END(
(uintptr_t) thread->parker());
#endif /* USDT2 */
if (event.should_commit()) {
oop obj = thread->current_park_blocker();
event.set_klass((obj != NULL) ? obj->klass() : NULL);
event.set_timeout(time);
event.set_address((obj != NULL) ? (TYPE_ADDRESS) cast_from_oop<uintptr_t>(obj) : 0);
event.commit();
}
UNSAFE_END
UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread))
UnsafeWrapper("Unsafe_Unpark");
Parker* p = NULL;
if (jthread != NULL) {
oop java_thread = JNIHandles::resolve_non_null(jthread);
if (java_thread != NULL) {
jlong lp = java_lang_Thread::park_event(java_thread);
if (lp != 0) {
// This cast is OK even though the jlong might have been read
// non-atomically on 32bit systems, since there, one word will
// always be zero anyway and the value set is always the same
p = (Parker*)addr_from_java(lp);
} else {
// Grab lock if apparently null or using older version of library
MutexLocker mu(Threads_lock);
java_thread = JNIHandles::resolve_non_null(jthread);
if (java_thread != NULL) {
JavaThread* thr = java_lang_Thread::thread(java_thread);
if (thr != NULL) {
p = thr->parker();
if (p != NULL) { // Bind to Java thread for next time.
java_lang_Thread::set_park_event(java_thread, addr_to_java(p));
}
}
}
}
}
}
if (p != NULL) {
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__unpark, p);
#else /* USDT2 */
HOTSPOT_THREAD_UNPARK(
(uintptr_t) p);
#endif /* USDT2 */ 调用unpark方法
p->unpark();
}
UNSAFE_END
每个线程对象都有一个 Parker 实例
- Parker 类继承 os::PlatformParker,应该是一个针对不同操作系统适配的
- 有一个 _counter 属性,可以理解为是否可以调用 park 方法的许可证,只有 _count > 0 的时候才能调用;
- 提供了公开的 park 和 unpark 方法
- 每个线程都有自己的parker实例,那么为什么还要加锁解锁呢,这个地方我理解是因为其它线程unpark的时候也可以操作被唤醒线程的parker实例。
// JSR166 per-thread parker
private:
Parker* _parker;
public:
Parker* parker() { return _parker; }
Parker类的定义
class Parker : public os::PlatformParker {
private:
// 许可
volatile int _counter ;
Parker * FreeNext ;
JavaThread * AssociatedWith ; // Current association
public:
Parker() : PlatformParker() {
_counter = 0 ;
FreeNext = NULL ;
AssociatedWith = NULL ;
}
protected:
~Parker() { ShouldNotReachHere(); }
public:
// For simplicity of interface with Java, all forms of park (indefinite,
// relative, and absolute) are multiplexed into one call.
// 无限期、相对时间、绝对时间在一个方法中
void park(bool isAbsolute, jlong time);
void unpark();
// Lifecycle operators
static Parker * Allocate (JavaThread * t) ;
static void Release (Parker * e) ;
private:
static Parker * volatile FreeList ;
static volatile int ListLock ;
};
linux系统中PlatformParker的实现
class PlatformParker : public CHeapObj<mtInternal> {
protected:
enum {
REL_INDEX = 0,
ABS_INDEX = 1
};
int _cur_index; // which cond is in use: -1, 0, 1
// 互斥量
pthread_mutex_t _mutex [1] ;
// 条件队列
pthread_cond_t _cond [2] ; // one for relative times and one for abs.
public: // TODO-FIXME: make dtor private
~PlatformParker() { guarantee (0, "invariant") ; }
public:
PlatformParker() {
int status;
status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr());
assert_status(status == 0, status, "cond_init rel");
status = pthread_cond_init (&_cond[ABS_INDEX], NULL);
assert_status(status == 0, status, "cond_init abs");
status = pthread_mutex_init (_mutex, NULL);
assert_status(status == 0, status, "mutex_init");
_cur_index = -1; // mark as unused
}
};
Parker在不同的操作系统中有不同的实现,下面是在oss_linux系统中的实现过程
- 判断是否需要阻塞等待,如果已经是 _counter >0, 不需要等待,将 _counter = 0 , 返回
- 如果 1 不成立,构造当前线程的 ThreadBlockInVM ,检查 _counter > 0 是否成立,成立则将 _counter 设置为 0, unlock mutex 返回;
- 如果 2 不成立,更具需要时间进行不同的函数等待,如果等待正确返回,则将 _counter 设置为0, unlock mutex , park 调用成功。
void Parker::park(bool isAbsolute, jlong time) {
// Ideally we'd do something useful while spinning, such
// as calling unpackTime().
// Optional fast-path check:
// Return immediately if a permit is available.
// We depend on Atomic::xchg() having full barrier semantics
// since we are doing a lock-free update to _counter.
// 原子交换,_counter为1则直接返回,不用休眠
if (Atomic::xchg(0, &_counter) > 0) return;
Thread* thread = Thread::current();
assert(thread->is_Java_thread(), "Must be JavaThread");
JavaThread *jt = (JavaThread *)thread;
// Optional optimization -- avoid state transitions if there's an interrupt pending.
// Check interrupt before trying to wait
// 线程处于中断状态,直接返回
if (Thread::is_interrupted(thread, false)) {
return;
}
// Next, demultiplex/decode time arguments
timespec absTime;
if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
return;
}
if (time > 0) {
// 将时间转为ns形式的绝对时间存到absTime中
unpackTime(&absTime, isAbsolute, time);
}
// Enter safepoint region
// Beware of deadlocks such as 6317397.
// The per-thread Parker:: mutex is a classic leaf-lock.
// In particular a thread must never block on the Threads_lock while
// holding the Parker:: mutex. If safepoints are pending both the
// the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
ThreadBlockInVM tbivm(jt);
// Don't wait if cannot get lock since interference arises from
// unblocking. Also. check interrupt before trying wait
// 再次校验线程是否中断,试着获取锁,获取锁失败退出
// 这里锁是归属于当前线程的,获锁失败是由于unblocking操作引起的,这时候不用休眠
// 可以去做唤醒后该做的事。
if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
return;
}
// 走到这说明获锁已经成功了
int status ;
// 许可大于0不用再等待
if (_counter > 0) { // no wait needed
_counter = 0;
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other and Java-level accesses.
// 添加内存屏障,保证_counter的修改对其它线程可见
OrderAccess::fence();
return;
}
#ifdef ASSERT
// Don't catch signals while blocked; let the running threads have the signals.
// (This allows a debugger to break into the running thread.)
sigset_t oldsigs;
sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();
pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif
// 将java线程拥有的操作系统线程设置为convar_wait状态,等待
// 某个条件发生
OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
// 设置_suspend_equivalent为true,不懂要干嘛
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()
assert(_cur_index == -1, "invariant");
if (time == 0) {
_cur_index = REL_INDEX; // arbitrary choice when not timed
// 没有时间设置代表永久休眠,条件队列wait操作会先释放互斥锁再休眠,
// 线程被唤醒时,要先拿到互斥锁才能向下执行,status代表获锁状态
status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
} else {
_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
// 带超时时间的cond_wait,到了超时时间条件没有发生也会唤醒
status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
// 失败的情况下,删除对应的条件队列,重新初始化
if (status != 0 && WorkAroundNPTLTimedWaitHang) {
pthread_cond_destroy (&_cond[_cur_index]) ;
pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
}
}
_cur_index = -1;
assert_status(status == 0 || status == EINTR ||
status == ETIME || status == ETIMEDOUT,
status, "cond_timedwait");
#ifdef ASSERT
pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
#endif
// 被唤醒后,许可被消耗,释放互斥锁
_counter = 0 ;
status = pthread_mutex_unlock(_mutex) ;
assert_status(status == 0, status, "invariant") ;
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other and Java-level accesses.
// 这里插入内存屏障,确保_counter的写入操作对其它线程可见
OrderAccess::fence();
// 如果等待的时候有外部挂起,重新挂起
// If externally suspended while waiting, re-suspend
if (jt->handle_special_suspend_equivalent_condition()) {
jt->java_suspend_self();
}
}
从park的实现可以看到:
- 无论是什么情况返回,park方法本身都不会告知调用方返回的原因,所以调用的时候一般都会去判断返回的场景,根据场景做不同的处理
- 线程的等待与挂起、唤醒等等就是使用的POSIX的线程API
- park的许可通过原子变量_count实现,当被消耗时,_count为0,只要拥有许可,就会立即返回
order fence在linux中的实现
inline void OrderAccess::fence() {
if (os::is_MP()) {
#ifdef AMD64
// 没有使用mfence,因为mfence有时候性能差于使用 locked addl
__asm__ volatile ("lock; addl $0,0(%%rsp)" : : : "cc", "memory");
#else __asm__ volatile ("lock; addl $0,0(%%esp)" : : : "cc", "memory");
#endif }
}
2.2 unpark源码
unpark的过程就比较简单了,如下:
void Parker::unpark() {
int s, status ;
// 互斥变量加锁
status = pthread_mutex_lock(_mutex);
assert (status == 0, "invariant") ;
s = _counter;
// 发放许可
_counter = 1;
// 小于1代表许可被消耗,线程可能被park
if (s < 1) {
// thread might be parked
if (_cur_index != -1) {
// thread is definitely parked
// 如果之前_counter小于1,那么根据_cur_index来判断是否线程已经阻塞了。
// 如果阻塞根据WorkAroundNPTLTimedWaitHang来选择调用pthread_cond_signal(唤醒线程)和pthread_mutex_unlock(释放_mutex锁)的顺序,
// 当然为了避免NPTL-FUTEX 阻塞在 pthread_cond_timedwait上,这里默认开启为1。
if (WorkAroundNPTLTimedWaitHang) {
// 先唤醒等待线程,再释放互斥锁,防止线程超时醒来,直接抢锁,和唤醒线程的操作有竞争
status = pthread_cond_signal (&_cond[_cur_index]);
assert (status == 0, "invariant");
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant");
} else {
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant");
status = pthread_cond_signal (&_cond[_cur_index]);
assert (status == 0, "invariant");
}
} else {
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
} else {
// 许可还在,说明没有线程挂起,直接解锁
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
}
3 synchronized中park的实现
synchronized中是靠parkEvent类来实现park和unpark方法来实现。parkEvent类的定义,以及获取释放方式如下,parkEvent和parker不同,parker是线程直接持有,parkEvent是在一个列表中去查找,查找首个节点,为空闲则和当前线程绑定,不为空则新建,使用完后再归还到链表中:
ParkEvent * ParkEvent::Allocate (Thread * t) {
ParkEvent * ev ;
//获取锁
Thread::SpinAcquire(&ListLock, "ParkEventFreeListAllocate");
{
//将FreeList链表头从链表中移除
ev = FreeList;
if (ev != NULL) {
FreeList = ev->FreeNext;
}
}
//释放锁
Thread::SpinRelease(&ListLock);
if (ev != NULL) {
//如果找到一个空闲的ParkEvent
guarantee (ev->AssociatedWith == NULL, "invariant") ;
} else {
//如果没有找到,则创建一个
ev = new ParkEvent () ;
guarantee ((intptr_t(ev) & 0xFF) == 0, "invariant") ;
}
//将_Event置为0
ev->reset() ; // courtesy to caller
//保存关联的线程
ev->AssociatedWith = t ; // Associate ev with t
ev->FreeNext = NULL ;
return ev ;
}
void ParkEvent::Release (ParkEvent * ev) {
if (ev == NULL) return ;
guarantee (ev->FreeNext == NULL , "invariant") ;
ev->AssociatedWith = NULL ;
//获取锁
Thread::SpinAcquire(&ListLock, "ParkEventFreeListRelease");
{
//归还到FreeList链表中
ev->FreeNext = FreeList;
FreeList = ev;
}
//释放锁
Thread::SpinRelease(&ListLock);
}
ParkEvent() : PlatformEvent() {
AssociatedWith = NULL ;
FreeNext = NULL ;
ListNext = NULL ;
ListPrev = NULL ;
OnList = 0 ;
TState = 0 ;
Notified = 0 ;
IsWaiting = 0 ;
}
PlatformEvent() {
int status;
//初始化_cond和_mutex
status = pthread_cond_init (_cond, os::Linux::condAttr());
assert_status(status == 0, status, "cond_init");
status = pthread_mutex_init (_mutex, NULL);
assert_status(status == 0, status, "mutex_init");
_Event = 0 ;
_nParked = 0 ;
_Assoc = NULL ;
}
void reset() { _Event = 0 ; }
park方法
int os::PlatformEvent::TryPark() {
for (;;) {
const int v = _Event ;
//_Event只能是0或者1
guarantee ((v == 0) || (v == 1), "invariant") ;
//将_Event原子的置为0
if (Atomic::cmpxchg (0, &_Event, v) == v) return v ;
}
}
void os::PlatformEvent::park() { // AKA "down()"
int v ;
for (;;) {
v = _Event ;
//将其原子的减1
if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
}
guarantee (v >= 0, "invariant") ;
if (v == 0) {
//获取锁
int status = pthread_mutex_lock(_mutex);
assert_status(status == 0, status, "mutex_lock");
guarantee (_nParked == 0, "invariant") ;
//已park线程计数加1
++ _nParked ;
//_Event已经原子的减1,变成-1了
while (_Event < 0) {
//无期限等待
status = pthread_cond_wait(_cond, _mutex);
if (status == ETIME) { status = EINTR; }
assert_status(status == 0 || status == EINTR, status, "cond_wait");
}
//被唤醒了
//计数减1
-- _nParked ;
//重置成0
_Event = 0 ;
//释放锁
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "mutex_unlock");
//让修改立即生效
OrderAccess::fence();
}
guarantee (_Event >= 0, "invariant") ;
}
int os::PlatformEvent::park(jlong millis) {
guarantee (_nParked == 0, "invariant") ;
int v ;
for (;;) {
v = _Event ;
//将其原子的减1
if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
}
guarantee (v >= 0, "invariant") ;
if (v != 0) return OS_OK ;
//计算等待的时间
struct timespec abst;
compute_abstime(&abst, millis);
int ret = OS_TIMEOUT;
//获取锁
int status = pthread_mutex_lock(_mutex);
assert_status(status == 0, status, "mutex_lock");
guarantee (_nParked == 0, "invariant") ;
//计数加1
++_nParked ;
while (_Event < 0) {
//让线程休眠,底层是pthread_cond_timedwait
status = os::Linux::safe_cond_timedwait(_cond, _mutex, &abst);
//WorkAroundNPTLTimedWaitHang的值默认是1
if (status != 0 && WorkAroundNPTLTimedWaitHang) {
pthread_cond_destroy (_cond);
pthread_cond_init (_cond, os::Linux::condAttr()) ;
}
//被中断后就返回EINTR,正常被唤醒就返回0,另外两个是等待超时
assert_status(status == 0 || status == EINTR ||
status == ETIME || status == ETIMEDOUT,
status, "cond_timedwait");
//FilterSpuriousWakeups默认是true
if (!FilterSpuriousWakeups) break ; // previous semantics
//如果超时了则退出循环
if (status == ETIME || status == ETIMEDOUT) break ;
}
--_nParked ;
if (_Event >= 0) {
ret = OS_OK;
}
_Event = 0 ;
//解锁
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "mutex_unlock");
assert (_nParked == 0, "invariant") ;
//让修改立即生效
OrderAccess::fence();
return ret;
}
unpark方法
unpark用于唤醒某个被park方法阻塞的线程,其实现如下:
void os::PlatformEvent::unpark() {
//将其原子的置为1,如果原来就是1,说明已经unpark过了,直接返回
if (Atomic::xchg(1, &_Event) >= 0) return;
//获取锁
int status = pthread_mutex_lock(_mutex);
assert_status(status == 0, status, "mutex_lock");
int AnyWaiters = _nParked;
assert(AnyWaiters == 0 || AnyWaiters == 1, "invariant");
//WorkAroundNPTLTimedWaitHang默认是true
if (AnyWaiters != 0 && WorkAroundNPTLTimedWaitHang) {
AnyWaiters = 0;
//发信号唤醒该线程,被唤醒后将_nParked置为0
pthread_cond_signal(_cond);
}
//释放锁
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "mutex_unlock");
if (AnyWaiters != 0) {
//pthread_cond_signal不要求获取锁,此处再次唤醒
status = pthread_cond_signal(_cond);
assert_status(status == 0, status, "cond_signal");
}
}
4 总结
lockSupport中park和unpark都是依赖于Parker类来实现,linux中parker类的实现主要是维护了一个许可_counter变量,利用posix(Portable Operating System Interface of UNIX)thread api去实现线程的休眠与唤醒。
Synchronized中依赖于ParkerEvent类,其中linux的实现,主要是维护_Event的值,然后同样是用pthread_mutex_lock进行加锁,pthread_cond_wait进行线程休眠,按注释说,后续会用parkerEvent替代parker类。
参考链接
LockSupport 以及 park、unpark 方法源码分析
LockSupport的park和unpark的原理
Hotspot Parker和ParkEvent 源码解析