GCD分析(中)
同步函数死锁
死锁现象
- 主线程因为同步函数的原因,要等着先执行任务
- 主队列等着主线程的任务执行完毕,再执行自己的任务
- 主队列和主线程相互等待,会造成死锁
同步函数和异步函数的区别
- 能否开辟线程
- 任务的回调是否具备异步性或同步性
同步串行死锁底层源码分析
- 全局搜索
dispatch_sync(dis
// Block-based entry point for synchronous submission: forwards `work` to
// _dispatch_sync_f() together with the block's invoke function.
DISPATCH_NOINLINE
void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
// Flags start out as DC_FLAG_BLOCK for this block-based entry point.
uintptr_t dc_flags = DC_FLAG_BLOCK;
// Blocks that carry private data are handed to a dedicated helper instead.
if (unlikely(_dispatch_block_has_private_data(work))) {
return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
}
// Common case: the block itself is the context, its invoke pointer is func.
_dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}
#endif // __BLOCKS__
- 查看
_dispatch_sync_f
方法 - 查看
_dispatch_sync_f_inline
方法
// Inline core of the synchronous submit path: chooses how func(ctxt) gets
// executed based on the queue's width, type and current state.
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
if (likely(dq->dq_width == 1)) {
// Width-1 queue: follow this call to reach the deadlock check.
return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
}
// Only lane-type queues are accepted here; anything else crashes the client.
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
}
// A target queue that itself has a target takes the recursive path.
if (unlikely(dq->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
}
_dispatch_introspection_sync_begin(dl);
// No comma after `func`: DISPATCH_TRACE_ARG supplies it in its expansion.
_dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}
- 查看
_dispatch_barrier_sync_f
方法 - 查看
_dispatch_barrier_sync_f_inline
方法
// Inline core of the barrier synchronous path: tries to acquire the queue as
// a barrier for the calling thread and otherwise falls back to the slow path.
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
// Id of the calling thread, used when trying to acquire the barrier below.
dispatch_tid tid = _dispatch_tid_self();
// Only lane-type queues are accepted here; anything else crashes the client.
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
// See _dispatch_sync_f_slow: the deadlock crash is reported from this path.
return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
DC_FLAG_BARRIER | dc_flags);
}
// A target queue that itself has a target takes the recursive path.
if (unlikely(dl->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func,
DC_FLAG_BARRIER | dc_flags);
}
_dispatch_introspection_sync_begin(dl);
// No comma after `func`: DISPATCH_TRACE_ARG supplies it in its expansion.
_dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}
死锁时报错
死锁真正报错的方法是__DISPATCH_WAIT_FOR_QUEUE__
- 查看
_dispatch_sync_f_slow
方法 - 查看
__DISPATCH_WAIT_FOR_QUEUE__
DISPATCH_NOINLINE
static void
__DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
{
uint64_t dq_state = _dispatch_wait_prepare(dq);
// 查看死锁条件
if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
// 死锁产生在这里
DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
"dispatch_sync called on queue "
"already owned by current thread");
}
// Blocks submitted to the main thread MUST run on the main thread, and
// dispatch_async_and_wait also executes on the remote context rather than
// the current thread.
//
// For both these cases we need to save the frame linkage for the sake of
// _dispatch_async_and_wait_invoke
_dispatch_thread_frame_save_state(&dsc->dsc_dtf);
if (_dq_state_is_suspended(dq_state) ||
_dq_state_is_base_anon(dq_state)) {
dsc->dc_data = DISPATCH_WLH_ANON;
} else if (_dq_state_is_base_wlh(dq_state)) {
dsc->dc_data = (dispatch_wlh_t)dq;
} else {
_dispatch_wait_compute_wlh(upcast(dq)._dl, dsc);
}
...... //省略代码
这里的崩溃信息("dispatch_sync called on queue already owned by current thread")和我们死锁崩溃时堆栈最后停在 __DISPATCH_WAIT_FOR_QUEUE__ 的截图日志是一样的,意味着死锁就是在这里报出的:你调用的队列已经被当前线程持有
image.png
- 用于判断死锁的 dsc_waiter 是从外面传进来的,返回去查看
- dsc_waiter 的值来自 _dispatch_tid_self(),也就是当前线程的 tid,查看它的定义
// The calling thread's id: the value of _dispatch_thread_port() cast to dispatch_tid.
#define _dispatch_tid_self() ((dispatch_tid)_dispatch_thread_port())
- 由死锁判断条件
if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter)))
,去查看dq_state
状态,调用的是_dq_state_drain_locked_by
方法
// Reports whether the lock portion of the queue state `dq_state` is owned by
// the thread identified by `tid`.
DISPATCH_ALWAYS_INLINE
static inline bool
_dq_state_drain_locked_by(uint64_t dq_state, dispatch_tid tid)
{
	// The state word is narrowed to a dispatch_lock before the ownership test.
	dispatch_lock drain_lock = (dispatch_lock)dq_state;

	return _dispatch_lock_is_locked_by(drain_lock, tid);
}
- 查看
_dispatch_lock_is_locked_by
方法
// Returns true when the owner bits of `lock_value` match `tid`.
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{
	// Same test as _dispatch_lock_owner(lock_value) == tid: XOR zeroes every
	// matching bit and DLOCK_OWNER_MASK keeps only the owner bits, so a
	// zero result means the owners are identical.
	return !((lock_value ^ tid) & DLOCK_OWNER_MASK);
}
lock_value 与 tid 相同时,lock_value ^ tid 的结果为 0,此时 (lock_value ^ tid) & DLOCK_OWNER_MASK 才等于 0
也就是说,只有 dq_state 中记录的持有者和 dsc->dsc_waiter(发起同步等待的线程 tid)在 DLOCK_OWNER_MASK 覆盖的位上相同,即队列已被当前线程持有时,才发生死锁
同步函数任务同步
同步函数全局并发队列
底层源码分析
// Inline core of the synchronous submit path: chooses how func(ctxt) gets
// executed based on the queue's width, type and current state.
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
// Width-1 queues are handled by the barrier variant.
if (likely(dq->dq_width == 1)) {
return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
}
// Only lane-type queues are accepted here; anything else crashes the client.
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
}
// A target queue that itself has a target takes the recursive path.
if (unlikely(dq->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
}
_dispatch_introspection_sync_begin(dl);
// No comma after `func`: DISPATCH_TRACE_ARG supplies it in its expansion.
_dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}
- 调用 _dispatch_sync_invoke_and_complete 时,参数 func 后面没有逗号,而是直接跟着 DISPATCH_TRACE_ARG(...),为什么这么写?我们进去探索
// Invokes func(ctxt) in the context of `dq`, then runs the trace and lane
// completion steps. The trailing `dc` parameter is introduced through
// DISPATCH_TRACE_ARG, which also provides the comma that precedes it.
DISPATCH_NOINLINE
static void
_dispatch_sync_invoke_and_complete(dispatch_lane_t dq, void *ctxt,
dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
{
// Run the client function first.
_dispatch_sync_function_invoke_inline(dq, ctxt, func);
_dispatch_trace_item_complete(dc);
// NOTE(review): presumably gives back the sync width reserved by the caller — confirm.
_dispatch_lane_non_barrier_complete(dq, 0);
}
- 查看
DISPATCH_TRACE_ARG
宏定义
// The comma is part of the expansion, so call sites write
// `func DISPATCH_TRACE_ARG(x)` with no comma of their own; this makes the
// extra argument optional and controllable by build conditions.
// NOTE(review): presumably an alternate definition expands to nothing when
// tracing is compiled out — confirm against the full header.
#define DISPATCH_TRACE_ARG(arg) , arg
-
同步函数全局并发队列
会执行到_dispatch_sync_f_inline
方法中的哪个方法?我们不得而知,这里就添加符号断点
来探索
image.png
-
下面探索执行到
_dispatch_sync_f_slow
方法中的哪个方法?依然是添加符号断点
来探索
image.png
-
跳过断点继续执行,发现刚才添加的两个符号断点并没有执行到
- 那么应该就是执行到
_dispatch_sync_f_slow
方法中的_dispatch_sync_function_invoke
,我们重新添加符号断点
发现确实执行到这里。
// Out-of-line (DISPATCH_NOINLINE) wrapper that simply forwards to
// _dispatch_sync_function_invoke_inline() with the same arguments.
DISPATCH_NOINLINE
static void
_dispatch_sync_function_invoke(dispatch_queue_class_t dq, void *ctxt,
dispatch_function_t func)
{
_dispatch_sync_function_invoke_inline(dq, ctxt, func);
}
- 查看
_dispatch_sync_function_invoke_inline
方法,执行的是_dispatch_client_callout
// Calls the client function func(ctxt) bracketed by a thread-frame push/pop
// for the queue `dq`.
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
dispatch_function_t func)
{
dispatch_thread_frame_s dtf;
// Push a thread frame for dq before running client code.
_dispatch_thread_frame_push(&dtf, dq);
// The actual task executes here.
_dispatch_client_callout(ctxt, func);
_dispatch_perfmon_workitem_inc();
// Pop the frame pushed above once the callout has returned.
_dispatch_thread_frame_pop(&dtf);
}
-
bt
查看堆栈信息
异步函数分析上
异步函数并发队列
底层源码分析
- 查看
dispatch_async
方法
#ifdef __BLOCKS__
// Block-based asynchronous submission: wraps `work` in a freshly allocated
// continuation and hands that continuation to the queue `dq`.
void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
	const uintptr_t init_flags = DC_FLAG_CONSUME;
	dispatch_continuation_t dc = _dispatch_continuation_alloc();

	// Initializing the continuation also yields the QoS used for the push.
	dispatch_qos_t qos = _dispatch_continuation_init(dc, dq, work, 0,
			init_flags);

	// The flags passed on are the ones stored in the continuation itself.
	_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
