iOS GCD Principle Analysis (Part 2)

2021-08-11  似水流年_9ebe

Preface

In the previous article, iOS GCD Principle Analysis (Part 1), we analyzed GCD's functions and queues. We have not finished walking through the GCD source yet, so let's continue.

Deadlock in Synchronous Functions

We know GCD offers both synchronous and asynchronous functions. What exactly distinguishes them? Let's find out.

We can approach the source with a few concrete questions: does the function open a new thread, is the task callback asynchronous, and does the synchronous path block the caller and possibly deadlock? A small experiment (see the sketch below) makes these differences observable before we trace them through the source.
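As a quick illustration, the following sketch (the queue name and log strings are made up for this demo, not taken from libdispatch) shows that dispatch_sync blocks and normally runs its block on the calling thread, while dispatch_async returns immediately and runs its block later on a worker thread:

#include <dispatch/dispatch.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

int main(void) {
    dispatch_queue_t queue = dispatch_queue_create("com.example.demo", DISPATCH_QUEUE_CONCURRENT);

    printf("caller thread: %p\n", (void *)pthread_self());
    dispatch_sync(queue, ^{
        // dispatch_sync blocks the caller and normally runs the block on the calling thread.
        printf("sync  task on thread: %p\n", (void *)pthread_self());
    });
    printf("after sync (only printed once the sync task has finished)\n");

    dispatch_async(queue, ^{
        // dispatch_async returns immediately; the block runs later, usually on a worker thread.
        printf("async task on thread: %p\n", (void *)pthread_self());
    });
    printf("after async (usually printed before the async task runs)\n");

    sleep(1);   // give the async block a chance to run before the demo exits
    return 0;
}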

Searching the libdispatch source for dispatch_sync(dis, we find the following:

void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
    uintptr_t dc_flags = DC_FLAG_BLOCK;
    if (unlikely(_dispatch_block_has_private_data(work))) {
        return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
    }
    _dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}

Here _dispatch_Block_invoke wraps the block into a task function, and the call is forwarded to _dispatch_sync_f. Searching for _dispatch_sync_f(dis gives:

static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
        uintptr_t dc_flags)
{
    _dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
}

This simply calls _dispatch_sync_f_inline; its code is:

static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    if (likely(dq->dq_width == 1)) {
        return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
    }

    if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
        DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
    }

    dispatch_lane_t dl = upcast(dq)._dl;
    // Global concurrent queues and queues bound to non-dispatch threads
    // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
    if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
        return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
    }

    if (unlikely(dq->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
    }
    _dispatch_introspection_sync_begin(dl);
    _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
            _dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}

Here dq_width == 1 means the queue is serial, so the call goes to _dispatch_barrier_sync_f. Its code is:

static void
_dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    _dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
}

This in turn calls _dispatch_barrier_sync_f_inline:

static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    dispatch_tid tid = _dispatch_tid_self();

    if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
        DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
    }

    dispatch_lane_t dl = upcast(dq)._dl;
    // The more correct thing to do would be to merge the qos of the thread
    // that just acquired the barrier lock into the queue state.
    //
    // However this is too expensive for the fast path, so skip doing it.
    // The chosen tradeoff is that if an enqueue on a lower priority thread
    // contends with this fast path, this thread may receive a useless override.
    //
    // Global concurrent queues and queues bound to non-dispatch threads
    // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
    if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
        return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
                DC_FLAG_BARRIER | dc_flags);
    }

    if (unlikely(dl->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dl, ctxt, func,
                DC_FLAG_BARRIER | dc_flags);
    }
    _dispatch_introspection_sync_begin(dl);
    _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
            DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
                    dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}

Note the call to _dispatch_sync_f_slow; when a deadlock occurs, this function shows up in the crash backtrace, as in the figures below:

[Figure 1: the backtrace when the deadlock occurs]
[Figure 2: the frame that actually reports the error]
Let's look at the source of _dispatch_sync_f_slow:
static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
        dispatch_function_t func, uintptr_t top_dc_flags,
        dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
    dispatch_queue_t top_dq = top_dqu._dq;
    dispatch_queue_t dq = dqu._dq;
    if (unlikely(!dq->do_targetq)) {
        return _dispatch_sync_function_invoke(dq, ctxt, func);
    }

    pthread_priority_t pp = _dispatch_get_priority();
    struct dispatch_sync_context_s dsc = {
        .dc_flags    = DC_FLAG_SYNC_WAITER | dc_flags,
        .dc_func     = _dispatch_async_and_wait_invoke,
        .dc_ctxt     = &dsc,
        .dc_other    = top_dq,
        .dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
        .dc_voucher  = _voucher_get(),
        .dsc_func    = func,
        .dsc_ctxt    = ctxt,
        .dsc_waiter  = _dispatch_tid_self(),
    };

    _dispatch_trace_item_push(top_dq, &dsc);
    __DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);

    if (dsc.dsc_func == NULL) {
        // dsc_func being cleared means that the block ran on another thread ie.
        // case (2) as listed in _dispatch_async_and_wait_f_slow.
        dispatch_queue_t stop_dq = dsc.dc_other;
        return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
    }

    _dispatch_introspection_sync_begin(top_dq);
    _dispatch_trace_item_pop(top_dq, &dsc);
    _dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
            DISPATCH_TRACE_ARG(&dsc));
}

The figures above show that the error is reported inside __DISPATCH_WAIT_FOR_QUEUE__; here is its source:

static void
__DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
{
    uint64_t dq_state = _dispatch_wait_prepare(dq);
    if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
        DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
                "dispatch_sync called on queue "
                "already owned by current thread");
    }

    // Blocks submitted to the main thread MUST run on the main thread, and
    // dispatch_async_and_wait also executes on the remote context rather than
    // the current thread.
    //
    // For both these cases we need to save the frame linkage for the sake of
    // _dispatch_async_and_wait_invoke
    _dispatch_thread_frame_save_state(&dsc->dsc_dtf);

    if (_dq_state_is_suspended(dq_state) ||
            _dq_state_is_base_anon(dq_state)) {
        dsc->dc_data = DISPATCH_WLH_ANON;
    } else if (_dq_state_is_base_wlh(dq_state)) {
        dsc->dc_data = (dispatch_wlh_t)dq;
    } else {
        _dispatch_wait_compute_wlh(upcast(dq)._dl, dsc);
    }

    if (dsc->dc_data == DISPATCH_WLH_ANON) {
        dsc->dsc_override_qos_floor = dsc->dsc_override_qos =
                (uint8_t)_dispatch_get_basepri_override_qos_floor();
        _dispatch_thread_event_init(&dsc->dsc_event);
    }
    dx_push(dq, dsc, _dispatch_qos_from_pp(dsc->dc_priority));
    _dispatch_trace_runtime_event(sync_wait, dq, 0);
    if (dsc->dc_data == DISPATCH_WLH_ANON) {
        _dispatch_thread_event_wait(&dsc->dsc_event); // acquire
    } else if (!dsc->dsc_wlh_self_wakeup) {
        _dispatch_event_loop_wait_for_ownership(dsc);
    }
    if (dsc->dc_data == DISPATCH_WLH_ANON) {
        _dispatch_thread_event_destroy(&dsc->dsc_event);
        // If _dispatch_sync_waiter_wake() gave this thread an override,
        // ensure that the root queue sees it.
        if (dsc->dsc_override_qos > dsc->dsc_override_qos_floor) {
            _dispatch_set_basepri_override_qos(dsc->dsc_override_qos);
        }
    }
}

The crash is raised by this check:

if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
        DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
                "dispatch_sync called on queue "
                "already owned by current thread");
    }

This is the code that reports the error. Why does the deadlock happen? Let's break it down.
The message "dispatch_sync called on queue already owned by current thread" means exactly that: the queue the synchronous call targets is already owned (being drained) by the current thread.
The dsc in dsc->dsc_waiter is passed in from _dispatch_sync_f_slow, which builds it like this:

struct dispatch_sync_context_s dsc = {
        .dc_flags    = DC_FLAG_SYNC_WAITER | dc_flags,
        .dc_func     = _dispatch_async_and_wait_invoke,
        .dc_ctxt     = &dsc,
        .dc_other    = top_dq,
        .dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
        .dc_voucher  = _voucher_get(),
        .dsc_func    = func,
        .dsc_ctxt    = ctxt,
        .dsc_waiter  = _dispatch_tid_self(),
    };

dsc_waiter = _dispatch_tid_self(), and _dispatch_tid_self() is simply the current thread's id:

#define _dispatch_tid_self()        ((dispatch_tid)_dispatch_thread_port())
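
For reference, on Darwin this thread id is the calling thread's mach port; the snippet below (purely illustrative, not part of libdispatch) shows how to obtain the same kind of value from user code:

#include <pthread.h>
#include <mach/mach.h>
#include <stdio.h>

int main(void) {
    // _dispatch_tid_self() records the current thread's mach port in dsc_waiter;
    // pthread_mach_thread_np() exposes the same kind of value for the calling thread.
    mach_port_t tid = pthread_mach_thread_np(pthread_self());
    printf("current thread port: 0x%x\n", tid);
    return 0;
}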

In __DISPATCH_WAIT_FOR_QUEUE__, the line

    uint64_t dq_state = _dispatch_wait_prepare(dq);

loads the queue's state: dq is the queue the sync call targets and dq_state is its current state. _dq_state_drain_locked_by then checks whether that state says the queue's drain lock is held by the waiter's thread:

static inline bool
_dq_state_drain_locked_by(uint64_t dq_state, dispatch_tid tid)
{
    return _dispatch_lock_is_locked_by((dispatch_lock)dq_state, tid);
}

which forwards to _dispatch_lock_is_locked_by:

static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{
    // equivalent to _dispatch_lock_owner(lock_value) == tid
    return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}

This checks whether the queue's state matches the thread id: the lock value (the low bits of the queue state) is XORed with the tid and then ANDed with DLOCK_OWNER_MASK (#define DLOCK_OWNER_MASK ((dispatch_lock)0xfffffffc)). A result of 0 means the owner recorded in the queue state is exactly the thread that is about to wait.
In other words, the thread that wants to wait for the queue is the same thread that currently owns (is draining) the queue: the thread waits for the queue to drain, while the queue waits for that thread's current work to finish. Each waits on the other, which is a deadlock, so GCD crashes deliberately with the message above.
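
The classic way to trigger this is to dispatch_sync onto the serial queue that is currently running your code. A deliberately broken sketch (do not ship this):

#include <dispatch/dispatch.h>
#include <stdio.h>

int main(void) {
    dispatch_queue_t serial = dispatch_queue_create("com.example.serial", DISPATCH_QUEUE_SERIAL);

    dispatch_async(serial, ^{
        // This block is currently draining "serial": the queue state records this
        // thread as the drain owner.
        dispatch_sync(serial, ^{
            // Never reached: the sync waiter waits for "serial", while "serial"
            // is waiting for the enclosing block to finish.
            printf("never reached\n");
        });
        // Crashes with: "dispatch_sync called on queue already owned by current thread"
    });

    dispatch_main();   // keep the process alive for the demo
}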

Task Ordering of Synchronous Functions

On a concurrent queue, why does a task submitted with dispatch_sync still execute in order with respect to the caller? Let's analyze it, starting from this part of _dispatch_sync_f_inline:

if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
        DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
    }

    dispatch_lane_t dl = upcast(dq)._dl;
    // Global concurrent queues and queues bound to non-dispatch threads
    // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
    if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
        return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
    }

    if (unlikely(dq->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
    }
    _dispatch_introspection_sync_begin(dl);
    _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
            _dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));

How is this code actually executed? We can find out by stepping through with breakpoints:

[Figure: breakpoint trace through the dispatch_sync path]

Let's look at _dispatch_sync_invoke_and_complete; its source is:

static void
_dispatch_sync_invoke_and_complete(dispatch_lane_t dq, void *ctxt,
        dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
{
    _dispatch_sync_function_invoke_inline(dq, ctxt, func);
    _dispatch_trace_item_complete(dc);
    _dispatch_lane_non_barrier_complete(dq, 0);
}

In the signature, DISPATCH_TRACE_ARG(void *dc) expands through #define DISPATCH_TRACE_ARG(arg) , arg, so it is an optional trailing trace argument.
Stepping through, execution can also reach _dispatch_sync_f_slow, which then calls _dispatch_sync_function_invoke, and in either case we end up here:

static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
        dispatch_function_t func)
{
    dispatch_thread_frame_s dtf;
    _dispatch_thread_frame_push(&dtf, dq);
    _dispatch_client_callout(ctxt, func);
    _dispatch_perfmon_workitem_inc();
    _dispatch_thread_frame_pop(&dtf);
}

Here _dispatch_client_callout executes the callback, and then the thread frame is popped.
So for a synchronous function, the task is invoked right where the call is made, with only some state bookkeeping around it. That is why a dispatch_sync task executes immediately.

Asynchronous Functions (Part 1)

With synchronous functions covered, let's analyze asynchronous functions.
For an asynchronous function we care about two things: how it creates threads, and the asynchrony of the task callback.
Searching the source for dispatch_async(dis, we find:

void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    uintptr_t dc_flags = DC_FLAG_CONSUME;
    dispatch_qos_t qos;

    qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
    _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}

Next, _dispatch_continuation_async:

static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
        dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
    if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
        _dispatch_trace_item_push(dqu, dc);
    }
#else
    (void)dc_flags;
#endif
    return dx_push(dqu._dq, dc, qos);
}

Here dc is the task wrapped into a dispatch_continuation and qos carries its priority. dx_push dispatches to the queue's dq_push entry, and which function that resolves to depends on the queue that was passed in. For a concurrent queue the vtable looks like this:

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
    .do_type        = DISPATCH_QUEUE_CONCURRENT_TYPE,
    .do_dispose     = _dispatch_lane_dispose,
    .do_debug       = _dispatch_queue_debug,
    .do_invoke      = _dispatch_lane_invoke,

    .dq_activate    = _dispatch_lane_activate,
    .dq_wakeup      = _dispatch_lane_wakeup,
    .dq_push        = _dispatch_lane_concurrent_push,
);

This is the concurrent-queue vtable, so dq_push resolves to _dispatch_lane_concurrent_push. Searching for it, we find:

void
_dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
        dispatch_qos_t qos)
{
    // <rdar://problem/24738102&24743140> reserving non barrier width
    // doesn't fail if only the ENQUEUED bit is set (unlike its barrier
    // width equivalent), so we have to check that this thread hasn't
    // enqueued anything ahead of this call or we can break ordering
    if (dq->dq_items_tail == NULL &&
            !_dispatch_object_is_waiter(dou) &&
            !_dispatch_object_is_barrier(dou) &&
            _dispatch_queue_try_acquire_async(dq)) {
        return _dispatch_continuation_redirect_push(dq, dou, qos);
    }

    _dispatch_lane_push(dq, dou, qos);
}

Both concurrent and serial submissions eventually come to _dispatch_lane_push.

if (dq->dq_items_tail == NULL &&
            !_dispatch_object_is_waiter(dou) &&
            !_dispatch_object_is_barrier(dou) &&
            _dispatch_queue_try_acquire_async(dq)) {
        return _dispatch_continuation_redirect_push(dq, dou, qos);
    }

The difference lies in this check, that is, in how barriers and waiters are handled before the push. The source of _dispatch_lane_push is:

void
_dispatch_lane_push(dispatch_lane_t dq, dispatch_object_t dou,
        dispatch_qos_t qos)
{
    dispatch_wakeup_flags_t flags = 0;
    struct dispatch_object_s *prev;

    if (unlikely(_dispatch_object_is_waiter(dou))) {
        return _dispatch_lane_push_waiter(dq, dou._dsc, qos);
    }

    dispatch_assert(!_dispatch_object_is_global(dq));
    qos = _dispatch_queue_push_qos(dq, qos);

    // If we are going to call dx_wakeup(), the queue must be retained before
    // the item we're pushing can be dequeued, which means:
    // - before we exchange the tail if we have to override
    // - before we set the head if we made the queue non empty.
    // Otherwise, if preempted between one of these and the call to dx_wakeup()
    // the blocks submitted to the queue may release the last reference to the
    // queue when invoked by _dispatch_lane_drain. <rdar://problem/6932776>

    prev = os_mpsc_push_update_tail(os_mpsc(dq, dq_items), dou._do, do_next);
    if (unlikely(os_mpsc_push_was_empty(prev))) {
        _dispatch_retain_2_unsafe(dq);
        flags = DISPATCH_WAKEUP_CONSUME_2 | DISPATCH_WAKEUP_MAKE_DIRTY;
    } else if (unlikely(_dispatch_queue_need_override(dq, qos))) {
        // There's a race here, _dispatch_queue_need_override may read a stale
        // dq_state value.
        //
        // If it's a stale load from the same drain streak, given that
        // the max qos is monotonic, too old a read can only cause an
        // unnecessary attempt at overriding which is harmless.
        //
        // We'll assume here that a stale load from an a previous drain streak
        // never happens in practice.
        _dispatch_retain_2_unsafe(dq);
        flags = DISPATCH_WAKEUP_CONSUME_2;
    }
    os_mpsc_push_update_prev(os_mpsc(dq, dq_items), prev, dou._do, do_next);
    if (flags) {
        return dx_wakeup(dq, qos, flags);
    }
}

Setting symbolic breakpoints on _dispatch_lane_push and _dispatch_lane_push_waiter, and then following dx_wakeup / dq_wakeup, which for a plain lane resolves to _dispatch_lane_wakeup, we breakpoint that as well and confirm execution enters _dispatch_lane_wakeup.
Its code:

void
_dispatch_lane_wakeup(dispatch_lane_class_t dqu, dispatch_qos_t qos,
        dispatch_wakeup_flags_t flags)
{
    dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;

    if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
        return _dispatch_lane_barrier_complete(dqu, qos, flags);
    }
    if (_dispatch_queue_class_probe(dqu)) {
        target = DISPATCH_QUEUE_WAKEUP_TARGET;
    }
    return _dispatch_queue_wakeup(dqu, qos, flags, target);
}
if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
        return _dispatch_lane_barrier_complete(dqu, qos, flags);
    }

If a barrier block was submitted to the queue, this branch is taken and completion goes through _dispatch_lane_barrier_complete (see the usage sketch below for what a barrier block is).
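
For context, a barrier block is what dispatch_barrier_async / dispatch_barrier_sync submit; on a custom concurrent queue it runs exclusively, after earlier blocks and before later ones. An illustrative usage sketch (the queue name and log text are made up):

#include <dispatch/dispatch.h>
#include <stdio.h>

int main(void) {
    dispatch_queue_t q = dispatch_queue_create("com.example.rw", DISPATCH_QUEUE_CONCURRENT);

    dispatch_async(q, ^{ printf("read 1\n"); });
    dispatch_async(q, ^{ printf("read 2\n"); });
    dispatch_barrier_async(q, ^{
        // Runs alone: after read 1 and read 2 have finished, before read 3 starts.
        printf("exclusive write\n");
    });
    dispatch_async(q, ^{ printf("read 3\n"); });

    dispatch_barrier_sync(q, ^{ });   // simple way to wait until everything above is done
    return 0;
}
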
Otherwise the function falls through and calls _dispatch_queue_wakeup:

void
_dispatch_queue_wakeup(dispatch_queue_class_t dqu, dispatch_qos_t qos,
        dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
{
    dispatch_queue_t dq = dqu._dq;
    uint64_t old_state, new_state, enqueue = DISPATCH_QUEUE_ENQUEUED;
    dispatch_assert(target != DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT);

    if (target && !(flags & DISPATCH_WAKEUP_CONSUME_2)) {
        _dispatch_retain_2(dq);
        flags |= DISPATCH_WAKEUP_CONSUME_2;
    }

    if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
        //
        // _dispatch_lane_class_barrier_complete() is about what both regular
        // queues and sources needs to evaluate, but the former can have sync
        // handoffs to perform which _dispatch_lane_class_barrier_complete()
        // doesn't handle, only _dispatch_lane_barrier_complete() does.
        //
        // _dispatch_lane_wakeup() is the one for plain queues that calls
        // _dispatch_lane_barrier_complete(), and this is only taken for non
        // queue types.
        //
        dispatch_assert(dx_metatype(dq) == _DISPATCH_SOURCE_TYPE);
        qos = _dispatch_queue_wakeup_qos(dq, qos);
        return _dispatch_lane_class_barrier_complete(upcast(dq)._dl, qos,
                flags, target, DISPATCH_QUEUE_SERIAL_DRAIN_OWNED);
    }

    if (target) {
        if (target == DISPATCH_QUEUE_WAKEUP_MGR) {
            enqueue = DISPATCH_QUEUE_ENQUEUED_ON_MGR;
        }
        qos = _dispatch_queue_wakeup_qos(dq, qos);
        os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
            new_state = _dq_state_merge_qos(old_state, qos);
            if (flags & DISPATCH_WAKEUP_CLEAR_ACTIVATING) {
                // When an event is being delivered to a source because its
                // unote was being registered before the ACTIVATING state
                // had a chance to be cleared, we don't want to fail the wakeup
                // which could lead to a priority inversion.
                //
                // Instead, these wakeups are allowed to finish the pending
                // activation.
                if (_dq_state_is_activating(old_state)) {
                    new_state &= ~DISPATCH_QUEUE_ACTIVATING;
                }
            }
            if (likely(!_dq_state_is_suspended(new_state) &&
                    !_dq_state_is_enqueued(old_state) &&
                    (!_dq_state_drain_locked(old_state) ||
                    enqueue != DISPATCH_QUEUE_ENQUEUED_ON_MGR))) {
                // Always set the enqueued bit for async enqueues on all queues
                // in the hierachy
                new_state |= enqueue;
            }
            if (flags & DISPATCH_WAKEUP_MAKE_DIRTY) {
                new_state |= DISPATCH_QUEUE_DIRTY;
            } else if (new_state == old_state) {
                os_atomic_rmw_loop_give_up(goto done);
            }
        });
#if HAVE_PTHREAD_WORKQUEUE_QOS
    } else if (qos) {
        //
        // Someone is trying to override the last work item of the queue.
        //
        os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
            // Avoid spurious override if the item was drained before we could
            // apply an override
            if (!_dq_state_drain_locked(old_state) &&
                !_dq_state_is_enqueued(old_state)) {
                os_atomic_rmw_loop_give_up(goto done);
            }
            new_state = _dq_state_merge_qos(old_state, qos);
            if (_dq_state_is_base_wlh(old_state) &&
                    !_dq_state_is_suspended(old_state) &&
                    /* <rdar://problem/63179930> */
                    !_dq_state_is_enqueued_on_manager(old_state)) {

                // Always set the enqueued bit for async enqueues on all queues
                // in the hierachy (rdar://62447289)
                //
                // Scenario:
                // - mach channel DM
                // - targetting TQ
                //
                // Thread 1:
                // - has the lock on (TQ), uncontended sync
                // - causes a wakeup at a low QoS on DM, causing it to have:
                //   max_qos = UT, enqueued = 1
                // - the enqueue of DM onto TQ hasn't happened yet.
                //
                // Thread 2:
                // - an incoming IN IPC is being merged on the servicer
                // - DM having qos=UT, enqueud=1, no further enqueue happens,
                //   but we need an extra override and go through this code for
                //   TQ.
                // - this causes TQ to be "stashed", which requires the enqueued
                //   bit set, else try_lock_wlh() will complain and the
                //   wakeup refcounting will be off.
                new_state |= enqueue;
            }

            if (new_state == old_state) {
                os_atomic_rmw_loop_give_up(goto done);
            }
        });

        target = DISPATCH_QUEUE_WAKEUP_TARGET;
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
    } else {
        goto done;
    }

    if (likely((old_state ^ new_state) & enqueue)) {
        dispatch_queue_t tq;
        if (target == DISPATCH_QUEUE_WAKEUP_TARGET) {
            // the rmw_loop above has no acquire barrier, as the last block
            // of a queue asyncing to that queue is not an uncommon pattern
            // and in that case the acquire would be completely useless
            //
            // so instead use depdendency ordering to read
            // the targetq pointer.
            os_atomic_thread_fence(dependency);
            tq = os_atomic_load_with_dependency_on2o(dq, do_targetq,
                    (long)new_state);
        } else {
            tq = target;
        }
        dispatch_assert(_dq_state_is_enqueued(new_state));
        return _dispatch_queue_push_queue(tq, dq, new_state);
    }
#if HAVE_PTHREAD_WORKQUEUE_QOS
    if (unlikely((old_state ^ new_state) & DISPATCH_QUEUE_MAX_QOS_MASK)) {
        if (_dq_state_should_override(new_state)) {
            return _dispatch_queue_wakeup_with_override(dq, new_state,
                    flags);
        }
    }
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
done:
    if (likely(flags & DISPATCH_WAKEUP_CONSUME_2)) {
        return _dispatch_release_2_tailcall(dq);
    }
}

Stepping with symbolic breakpoints, we sometimes see _dispatch_lane_push and _dispatch_lane_wakeup hit over and over; that is just the system continually dispatching its own tasks in the background.
When execution reaches _dispatch_lane_class_barrier_complete, we search for it and find:

static void
_dispatch_lane_class_barrier_complete(dispatch_lane_t dq, dispatch_qos_t qos,
        dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target,
        uint64_t owned)
{
    uint64_t old_state, new_state, enqueue;
    dispatch_queue_t tq;

    if (target == DISPATCH_QUEUE_WAKEUP_MGR) {
        tq = _dispatch_mgr_q._as_dq;
        enqueue = DISPATCH_QUEUE_ENQUEUED_ON_MGR;
    } else if (target) {
        tq = (target == DISPATCH_QUEUE_WAKEUP_TARGET) ? dq->do_targetq : target;
        enqueue = DISPATCH_QUEUE_ENQUEUED;
    } else {
        tq = NULL;
        enqueue = 0;
    }

again:
    os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
        if (unlikely(_dq_state_needs_ensure_ownership(old_state))) {
            _dispatch_event_loop_ensure_ownership((dispatch_wlh_t)dq);
            _dispatch_queue_move_to_contended_sync(dq->_as_dq);
            os_atomic_rmw_loop_give_up(goto again);
        }
        new_state  = _dq_state_merge_qos(old_state - owned, qos);
        new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
        if (unlikely(_dq_state_is_suspended(old_state))) {
            if (likely(_dq_state_is_base_wlh(old_state))) {
                new_state &= ~DISPATCH_QUEUE_ENQUEUED;
            }
        } else if (enqueue) {
            if (!_dq_state_is_enqueued(old_state)) {
                new_state |= enqueue;
            }
        } else if (unlikely(_dq_state_is_dirty(old_state))) {
            os_atomic_rmw_loop_give_up({
                // just renew the drain lock with an acquire barrier, to see
                // what the enqueuer that set DIRTY has done.
                // the xor generates better assembly as DISPATCH_QUEUE_DIRTY
                // is already in a register
                os_atomic_xor2o(dq, dq_state, DISPATCH_QUEUE_DIRTY, acquire);
                flags |= DISPATCH_WAKEUP_BARRIER_COMPLETE;
                return dx_wakeup(dq, qos, flags);
            });
        } else {
            new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
        }
    });
    old_state -= owned;
    dispatch_assert(_dq_state_drain_locked_by_self(old_state));
    dispatch_assert(!_dq_state_is_enqueued_on_manager(old_state));

    if (_dq_state_is_enqueued(new_state)) {
        _dispatch_trace_runtime_event(sync_async_handoff, dq, 0);
    }

#if DISPATCH_USE_KEVENT_WORKLOOP
    if (_dq_state_is_base_wlh(old_state)) {
        // - Only non-"du_is_direct" sources & mach channels can be enqueued
        //   on the manager.
        //
        // - Only dispatch_source_cancel_and_wait() and
        //   dispatch_source_set_*_handler() use the barrier complete codepath,
        //   none of which are used by mach channels.
        //
        // Hence no source-ish object can both be a workloop and need to use the
        // manager at the same time.
        dispatch_assert(!_dq_state_is_enqueued_on_manager(new_state));
        if (_dq_state_is_enqueued_on_target(old_state) ||
                _dq_state_is_enqueued_on_target(new_state) ||
                !_dq_state_in_uncontended_sync(old_state)) {
            return _dispatch_event_loop_end_ownership((dispatch_wlh_t)dq,
                    old_state, new_state, flags);
        }
        _dispatch_event_loop_assert_not_owned((dispatch_wlh_t)dq);
        if (flags & DISPATCH_WAKEUP_CONSUME_2) {
            return _dispatch_release_2_tailcall(dq);
        }
        return;
    }
#endif

    if (_dq_state_received_override(old_state)) {
        // Ensure that the root queue sees that this thread was overridden.
        _dispatch_set_basepri_override_qos(_dq_state_max_qos(old_state));
    }

    if (tq) {
        if (likely((old_state ^ new_state) & enqueue)) {
            dispatch_assert(_dq_state_is_enqueued(new_state));
            dispatch_assert(flags & DISPATCH_WAKEUP_CONSUME_2);
            return _dispatch_queue_push_queue(tq, dq, new_state);
        }
#if HAVE_PTHREAD_WORKQUEUE_QOS
        // <rdar://problem/27694093> when doing sync to async handoff
        // if the queue received an override we have to forecefully redrive
        // the same override so that a new stealer is enqueued because
        // the previous one may be gone already
        if (_dq_state_should_override(new_state)) {
            return _dispatch_queue_wakeup_with_override(dq, new_state, flags);
        }
#endif
    }
    if (flags & DISPATCH_WAKEUP_CONSUME_2) {
        return _dispatch_release_2_tailcall(dq);
    }
}

The os_atomic_rmw_loop2o here keeps retrying (via goto again) until the atomic state transition succeeds.
Eventually the push lands on the root queue via _dispatch_root_queue_push:

void
_dispatch_root_queue_push(dispatch_queue_global_t rq, dispatch_object_t dou,
        dispatch_qos_t qos)
{
#if DISPATCH_USE_KEVENT_WORKQUEUE
    dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
    if (unlikely(ddi && ddi->ddi_can_stash)) {
        dispatch_object_t old_dou = ddi->ddi_stashed_dou;
        dispatch_priority_t rq_overcommit;
        rq_overcommit = rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;

        if (likely(!old_dou._do || rq_overcommit)) {
            dispatch_queue_global_t old_rq = ddi->ddi_stashed_rq;
            dispatch_qos_t old_qos = ddi->ddi_stashed_qos;
            ddi->ddi_stashed_rq = rq;
            ddi->ddi_stashed_dou = dou;
            ddi->ddi_stashed_qos = qos;
            _dispatch_debug("deferring item %p, rq %p, qos %d",
                    dou._do, rq, qos);
            if (rq_overcommit) {
                ddi->ddi_can_stash = false;
            }
            if (likely(!old_dou._do)) {
                return;
            }
            // push the previously stashed item
            qos = old_qos;
            rq = old_rq;
            dou = old_dou;
        }
    }
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS
    if (_dispatch_root_queue_push_needs_override(rq, qos)) {
        return _dispatch_root_queue_push_override(rq, dou, qos);
    }
#else
    (void)qos;
#endif
    _dispatch_root_queue_push_inline(rq, dou, dou, 1);
}

which calls _dispatch_root_queue_push_inline:

static inline void
_dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
        dispatch_object_t _head, dispatch_object_t _tail, int n)
{
    struct dispatch_object_s *hd = _head._do, *tl = _tail._do;
    if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) {
        return _dispatch_root_queue_poke(dq, n, 0);
    }
}

which in turn calls _dispatch_root_queue_poke:

void
_dispatch_root_queue_poke(dispatch_queue_global_t dq, int n, int floor)
{
    if (!_dispatch_queue_class_probe(dq)) {
        return;
    }
#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_POOL
    if (likely(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE))
#endif
    {
        if (unlikely(!os_atomic_cmpxchg2o(dq, dgq_pending, 0, n, relaxed))) {
            _dispatch_root_queue_debug("worker thread request still pending "
                    "for global queue: %p", dq);
            return;
        }
    }
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
    return _dispatch_root_queue_poke_slow(dq, n, floor);
}

and that calls _dispatch_root_queue_poke_slow:

static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
    int remaining = n;
#if !defined(_WIN32)
    int r = ENOSYS;
#endif

    _dispatch_root_queues_init();
    _dispatch_debug_root_queue(dq, __func__);
    _dispatch_trace_runtime_event(worker_request, dq, (uint64_t)n);

#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES
    if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
#endif
    {
        _dispatch_root_queue_debug("requesting new worker thread for global "
                "queue: %p", dq);
        r = _pthread_workqueue_addthreads(remaining,
                _dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
        (void)dispatch_assume_zero(r);
        return;
    }
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_POOL
    dispatch_pthread_root_queue_context_t pqc = dq->do_ctxt;
    if (likely(pqc->dpq_thread_mediator.do_vtable)) {
        while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
            _dispatch_root_queue_debug("signaled sleeping worker for "
                    "global queue: %p", dq);
            if (!--remaining) {
                return;
            }
        }
    }

    bool overcommit = dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
    if (overcommit) {
        os_atomic_add2o(dq, dgq_pending, remaining, relaxed);
    } else {
        if (!os_atomic_cmpxchg2o(dq, dgq_pending, 0, remaining, relaxed)) {
            _dispatch_root_queue_debug("worker thread request still pending for "
                    "global queue: %p", dq);
            return;
        }
    }

    int can_request, t_count;
    // seq_cst with atomic store to tail <rdar://problem/16932833>
    t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
    do {
        can_request = t_count < floor ? 0 : t_count - floor;
        if (remaining > can_request) {
            _dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
                    remaining, can_request);
            os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
            remaining = can_request;
        }
        if (remaining == 0) {
            _dispatch_root_queue_debug("pthread pool is full for root queue: "
                    "%p", dq);
            return;
        }
    } while (!os_atomic_cmpxchgv2o(dq, dgq_thread_pool_size, t_count,
            t_count - remaining, &t_count, acquire));

#if !defined(_WIN32)
    pthread_attr_t *attr = &pqc->dpq_thread_attr;
    pthread_t tid, *pthr = &tid;
#if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES
    if (unlikely(dq == &_dispatch_mgr_root_queue)) {
        pthr = _dispatch_mgr_root_queue_init();
    }
#endif
    do {
        _dispatch_retain(dq); // released in _dispatch_worker_thread
        while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
            if (r != EAGAIN) {
                (void)dispatch_assume_zero(r);
            }
            _dispatch_temporary_resource_shortage();
        }
    } while (--remaining);
#else // defined(_WIN32)
#if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES
    if (unlikely(dq == &_dispatch_mgr_root_queue)) {
        _dispatch_mgr_root_queue_init();
    }
#endif
    do {
        _dispatch_retain(dq); // released in _dispatch_worker_thread
#if DISPATCH_DEBUG
        unsigned dwStackSize = 0;
#else
        unsigned dwStackSize = 64 * 1024;
#endif
        uintptr_t hThread = 0;
        while (!(hThread = _beginthreadex(NULL, dwStackSize, _dispatch_worker_thread_thunk, dq, STACK_SIZE_PARAM_IS_A_RESERVATION, NULL))) {
            if (errno != EAGAIN) {
                (void)dispatch_assume(hThread);
            }
            _dispatch_temporary_resource_shortage();
        }
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES
        if (_dispatch_mgr_sched.prio > _dispatch_mgr_sched.default_prio) {
            (void)dispatch_assume_zero(SetThreadPriority((HANDLE)hThread, _dispatch_mgr_sched.prio) == TRUE);
        }
#endif
        CloseHandle((HANDLE)hThread);
    } while (--remaining);
#endif // defined(_WIN32)
#else
    (void)floor;
#endif // DISPATCH_USE_PTHREAD_POOL
}

The call to _dispatch_root_queues_init here is important. Before analyzing it, let's first look at how GCD implements singletons, that is, dispatch_once.

How dispatch_once (Singletons) Works

The code of _dispatch_root_queues_init is:

static inline void
_dispatch_root_queues_init(void)
{
    dispatch_once_f(&_dispatch_root_queues_pred, NULL,
            _dispatch_root_queues_init_once);
}

This is itself a dispatch_once call, the singleton mechanism, so let's analyze it.
The familiar usage looks like this:

static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
    // code that must run exactly once
});
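
A slightly fuller sketch of the typical singleton accessor built on dispatch_once (the type and function names are made up for illustration):

#include <dispatch/dispatch.h>
#include <stdlib.h>

typedef struct { int value; } manager_t;

static manager_t *shared_manager(void) {
    static manager_t *instance;
    static dispatch_once_t onceToken;
    dispatch_once(&onceToken, ^{
        // Executed exactly once, no matter how many threads call shared_manager().
        instance = calloc(1, sizeof(manager_t));
    });
    return instance;
}

int main(void) {
    manager_t *m = shared_manager();
    m->value = 42;
    return 0;
}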

Now let's look at the source of dispatch_once:

void
dispatch_once(dispatch_once_t *val, dispatch_block_t block)
{
    dispatch_once_f(val, block, _dispatch_Block_invoke(block));
}

It calls dispatch_once_f, whose source is:

void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
    dispatch_once_gate_t l = (dispatch_once_gate_t)val;

#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
    if (likely(v == DLOCK_ONCE_DONE)) {
        return;
    }
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    if (likely(DISPATCH_ONCE_IS_GEN(v))) {
        return _dispatch_once_mark_done_if_quiesced(l, v);
    }
#endif
#endif
    if (_dispatch_once_gate_tryenter(l)) {
        return _dispatch_once_callout(l, ctxt, func);
    }
    return _dispatch_once_wait(l);
}

Here val is the onceToken we passed in, a static (global) variable.

It is cast to dispatch_once_gate_t and treated as a gate that is either open or closed.

uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
    if (likely(v == DLOCK_ONCE_DONE)) {
        return;
    }

os_atomic_load reads the gate: if the work has already been done (DLOCK_ONCE_DONE), the function returns immediately and never enters again.
Otherwise execution reaches

if (_dispatch_once_gate_tryenter(l)) {
        return _dispatch_once_callout(l, ctxt, func);
    }

Here _dispatch_once_gate_tryenter decides whether the current thread may enter the gate; if it succeeds, _dispatch_once_callout runs the block via _dispatch_client_callout.

static inline bool
_dispatch_once_gate_tryenter(dispatch_once_gate_t l)
{
    return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
            (uintptr_t)_dispatch_lock_value_for_self(), relaxed);
}

_dispatch_once_gate_tryenter uses os_atomic_cmpxchg, an atomic compare-and-swap: the gate is moved from DLOCK_ONCE_UNLOCKED to a lock value derived from the current thread, so only one thread can win the gate.
_dispatch_lock_value_for_self looks like this:

static inline dispatch_lock
_dispatch_lock_value_for_self(void)
{
    return _dispatch_lock_value_from_tid(_dispatch_tid_self());
}

This is why GCD's singleton is thread-safe.
After the block runs, _dispatch_once_callout calls _dispatch_once_gate_broadcast:

static inline void
_dispatch_once_gate_broadcast(dispatch_once_gate_t l)
{
    dispatch_lock value_self = _dispatch_lock_value_for_self();
    uintptr_t v;
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    v = _dispatch_once_mark_quiescing(l);
#else
    v = _dispatch_once_mark_done(l);
#endif
    if (likely((dispatch_lock)v == value_self)) return;
    _dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
}

which calls _dispatch_once_mark_done:

static inline uintptr_t
_dispatch_once_mark_done(dispatch_once_gate_t dgo)
{
    return os_atomic_xchg(&dgo->dgo_once, DLOCK_ONCE_DONE, release);
}

This atomically stores DLOCK_ONCE_DONE into the gate, so subsequent calls return immediately and never enter again.
Inside dispatch_once_f, if the gate is not yet marked DLOCK_ONCE_DONE and _dispatch_once_gate_tryenter also fails (another thread currently owns the gate), the caller falls through to _dispatch_once_wait and blocks until the owner finishes and broadcasts.
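
To make the gate mechanism concrete, here is a minimal sketch of the same idea written with C11 atomics. It mirrors the tryenter / callout / mark-done / wait structure but is not libdispatch's real implementation: libdispatch parks waiters instead of spinning, and it stores an encoded lock value rather than a raw pthread_t.

#include <stdatomic.h>
#include <pthread.h>
#include <stdint.h>

#define MY_ONCE_DONE (~(uintptr_t)0)   // analogue of DLOCK_ONCE_DONE

typedef struct { _Atomic uintptr_t dgo_once; } my_once_gate_t;

static void my_once_f(my_once_gate_t *gate, void (*func)(void)) {
    // Fast path: the work has already been done.
    if (atomic_load_explicit(&gate->dgo_once, memory_order_acquire) == MY_ONCE_DONE) {
        return;
    }
    uintptr_t expected = 0;  // analogue of DLOCK_ONCE_UNLOCKED
    // Try to take the gate, like _dispatch_once_gate_tryenter: only one thread wins.
    if (atomic_compare_exchange_strong_explicit(&gate->dgo_once, &expected,
            (uintptr_t)pthread_self(), memory_order_relaxed, memory_order_relaxed)) {
        func();  // analogue of _dispatch_once_callout -> _dispatch_client_callout
        // Mark done so every later caller takes the fast path (like _dispatch_once_mark_done).
        atomic_store_explicit(&gate->dgo_once, MY_ONCE_DONE, memory_order_release);
        return;
    }
    // Another thread owns the gate: wait for it to finish.
    // libdispatch parks the thread in _dispatch_once_wait; this sketch just spins.
    while (atomic_load_explicit(&gate->dgo_once, memory_order_acquire) != MY_ONCE_DONE) {
        /* spin */
    }
}

static my_once_gate_t g_gate;
static void init_once(void) { /* expensive one-time setup */ }

int main(void) {
    my_once_f(&g_gate, init_once);   // safe to call from any number of threads
    return 0;
}
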
With the singleton internals understood, we know _dispatch_root_queues_init performs its initialization exactly once.

Asynchronous Functions (Part 2)

_dispatch_root_queues_init calls _dispatch_root_queues_init_once; its source is:

static void
_dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
{
    _dispatch_fork_becomes_unsafe();
#if DISPATCH_USE_INTERNAL_WORKQUEUE
    size_t i;
    for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
        _dispatch_root_queue_init_pthread_pool(&_dispatch_root_queues[i], 0,
                _dispatch_root_queues[i].dq_priority);
    }
#else
    int wq_supported = _pthread_workqueue_supported();
    int r = ENOTSUP;

    if (!(wq_supported & WORKQ_FEATURE_MAINTENANCE)) {
        DISPATCH_INTERNAL_CRASH(wq_supported,
                "QoS Maintenance support required");
    }

#if DISPATCH_USE_KEVENT_SETUP
    struct pthread_workqueue_config cfg = {
        .version = PTHREAD_WORKQUEUE_CONFIG_VERSION,
        .flags = 0,
        .workq_cb = 0,
        .kevent_cb = 0,
        .workloop_cb = 0,
        .queue_serialno_offs = dispatch_queue_offsets.dqo_serialnum,
#if PTHREAD_WORKQUEUE_CONFIG_VERSION >= 2
        .queue_label_offs = dispatch_queue_offsets.dqo_label,
#endif
    };
#endif

#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunreachable-code"
    if (unlikely(!_dispatch_kevent_workqueue_enabled)) {
#if DISPATCH_USE_KEVENT_SETUP
        cfg.workq_cb = _dispatch_worker_thread2;
        r = pthread_workqueue_setup(&cfg, sizeof(cfg));
#else
        r = _pthread_workqueue_init(_dispatch_worker_thread2,
                offsetof(struct dispatch_queue_s, dq_serialnum), 0);
#endif // DISPATCH_USE_KEVENT_SETUP
#if DISPATCH_USE_KEVENT_WORKLOOP
    } else if (wq_supported & WORKQ_FEATURE_WORKLOOP) {
#if DISPATCH_USE_KEVENT_SETUP
        cfg.workq_cb = _dispatch_worker_thread2;
        cfg.kevent_cb = (pthread_workqueue_function_kevent_t) _dispatch_kevent_worker_thread;
        cfg.workloop_cb = (pthread_workqueue_function_workloop_t) _dispatch_workloop_worker_thread;
        r = pthread_workqueue_setup(&cfg, sizeof(cfg));
#else
        r = _pthread_workqueue_init_with_workloop(_dispatch_worker_thread2,
                (pthread_workqueue_function_kevent_t)
                _dispatch_kevent_worker_thread,
                (pthread_workqueue_function_workloop_t)
                _dispatch_workloop_worker_thread,
                offsetof(struct dispatch_queue_s, dq_serialnum), 0);
#endif // DISPATCH_USE_KEVENT_SETUP
#endif // DISPATCH_USE_KEVENT_WORKLOOP
#if DISPATCH_USE_KEVENT_WORKQUEUE
    } else if (wq_supported & WORKQ_FEATURE_KEVENT) {
#if DISPATCH_USE_KEVENT_SETUP
        cfg.workq_cb = _dispatch_worker_thread2;
        cfg.kevent_cb = (pthread_workqueue_function_kevent_t) _dispatch_kevent_worker_thread;
        r = pthread_workqueue_setup(&cfg, sizeof(cfg));
#else
        r = _pthread_workqueue_init_with_kevent(_dispatch_worker_thread2,
                (pthread_workqueue_function_kevent_t)
                _dispatch_kevent_worker_thread,
                offsetof(struct dispatch_queue_s, dq_serialnum), 0);
#endif // DISPATCH_USE_KEVENT_SETUP
#endif
    } else {
        DISPATCH_INTERNAL_CRASH(wq_supported, "Missing Kevent WORKQ support");
    }
#pragma clang diagnostic pop

    if (r != 0) {
        DISPATCH_INTERNAL_CRASH((r << 16) | wq_supported,
                "Root queue initialization failed");
    }
#endif // DISPATCH_USE_INTERNAL_WORKQUEUE
}

Next, let's look at the call stack of a dispatch_async task:

[Figure: call stack of a dispatch_async task]
Frames 1 through 5 are the execution of the task itself. Where is it actually invoked from? Let's find out.
The task callback is routed through _dispatch_worker_thread2, which is registered with the pthread workqueue API; GCD is a wrapper over pthreads. The registration happens via _pthread_workqueue_init_with_workloop (a work loop), so the callback is not invoked immediately; it runs whenever the OS decides to schedule a worker thread.
In other words, the callback of an asynchronous submission is itself asynchronous.
Let's come back to _dispatch_root_queue_poke_slow:
if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
#endif
    {
        _dispatch_root_queue_debug("requesting new worker thread for global "
                "queue: %p", dq);
        r = _pthread_workqueue_addthreads(remaining,
                _dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
        (void)dispatch_assume_zero(r);
        return;
    }

If the queue is a global (root) concurrent queue, _pthread_workqueue_addthreads is called here to request worker threads from the kernel workqueue, which then execute the work.

For an ordinary, pthread-pool backed root queue, the following runs instead:

t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
    do {
        can_request = t_count < floor ? 0 : t_count - floor;
        if (remaining > can_request) {
            _dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
                    remaining, can_request);
            os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
            remaining = can_request;
        }
        if (remaining == 0) {
            _dispatch_root_queue_debug("pthread pool is full for root queue: "
                    "%p", dq);
            return;
        }
    } while (!os_atomic_cmpxchgv2o(dq, dgq_thread_pool_size, t_count,
            t_count - remaining, &t_count, acquire));

This is a do-while loop. Its condition, os_atomic_cmpxchgv2o(dq, dgq_thread_pool_size, t_count, t_count - remaining, &t_count, acquire), is an atomic compare-and-swap that reserves worker threads by decrementing dgq_thread_pool_size.
For this kind of root queue, dgq_thread_pool_size starts at 1, as in:

const struct dispatch_queue_global_s _dispatch_custom_workloop_root_queue = {
    DISPATCH_GLOBAL_OBJECT_HEADER(queue_global),
    .dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE,
    .do_ctxt = NULL,
    .dq_label = "com.apple.root.workloop-custom",
    .dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL),
    .dq_priority = _dispatch_priority_make_fallback(DISPATCH_QOS_DEFAULT) |
            DISPATCH_PRIORITY_SATURATED_OVERRIDE,
    .dq_serialnum = DISPATCH_QUEUE_SERIAL_NUMBER_WLF,
    .dgq_thread_pool_size = 1,
};

The width of the global (root) concurrent queue is one larger than that of an ordinary concurrent queue:

#define DISPATCH_QUEUE_WIDTH_POOL (DISPATCH_QUEUE_WIDTH_FULL - 1) // 全局并发队列
#define DISPATCH_QUEUE_WIDTH_MAX  (DISPATCH_QUEUE_WIDTH_FULL - 2) // 并发队列

So the pool size of such a root queue starts at 1, and dgq_thread_pool_size is then adjusted as worker threads are requested and released.
Back in _dispatch_root_queue_poke_slow:

t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
    do {
        can_request = t_count < floor ? 0 : t_count - floor;
        if (remaining > can_request) {
            _dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
                    remaining, can_request);
            os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
            remaining = can_request;
        }
        if (remaining == 0) {
            _dispatch_root_queue_debug("pthread pool is full for root queue: "
                    "%p", dq);
            return;
        }
    } while (!os_atomic_cmpxchgv2o(dq, dgq_thread_pool_size, t_count,
            t_count - remaining, &t_count, acquire));

In this code, remaining = can_request clamps the request to what can actually be granted. can_request comes from can_request = t_count < floor ? 0 : t_count - floor, where floor is passed in and t_count is the pool size loaded atomically just above. remaining comes from the parameter n; for a single asynchronous submission it is 1, because one worker thread is being requested.
So this branch

if (remaining > can_request) {
            _dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
                    remaining, can_request);
            os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
            remaining = can_request;
        }

only fires when more threads are requested than can be created, which indicates an anomaly and reduces the request. And if

if (remaining == 0) {
            _dispatch_root_queue_debug("pthread pool is full for root queue: "
                    "%p", dq);
            return;
        }

the debug log reports that the thread pool of the root queue is full, and the function returns without creating anything.
So how many threads can be created at most? Let's work it out.

do {
        _dispatch_retain(dq); // released in _dispatch_worker_thread
#if DISPATCH_DEBUG
        unsigned dwStackSize = 0;
#else
        unsigned dwStackSize = 64 * 1024;
#endif
        uintptr_t hThread = 0;
        while (!(hThread = _beginthreadex(NULL, dwStackSize, _dispatch_worker_thread_thunk, dq, STACK_SIZE_PARAM_IS_A_RESERVATION, NULL))) {
            if (errno != EAGAIN) {
                (void)dispatch_assume(hThread);
            }
            _dispatch_temporary_resource_shortage();
        }
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES
        if (_dispatch_mgr_sched.prio > _dispatch_mgr_sched.default_prio) {
            (void)dispatch_assume_zero(SetThreadPriority((HANDLE)hThread, _dispatch_mgr_sched.prio) == TRUE);
        }
#endif
        CloseHandle((HANDLE)hThread);
    } while (--remaining);

Looking at this code, note that this particular do-while is the _WIN32 branch of the function; on Darwin the equivalent loop calls pthread_create with the queue's pthread attributes. The dwStackSize = 64 * 1024 seen here is therefore the Windows worker-thread stack size and not necessarily what iOS uses; that still needs verification.
The os_atomic_cmpxchgv2o loop above, together with the remaining == 0 check, is what decides whether the pool is exhausted, so let's see how large dgq_thread_pool_size can be:

#ifndef DISPATCH_WORKQ_MAX_PTHREAD_COUNT
#define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255
#endif

The maximum pthread count is defined as 255, the upper bound of the thread pool, but that does not mean this many threads can actually be created.
The official documentation says the following:

[Figure: excerpt from Apple's documentation on thread stack sizes]
The documentation states that a secondary thread gets a 512 KB stack by default, with a minimum of 16 KB. The larger the stack, the more memory each thread consumes, and the fewer threads can be created.
Suppose a device has 4 GB of RAM, of which roughly 1 GB of kernel address space is available for thread stacks: with 16 KB stacks that is 1 GB / 16 KB = 65,536 threads in theory; with the default 512 KB stack it is 1 GB / 512 KB = 2,048 threads. In practice nowhere near that many can actually be spawned.
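
As a quick sanity check of these numbers, the default pthread stack size on a given machine can be read from a freshly initialized attribute object (a small illustrative snippet):

#include <pthread.h>
#include <stdio.h>

int main(void) {
    pthread_attr_t attr;
    size_t stack_size = 0;

    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr, &stack_size);   // default stack size for new pthreads
    printf("default pthread stack size: %zu KB\n", stack_size / 1024);
    pthread_attr_destroy(&attr);
    return 0;
}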

Summary

This article covered deadlocks in synchronous functions, task ordering of synchronous functions, asynchronous functions, and the internals of dispatch_once (singletons). Follow-up articles will cover more GCD topics. If you spot mistakes, please point them out.
