11--多线程探索08--GCD源码之dispatch_once
2021-07-28 本文已影响0人
修_远
概述
dispatch_once能保证任务只会被执行一次,即使同时多线程调用也是线程安全的。常用于创建单例、swizzeld method等功能。
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
//创建单例、method swizzled或其他任务
});
源码(老)
在10之后的源码中,隐藏了很多实现细节,不利于解读过程,可以先看一份早期的源码:
void dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func) {
struct _dispatch_once_waiter_s * volatile *vval =
(struct _dispatch_once_waiter_s**)val;
struct _dispatch_once_waiter_s dow = { NULL, 0 };
struct _dispatch_once_waiter_s *tail, *tmp;
_dispatch_thread_semaphore_t sema;
if (dispatch_atomic_cmpxchg(vval, NULL, &dow, acquire)) {
_dispatch_client_callout(ctxt, func);
dispatch_atomic_maximally_synchronizing_barrier();
// above assumed to contain release barrier
tmp = dispatch_atomic_xchg(vval, DISPATCH_ONCE_DONE, relaxed);
tail = &dow;
while (tail != tmp) {
while (!tmp->dow_next) {
dispatch_hardware_pause();
}
sema = tmp->dow_sema;
tmp = (struct _dispatch_once_waiter_s*)tmp->dow_next;
_dispatch_thread_semaphore_signal(sema);
}
} else {
dow.dow_sema = _dispatch_get_thread_semaphore();
tmp = *vval;
for (;;) {
if (tmp == DISPATCH_ONCE_DONE) {
break;
}
if (dispatch_atomic_cmpxchgvw(vval, tmp, &dow, &tmp, release)) {
dow.dow_next = tmp;
_dispatch_thread_semaphore_wait(dow.dow_sema);
break;
}
}
_dispatch_put_thread_semaphore(dow.dow_sema);
}
}
构造的链表,这个结构体在最新的源码中是找不到的
struct _dispatch_once_waiter_s {
truevolatile struct _dispatch_once_waiter_s *volatile dow_next;
true_dispatch_thread_semaphore_t dow_sema;
};
block执行的源码,这段代码后面也会分析
_dispatch_client_callout(ctxt, func);
我们都知道dispatch_once
的魅力在于它可以保证代码块只执行一次,且是线程安全的
场景一:第一个线程第一次进入
- 刚进来时:
dispatch_once_t==nil
,if (dispatch_atomic_cmpxchg(vval, NULL, &dow, acquire))
条件成立,进入if流程; - 调用:
_dispatch_client_callout
,执行block; - 调用:
dispatch_atomic_xchg(vval, DISPATCH_ONCE_DONE, relaxed);
,将vval的值更新成DISPATCH_ONCE_DONE
表示任务已完成; - 遍历链表的节点并调用
_dispatch_thread_semaphore_signal
来唤醒等待中的信号量;
场景二:第二个线程进入
在第一个线程结束之前,第二个线程就已经进来了,但因为dispatch_atomic_cmpxchg(vval, NULL, &dow, acquire)
里面的流程都是原子操作,原子表示最小的操作,不可分割的操作,所以其他进来的线程需要进入else流程
。
-
if (dispatch_atomic_cmpxchg(vval, NULL, &dow, acquire))
条件不成立,进入else流程
; - 获取当前线程信号量:
dow.dow_sema = _dispatch_get_thread_semaphore();
; - 开始无限循环:
for (;;)
; - 阻塞线程:
_dispatch_thread_semaphore_wait(dow.dow_sema);
- 循环退出条件:
if (tmp == DISPATCH_ONCE_DONE) { break; }
,在场景一中有最后一步,当任务完成时,会vval的值更新成DISPATCH_ONCE_DONE
,然后遍历链表中的所有节点,并并调用_dispatch_thread_semaphore_signal
来唤醒等待中的信号量,这个时候,循环就会退出。 - 更新当前线程信号量:
_dispatch_put_thread_semaphore(dow.dow_sema)
场景三:同一个线程第二次进入
当场景一完成之后,vval的值为DISPATCH_ONCE_DONE
,这个时候if (dispatch_atomic_cmpxchg(vval, NULL, &dow, acquire))
条件就不成立,同样会走到else流程
。
- 场景一已完成,走进for循环,然后立马跳出循环,更新线程信号量,啥都没做;
- 场景一未完成,跟场景二的流程一致,进入for循环,直到vval的值更新成
DISPATCH_ONCE_DONE
。
源码(新)
在新的源码中看不到信号量的作用了。新的源码中用了更多的宏,流程更加抽象。但主体思路变化不大。
源码实现
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
dispatch_once_gate_t l = (dispatch_once_gate_t)val;
#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
if (likely(v == DLOCK_ONCE_DONE)) {
return;
}
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
if (likely(DISPATCH_ONCE_IS_GEN(v))) {
return _dispatch_once_mark_done_if_quiesced(l, v);
}
#endif
#endif
if (_dispatch_once_gate_tryenter(l)) {
return _dispatch_once_callout(l, ctxt, func);
}
return _dispatch_once_wait(l);
}
- 先校验once状态是否为
DLOCK_ONCE_DONE
,如果是,则直接返回,啥也不做,否则进入第二步; - 条件判断:
_dispatch_once_gate_tryenter(l)
,其实内部实现也是跟旧的源码差别不大:os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED, (uintptr_t)_dispatch_lock_value_for_self(), relaxed);
; - 条件通过则调用:
_dispatch_once_callout(l, ctxt, func);
,这个流程跟旧源码的if流程
一模一样。先调用block,然后广播信号量状态改变的消息; - 其他条件不通过时,则进入:
_dispatch_once_wait(l);
流程,这个流程和else流程
一致;
源码定义
为了更加清晰的认识新的源码,将宏定义展开、函数嵌套展开之后看过程。整体的流程仍然是完全一致,只是将信号量改成了iOS10之后的unfair_lock
。
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
dispatch_once_gate_t l = (dispatch_once_gate_t)val;
uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
if (likely(v == DLOCK_ONCE_DONE)) {
return;
}
if (os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
(uintptr_t)_dispatch_lock_value_for_self(), relaxed)) {
//_dispatch_once_callout(l, ctxt, func);
func(ctxt);
dispatch_lock value_self = _dispatch_lock_value_for_self();
uintptr_t v;
v = _dispatch_once_mark_done(l);
if (likely((dispatch_lock)v == value_self)) return;
_dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
return;
}
// _dispatch_once_wait
dispatch_lock self = _dispatch_lock_value_for_self();
uintptr_t old_v, new_v;
uint32_t timeout = 1;
for (;;) {
os_atomic_rmw_loop(&dgo->dgo_once, old_v, new_v, relaxed, {
if (likely(old_v == DLOCK_ONCE_DONE)) {
os_atomic_rmw_loop_give_up(return);
}
new_v = old_v | (uintptr_t)DLOCK_WAITERS_BIT;
if (new_v == old_v) os_atomic_rmw_loop_give_up(break);
});
if (unlikely(_dispatch_lock_is_locked_by((dispatch_lock)old_v, self))) {
DISPATCH_CLIENT_CRASH(0, "trying to lock recursively");
}
_dispatch_unfair_lock_wait(lock, (dispatch_lock)new_v, 0,
DLOCK_LOCK_NONE);
(void)timeout;
}
}
定义
dispatch_once_t
typedef intptr_t dispatch_once_t;
static dispatch_once_t _dispatch_build_pred;
dispatch_once_gate_s, *dispatch_once_gate_t
typedef struct dispatch_once_gate_s {
union {
dispatch_gate_s dgo_gate;
uintptr_t dgo_once;
};
} dispatch_once_gate_s, *dispatch_once_gate_t;
宽度 acquire = __mo_acquire = 2
uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
rename everything to the wide variants
重命名为指定宽度的变量 只需要考虑两位即可
enum class memory_order : __memory_order_underlying_t {
relaxed = __mo_relaxed,
consume = __mo_consume,
acquire = __mo_acquire,
release = __mo_release,
acq_rel = __mo_acq_rel,
seq_cst = __mo_seq_cst
};
enum __legacy_memory_order {
__mo_relaxed,
__mo_consume,
__mo_acquire,
__mo_release,
__mo_acq_rel,
__mo_seq_cst
};
宏定义
DLOCK_GATE_UNLOCKED
#define DLOCK_GATE_UNLOCKED ((dispatch_lock)0)
0
DLOCK_ONCE_UNLOCKED
#define DLOCK_ONCE_UNLOCKED ((uintptr_t)0)
0x00
0b0000 0000
DLOCK_ONCE_DONE
#define DLOCK_ONCE_DONE (~(uintptr_t)0)
0xff
0b1111 1111
DLOCK_FAILED_TRYLOCK_BIT
#define DLOCK_FAILED_TRYLOCK_BIT ((dispatch_lock)0x00000002)
0x02
0b0000 0010
DISPATCH_NOESCAPE
#define DISPATCH_NOESCAPE __attribute__((__noescape__))
DISPATCH_EXPECT
#define DISPATCH_EXPECT(x, v) __builtin_expect((x), (v))
函数定义
DISPATCH_ONCE_MAKE_GEN(gen)
(((gen) << 2) + DLOCK_FAILED_TRYLOCK_BIT)
左移两位
+ 0b0000 0010
DISPATCH_ONCE_IS_GEN(gen)
(((gen) & 3) == DLOCK_FAILED_TRYLOCK_BIT)
& 0b0000 0011
比较 0b0000 0010
_dispatch_once_gate_tryenter
static inline bool
_dispatch_once_gate_tryenter(dispatch_once_gate_t l)
{
return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
(uintptr_t)_dispatch_lock_value_for_self(), relaxed);
}
l->dgo_once == 0
atomic.h
os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
(uintptr_t)_dispatch_lock_value_for_self(), relaxed);
比较+交换
l->dgo_once 与 DLOCK_ONCE_UNLOCKED 比较
将 _dispatch_lock_value_for_self() 赋值给 l->dgo_once
_dispatch_once_callout流程
_dispatch_client_callout(ctxt, func);
return f(ctxt);
调用block
_dispatch_once_gate_broadcast(l);
_dispatch_once_gate_broadcast(dispatch_once_gate_t l)
{
dispatch_lock value_self = _dispatch_lock_value_for_self();
uintptr_t v;
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
v = _dispatch_once_mark_quiescing(l);
#else
v = _dispatch_once_mark_done(l);
#endif
if (likely((dispatch_lock)v == value_self)) return;
_dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
}
最后两位结果为:最后两位0b10
static inline uintptr_t
_dispatch_once_mark_quiescing(dispatch_once_gate_t dgo)
{
return os_atomic_xchg(&dgo->dgo_once, _dispatch_once_generation(), release);
}
static inline uintptr_t
_dispatch_once_generation(void)
{
uintptr_t value;
value = *(volatile uintptr_t *)_COMM_PAGE_CPU_QUIESCENT_COUNTER;
return (uintptr_t)DISPATCH_ONCE_MAKE_GEN(value);
}
最后两位结果为:0b11
static inline uintptr_t
_dispatch_once_mark_done(dispatch_once_gate_t dgo)
{
return os_atomic_xchg(&dgo->dgo_once, DLOCK_ONCE_DONE, release);
}
深入浅出 GCD 之 dispatch_once
iOS多线程:GCD源码分析<三>dispatch_once
atomic_cmpxchg