#endif
- 查看
_dispatch_continuation_async
方法 - 查看
dx_push
方法
// Calls the dq_push entry of x's vtable, forwarding x, y and z unchanged.
#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)
-
查看
dq_push
方法
image.png
-
查看
_dispatch_lane_concurrent_push
方法
// dq_push implementation for concurrent lanes: either redirects the item
// immediately or falls back to the regular lane push.
DISPATCH_NOINLINE
void
_dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
dispatch_qos_t qos)
{
// <rdar://problem/24738102&24743140> reserving non barrier width
// doesn't fail if only the ENQUEUED bit is set (unlike its barrier
// width equivalent), so we have to check that this thread hasn't
// enqueued anything ahead of this call or we can break ordering
if (dq->dq_items_tail == NULL &&
!_dispatch_object_is_waiter(dou) &&
// A barrier item acts as a fence that controls execution flow, so it
// must not take the redirect shortcut.
!_dispatch_object_is_barrier(dou) &&
_dispatch_queue_try_acquire_async(dq)) {
return _dispatch_continuation_redirect_push(dq, dou, qos);
}
// Otherwise enqueue on the lane itself.
_dispatch_lane_push(dq, dou, qos);
}
- 查看
_dispatch_lane_push
方法,可以添加符号断点
查看执行流程
// Enqueues `dou` on the lane's item list and wakes the lane up when the push
// requires it (list was empty, or a QoS override is needed).
DISPATCH_NOINLINE
void
_dispatch_lane_push(dispatch_lane_t dq, dispatch_object_t dou,
dispatch_qos_t qos)
{
dispatch_wakeup_flags_t flags = 0;
struct dispatch_object_s *prev;
// Waiter objects are pushed through a dedicated routine.
if (unlikely(_dispatch_object_is_waiter(dou))) {
return _dispatch_lane_push_waiter(dq, dou._dsc, qos);
}
// This path is not meant for global queues.
dispatch_assert(!_dispatch_object_is_global(dq));
qos = _dispatch_queue_push_qos(dq, qos);
// First half of the push: update the tail and remember the previous one.
prev = os_mpsc_push_update_tail(os_mpsc(dq, dq_items), dou._do, do_next);
if (unlikely(os_mpsc_push_was_empty(prev))) {
// The list was empty: retain the queue and request a wakeup that marks
// it dirty.
_dispatch_retain_2_unsafe(dq);
flags = DISPATCH_WAKEUP_CONSUME_2 | DISPATCH_WAKEUP_MAKE_DIRTY;
} else if (unlikely(_dispatch_queue_need_override(dq, qos))) {
// The list was not empty but an override is needed: wake up as well.
_dispatch_retain_2_unsafe(dq);
flags = DISPATCH_WAKEUP_CONSUME_2;
}
// Second half of the push: link the previous tail to the new item.
os_mpsc_push_update_prev(os_mpsc(dq, dq_items), prev, dou._do, do_next);
// Only wake the queue when one of the branches above asked for it.
if (flags) {
return dx_wakeup(dq, qos, flags);
}
}
- 查看
dx_wakeup
方法
// Calls the dq_wakeup entry of x's vtable, forwarding x, y and z unchanged.
#define dx_wakeup(x, y, z) dx_vtable(x)->dq_wakeup(x, y, z)
- 查看
dq_wakeup
方法,执行的是_dispatch_lane_wakeup
方法
// Vtable instance for queue_concurrent (declared as a subclass of lane).
// dq_wakeup resolves to _dispatch_lane_wakeup and dq_push to
// _dispatch_lane_concurrent_push; the remaining entries use the generic lane
// and queue implementations.
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
.do_type = DISPATCH_QUEUE_CONCURRENT_TYPE,
.do_dispose = _dispatch_lane_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_lane_invoke,
.dq_activate = _dispatch_lane_activate,
.dq_wakeup = _dispatch_lane_wakeup,
.dq_push = _dispatch_lane_concurrent_push,
);
- 查看
_dispatch_lane_wakeup
方法 - 查看
_dispatch_queue_wakeup
方法
DISPATCH_NOINLINE
void
_dispatch_queue_wakeup(dispatch_queue_class_t dqu, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
{
dispatch_queue_t dq = dqu._dq;
uint64_t old_state, new_state, enqueue = DISPATCH_QUEUE_ENQUEUED;
dispatch_assert(target != DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT);
if (target && !(flags & DISPATCH_WAKEUP_CONSUME_2)) {
_dispatch_retain_2(dq);
flags |= DISPATCH_WAKEUP_CONSUME_2;
}
if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
dispatch_assert(dx_metatype(dq) == _DISPATCH_SOURCE_TYPE);
qos = _dispatch_queue_wakeup_qos(dq, qos);
// 执行到_dispatch_lane_class_barrier_complete方法
return _dispatch_lane_class_barrier_complete(upcast(dq)._dl, qos,
flags, target, DISPATCH_QUEUE_SERIAL_DRAIN_OWNED);
}
...... //省略代码
- 查看
_dispatch_lane_class_barrier_complete
方法
DISPATCH_NOINLINE
static void
_dispatch_lane_class_barrier_complete(dispatch_lane_t dq, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target,
uint64_t owned)
{
uint64_t old_state, new_state, enqueue;
dispatch_queue_t tq;
if (target == DISPATCH_QUEUE_WAKEUP_MGR) {
tq = _dispatch_mgr_q._as_dq;
enqueue = DISPATCH_QUEUE_ENQUEUED_ON_MGR;
} else if (target) {
tq = (target == DISPATCH_QUEUE_WAKEUP_TARGET) ? dq->do_targetq : target;
enqueue = DISPATCH_QUEUE_ENQUEUED;
} else {
tq = NULL;
enqueue = 0;
}
again:
// os_atomic_rmw_loop2o 是一个原子的"读-改-写"循环(失败时重试,或通过 goto again 重新开始),并不是递归调用
os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
if (unlikely(_dq_state_needs_ensure_ownership(old_state))) {
_dispatch_event_loop_ensure_ownership((dispatch_wlh_t)dq);
_dispatch_queue_move_to_contended_sync(dq->_as_dq);
os_atomic_rmw_loop_give_up(goto again);
}
...... // 省略代码
if (_dq_state_received_override(old_state)) {
// Ensure that the root queue sees that this thread was overridden.
_dispatch_set_basepri_override_qos(_dq_state_max_qos(old_state));
}
if (tq) {
if (likely((old_state ^ new_state) & enqueue)) {
dispatch_assert(_dq_state_is_enqueued(new_state));
dispatch_assert(flags & DISPATCH_WAKEUP_CONSUME_2);
// 这里最终会执行到_dispatch_root_queue_push
return _dispatch_queue_push_queue(tq, dq, new_state);
}
#if HAVE_PTHREAD_WORKQUEUE_QOS
if (_dq_state_should_override(new_state)) {
return _dispatch_queue_wakeup_with_override(dq, new_state, flags);
}
#endif
...... // 省略代码
- 查看
_dispatch_root_queue_push
方法 - 查看
_dispatch_root_queue_push_inline
方法
// Appends the list of objects [_head .. _tail] to the root queue's item list
// and pokes the root queue with `n` when the push reports that it is needed.
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
dispatch_object_t _head, dispatch_object_t _tail, int n)
{
struct dispatch_object_s *hd = _head._do, *tl = _tail._do;
// NOTE(review): a true result presumably means the list was empty before
// this push — confirm against os_mpsc_push_list.
if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) {
return _dispatch_root_queue_poke(dq, n, 0);
}
}
- 查看
_dispatch_root_queue_poke
方法 - 查看
_dispatch_root_queue_poke_slow
方法
DISPATCH_NOINLINE
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
int remaining = n;
#if !defined(_WIN32)
int r = ENOSYS;
#endif
// 单例底层方法
_dispatch_root_queues_init();
_dispatch_debug_root_queue(dq, __func__);
_dispatch_trace_runtime_event(worker_request, dq, (uint64_t)n);
...... //省略代码
_dispatch_root_queue_poke_slow
方法中调用了_dispatch_root_queues_init
,异步函数分析先暂停到这里,下面先分析单例底层原理
......
单例底层原理
- 查看
_dispatch_root_queues_init
方法
// Initializes the root queues through dispatch_once_f, using
// _dispatch_root_queues_pred as the once predicate and
// _dispatch_root_queues_init_once (with a NULL context) as the initializer.
static inline void
_dispatch_root_queues_init(void)
{
dispatch_once_f(&_dispatch_root_queues_pred, NULL,
_dispatch_root_queues_init_once);
}
GCD单例写法
// Typical GCD singleton pattern: a static token is passed to dispatch_once
// together with the initialization block.
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
// one-time initialization goes here
});
- 查看
dispatch_once
方法
#ifdef __BLOCKS__
// Block-based front end of dispatch_once_f: the block is passed as the
// context and its invoke pointer as the function to call.
void
dispatch_once(dispatch_once_t *val, dispatch_block_t block)
{
	// `val` is the predicate that decides whether the block still has to
	// run; this is what makes the singleton pattern work.
	dispatch_function_t invoke = _dispatch_Block_invoke(block);

	dispatch_once_f(val, block, invoke);
}
#endif
- 查看
dispatch_once_f
方法
// Function-pointer form of dispatch_once: runs func(ctxt) through the once
// gate stored in *val, or waits when another thread already holds the gate.
DISPATCH_NOINLINE
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
// The predicate is reinterpreted as a once gate.
dispatch_once_gate_t l = (dispatch_once_gate_t)val;
#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
// Fast exit: the gate is already marked done.
uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
if (likely(v == DLOCK_ONCE_DONE)) {
return;
}
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
if (likely(DISPATCH_ONCE_IS_GEN(v))) {
return _dispatch_once_mark_done_if_quiesced(l, v);
}
#endif
#endif
// Try to take the gate (lock); the thread that succeeds runs the callout.
if (_dispatch_once_gate_tryenter(l)) {
return _dispatch_once_callout(l, ctxt, func);
}
// Could not enter the gate: wait in _dispatch_once_wait
// (presumably until DLOCK_ONCE_DONE is observed — confirm).
return _dispatch_once_wait(l);
}
-
_dispatch_once_gate_tryenter
上锁
// Attempts to take the once gate for the calling thread; the boolean result
// of the compare-and-exchange is returned to the caller.
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_once_gate_tryenter(dispatch_once_gate_t l)
{
// Atomic compare-and-exchange acting as the lock: dgo_once is switched from
// DLOCK_ONCE_UNLOCKED to this thread's lock value.
return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
(uintptr_t)_dispatch_lock_value_for_self(), relaxed);
}
- 查看
_dispatch_once_callout
方法,回调block
// Runs the once initializer and then broadcasts on the gate.
DISPATCH_NOINLINE
static void
_dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
dispatch_function_t func)
{
// Invoke the client's initializer (the block callback).
_dispatch_client_callout(ctxt, func);
// Must come after the callout: this is the step that unlocks the gate.
_dispatch_once_gate_broadcast(l);
}
-
_dispatch_once_gate_broadcast
方法解锁
// Unlocks the once gate after the initializer has run, taking the slow
// broadcast path only when the previous gate value is not this thread's own.
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_once_gate_broadcast(dispatch_once_gate_t l)
{
dispatch_lock value_self = _dispatch_lock_value_for_self();
uintptr_t v;
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
v = _dispatch_once_mark_quiescing(l);
#else
// Mark the gate done so the singleton is not initialized again next time.
v = _dispatch_once_mark_done(l);
#endif
// Previous value equals our own lock value: nothing more to do.
// NOTE(review): presumably this means no waiter touched the gate — confirm.
if (likely((dispatch_lock)v == value_self)) return;
_dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
}
-
_dispatch_once_mark_done
设置DLOCK_ONCE_DONE
// Stores DLOCK_ONCE_DONE into the gate and returns the value that was
// there before the exchange.
DISPATCH_ALWAYS_INLINE
static inline uintptr_t
_dispatch_once_mark_done(dispatch_once_gate_t dgo)
{
	uintptr_t previous = os_atomic_xchg(&dgo->dgo_once, DLOCK_ONCE_DONE,
			release);

	return previous;
}
- 查看
_dispatch_once_wait
,等待检测DLOCK_ONCE_DONE
void
_dispatch_once_wait(dispatch_once_gate_t dgo)
{
dispatch_lock self = _dispatch_lock_value_for_self();
uintptr_t old_v, new_v;
#if HAVE_UL_UNFAIR_LOCK || HAVE_FUTEX
dispatch_lock *lock = &dgo->dgo_gate.dgl_lock;
#endif
uint32_t timeout = 1;
...... //省略代码
异步函数分析下
继续上面异步函数分析
,上面我们探索到_dispatch_root_queues_init -> _dispatch_root_queues_init_once
方法
static void
_dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
{
_dispatch_fork_becomes_unsafe();
#if DISPATCH_USE_INTERNAL_WORKQUEUE
size_t i;
for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
_dispatch_root_queue_init_pthread_pool(&_dispatch_root_queues[i], 0,
_dispatch_root_queues[i].dq_priority);
}
#else
int wq_supported = _pthread_workqueue_supported();
int r = ENOTSUP;
if (!(wq_supported & WORKQ_FEATURE_MAINTENANCE)) {
DISPATCH_INTERNAL_CRASH(wq_supported,
"QoS Maintenance support required");
}
#if DISPATCH_USE_KEVENT_SETUP
struct pthread_workqueue_config cfg = {
.version = PTHREAD_WORKQUEUE_CONFIG_VERSION,
.flags = 0,
.workq_cb = 0,
.kevent_cb = 0,
.workloop_cb = 0,
.queue_serialno_offs = dispatch_queue_offsets.dqo_serialnum,
#if PTHREAD_WORKQUEUE_CONFIG_VERSION >= 2
.queue_label_offs = dispatch_queue_offsets.dqo_label,
#endif
};
#endif
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunreachable-code"
if (unlikely(!_dispatch_kevent_workqueue_enabled)) {
#if DISPATCH_USE_KEVENT_SETUP
// 执行_dispatch_worker_thread2
cfg.workq_cb = _dispatch_worker_thread2;
r = pthread_workqueue_setup(&cfg, sizeof(cfg));
#else
r = _pthread_workqueue_init(_dispatch_worker_thread2,
offsetof(struct dispatch_queue_s, dq_serialnum), 0);
#endif // DISPATCH_USE_KEVENT_SETUP
#if DISPATCH_USE_KEVENT_WORKLOOP
} else if (wq_supported & WORKQ_FEATURE_WORKLOOP) {
#if DISPATCH_USE_KEVENT_SETUP
// 执行_dispatch_worker_thread2
cfg.workq_cb = _dispatch_worker_thread2;
cfg.kevent_cb = (pthread_workqueue_function_kevent_t) _dispatch_kevent_worker_thread;
cfg.workloop_cb = (pthread_workqueue_function_workloop_t) _dispatch_workloop_worker_thread;
r = pthread_workqueue_setup(&cfg, sizeof(cfg));
#else
// 执行_dispatch_worker_thread2
r = _pthread_workqueue_init_with_workloop(_dispatch_worker_thread2,
(pthread_workqueue_function_kevent_t)
_dispatch_kevent_worker_thread,
(pthread_workqueue_function_workloop_t)
_dispatch_workloop_worker_thread,
offsetof(struct dispatch_queue_s, dq_serialnum), 0);
......
image.png
_dispatch_root_queues_init_once 只会执行一次,它把任务的执行入口 _dispatch_worker_thread2 注册给 pthread 的 workqueue API,可见 GCD 是对 pthread 的封装。
这里通过 _pthread_workqueue_init_with_workloop 注册的回调并不是立即被调用的,而是由当前操作系统(OS)管控,在合适的时机才被调起。
- 回到
_dispatch_root_queue_poke_slow
方法
DISPATCH_NOINLINE
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
//默认传1 全局并发队列创建1个线程
int remaining = n;
#if !defined(_WIN32)
int r = ENOSYS;
#endif
_dispatch_root_queues_init();
_dispatch_debug_root_queue(dq, __func__);
//runtime相关处理
_dispatch_trace_runtime_event(worker_request, dq, (uint64_t)n);
#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES
if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
#endif
{
//全局GLOBAL队列 创建线程
_dispatch_root_queue_debug("requesting new worker thread for global "
"queue: %p", dq);
// 创造线程执行
r = _pthread_workqueue_addthreads(remaining,
_dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
(void)dispatch_assume_zero(r);
return;
}
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
...... //省略代码
//如果是普通的 进入do while循环
//remaining空余的数量
int can_request, t_count;
// seq_cst with atomic store to tail <rdar://problem/16932833>
t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
do {
can_request = t_count < floor ? 0 : t_count - floor;
//请求的线程数大于可请求数时,把请求数削减为 can_request(这里并不会抛异常)
if (remaining > can_request) {
_dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
remaining, can_request);
os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
remaining = can_request;
}
//可请求的线程数变为 0(线程池已满)时只打印调试日志并直接返回,不会抛异常
if (remaining == 0) {
_dispatch_root_queue_debug("pthread pool is full for root queue: "
"%p", dq);
return;
}
} while (!os_atomic_cmpxchgv2o(dq, dgq_thread_pool_size, t_count,
t_count - remaining, &t_count, acquire));
//dgq_thread_pool_size 标记为1。
...... //省略代码
do {
_dispatch_retain(dq); // released in _dispatch_worker_thread
#if DISPATCH_DEBUG
unsigned dwStackSize = 0;
#else
//到底开多大
unsigned dwStackSize = 64 * 1024;
#endif
...... //省略代码
-
dgq_thread_pool_size
标记为1
// Statically initialized manager root queue. Its width is
// DISPATCH_QUEUE_WIDTH_POOL and dgq_thread_pool_size starts at 1.
struct dispatch_queue_global_s _dispatch_mgr_root_queue = {
DISPATCH_GLOBAL_OBJECT_HEADER(queue_global),
.dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE,
.do_ctxt = &_dispatch_mgr_root_queue_pthread_context,
.dq_label = "com.apple.root.libdispatch-manager",
.dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL),
.dq_priority = DISPATCH_PRIORITY_FLAG_MANAGER |
DISPATCH_PRIORITY_SATURATED_OVERRIDE,
.dq_serialnum = 3,
// Initial thread pool size for this queue.
.dgq_thread_pool_size = 1,
};
DISPATCH_QUEUE_WIDTH_POOL
// Queue width constants. With FULL = 0x1000, POOL is FULL - 1 (0xfff) and
// MAX is FULL - 2 (0xffe), so POOL is exactly one greater than MAX.
#define DISPATCH_QUEUE_WIDTH_FULL_BIT 0x0020000000000000ull
#define DISPATCH_QUEUE_WIDTH_FULL 0x1000ull
#define DISPATCH_QUEUE_WIDTH_POOL (DISPATCH_QUEUE_WIDTH_FULL - 1)
#define DISPATCH_QUEUE_WIDTH_MAX (DISPATCH_QUEUE_WIDTH_FULL - 2)
// True for widths strictly between 1 and DISPATCH_QUEUE_WIDTH_POOL.
#define DISPATCH_QUEUE_USES_REDIRECTION(width) \
({ uint16_t _width = (width); \
_width > 1 && _width < DISPATCH_QUEUE_WIDTH_POOL; })
image.png
- 全局队列的宽度(DISPATCH_QUEUE_WIDTH_POOL = FULL - 1)比并发队列的最大宽度(DISPATCH_QUEUE_WIDTH_MAX = FULL - 2)大 1
-
通过
os_atomic_inc2o
自增加++
(void)os_atomic_inc2o(dq, dgq_thread_pool_size, release);
-
DISPATCH_WORKQ_MAX_PTHREAD_COUNT
线程池最大数量
// Fallback value of 255, used only when the macro has not already been
// defined elsewhere (for example by the build configuration).
// NOTE(review): presumably the upper bound on worker pthreads in the
// workqueue pool — confirm at the use sites.
#ifndef DISPATCH_WORKQ_MAX_PTHREAD_COUNT
#define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255
#endif
unsigned dwStackSize = 64 * 1024;
按 1GB 内存、每条线程栈 16KB 估算,理论上最多可开辟的线程数量为 1GB / 16KB = (1024 * 1024 KB) / 16 KB = 1024 * 64 = 65536
异步函数调用流程_dispatch_worker_thread2 ->_dispatch_root_queue_drain->_dispatch_async_redirect_invoke->_dispatch_continuation_pop->_dispatch_client_callout->_dispatch_call_block_and_release