GCD底层源码分析

2020-11-10  本文已影响0人  猿人

底层源码分析。

首先从创建队列讲起, dispatch_queue_create函数

 dispatch_queue_create 内部调用了 
dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
// attr我们一般传􏰻􏱬􏰱􏰲􏱷􏱸􏲶DISPATCH_QUEUE_SERIAL,􏲖DISPATCH_QUEUE_CONCURRENT􏱔, 􏴊NULL,
// 􏰸DISPATCH_QUEUE_SERIAL􏰦􏵎􏱃􏱸就是  NULL
    return _dispatch_lane_create_with_target(label, attr,
            DISPATCH_TARGET_QUEUE_DEFAULT, true);
}

_dispatch_lane_create_with_target 函数 当我们在打开libdispatch找到这个函数的时候发现,很难读。但是我们需要找到读它的方法 重点就是看他的返回值返回了什么。从返回值上获取关键线索去排查(这里就不粘贴这个方法全部的代码了有兴趣的去自己下载观看)

    return _dispatch_trace_queue_create(dq)._dq;

进入到 dispatch_trace_queue_create 函数,查看 _dq属性 和穿进去的dq做了什么

static inline dispatch_queue_class_t
_dispatch_trace_queue_create(dispatch_queue_class_t dqu)
{
    ///对传进来的dq进行包装处理
    
/// 仅在启用跟踪时进行分派
    _dispatch_only_if_ktrace_enabled({
        uint64_t dq_label[4] = {0}; // So that we get the right null termination
 
        dispatch_queue_t dq = dqu._dq;
        strncpy((char *)dq_label, (char *)dq->dq_label ?: "", sizeof(dq_label));

        _dispatch_ktrace2(DISPATCH_QOS_TRACE_queue_creation_start,
                dq->dq_serialnum,
                _dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));

        _dispatch_ktrace4(DISPATCH_QOS_TRACE_queue_creation_end,
                        dq_label[0], dq_label[1], dq_label[2], dq_label[3]);
    });
    ///将传进来 dq 传进 函数 并返回
    return _dispatch_introspection_queue_create(dqu);
}

进入到 _dispatch_introspection_queue_create

_dispatch_introspection_queue_create(dispatch_queue_t dq)
{
    dispatch_queue_introspection_context_t dqic;
    size_t sz = sizeof(struct dispatch_queue_introspection_context_s);

    if (!_dispatch_introspection.debug_queue_inversions) {
        sz = offsetof(struct dispatch_queue_introspection_context_s,
                __dqic_no_queue_inversion);
    }
    ///初始化了个 dqic
       dqic = _dispatch_calloc(1, sz);
       ///将 dq赋值给  _dq
    dqic->dqic_queue._dq = dq;
    if (_dispatch_introspection.debug_queue_inversions) {
        LIST_INIT(&dqic->dqic_order_top_head);
        LIST_INIT(&dqic->dqic_order_bottom_head);
    }
    dq->do_finalizer = dqic;

    _dispatch_unfair_lock_lock(&_dispatch_introspection.queues_lock);
    LIST_INSERT_HEAD(&_dispatch_introspection.queues, dqic, dqic_list);
    _dispatch_unfair_lock_unlock(&_dispatch_introspection.queues_lock);

    DISPATCH_INTROSPECTION_INTERPOSABLE_HOOK_CALLOUT(queue_create, dq);
    if (DISPATCH_INTROSPECTION_HOOK_ENABLED(queue_create)) {
        _dispatch_introspection_queue_create_hook(dq);
    }
    return upcast(dq)._dqu;
}

继续查看 upcast()

static inline dispatch_object_t
upcast(dispatch_object_t dou)
{
    return dou;
}

经过这几层函数的追踪判断 我们并没有看到我们想要的并指导了 dq就是我们要研究的重点

那么回到 最上面的_dispatch_lane_create_with_target的函数 中寻找 dq做了什么。

DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_lane_create_with_target(const char *label, dispatch_queue_attr_t dqa,
        dispatch_queue_t tq, bool legacy)
{
....省略
    dispatch_lane_t dq = _dispatch_object_alloc(vtable,
            sizeof(struct dispatch_lane_s)); // alloc
    
    _dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
            DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
            (dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0)); // init
    dq->dq_label = label;
    dq->dq_priority = _dispatch_priority_make((dispatch_qos_t)dqai.dqai_qos,
            dqai.dqai_relpri);
    if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
        dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
    }
    if (!dqai.dqai_inactive) {
        _dispatch_queue_priority_inherit_from_target(dq, tq);
        _dispatch_lane_inherit_wlh_from_target(dq, tq);
    }
    _dispatch_retain(tq);
    dq->do_targetq = tq;
    _dispatch_object_debug(dq, "%s", __func__);
    return _dispatch_trace_queue_create(dq)._dq;
}

在 _dispatch_queue_init 里面我们看到了根据 dqai.dqai_concurrent 来区分 并行 和串行。

下面我们在当前函数看一下 dqai

static dispatch_queue_t
_dispatch_lane_create_with_target(const char *label, dispatch_queue_attr_t dqa,
        dispatch_queue_t tq, bool legacy)
{
    // dqai 创建    ///根据传进来 dqa serio concurrent NULL 获取 dqai
    dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa);
....省略
}

发现第一行就根据 传进来的 要创建什么样的队列 进行了 dqai的创建 我看一下_dispatch_queue_attr_to_info 函数


dispatch_queue_attr_info_t
_dispatch_queue_attr_to_info(dispatch_queue_attr_t dqa)
{
    dispatch_queue_attr_info_t dqai = { };

    if (!dqa) return dqai;

#if DISPATCH_VARIANT_STATIC
    if (dqa == &_dispatch_queue_attr_concurrent) { // null 默认
        dqai.dqai_concurrent = true;
        return dqai;
    }
#endif

    if (dqa < _dispatch_queue_attrs ||
            dqa >= &_dispatch_queue_attrs[DISPATCH_QUEUE_ATTR_COUNT]) {
        DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
    }

    size_t idx = (size_t)(dqa - _dispatch_queue_attrs);

    dqai.dqai_inactive = (idx % DISPATCH_QUEUE_ATTR_INACTIVE_COUNT);
    idx /= DISPATCH_QUEUE_ATTR_INACTIVE_COUNT;

    dqai.dqai_concurrent = !(idx % DISPATCH_QUEUE_ATTR_CONCURRENCY_COUNT);
    idx /= DISPATCH_QUEUE_ATTR_CONCURRENCY_COUNT;

    dqai.dqai_relpri = -(int)(idx % DISPATCH_QUEUE_ATTR_PRIO_COUNT);
    idx /= DISPATCH_QUEUE_ATTR_PRIO_COUNT;

    dqai.dqai_qos = idx % DISPATCH_QUEUE_ATTR_QOS_COUNT;
    idx /= DISPATCH_QUEUE_ATTR_QOS_COUNT;

    dqai.dqai_autorelease_frequency =
            idx % DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY_COUNT;
    idx /= DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY_COUNT;

    dqai.dqai_overcommit = idx % DISPATCH_QUEUE_ATTR_OVERCOMMIT_COUNT;
    idx /= DISPATCH_QUEUE_ATTR_OVERCOMMIT_COUNT;

    return dqai;
}

我们继续返回到_dispatch_lane_create_with_target 继续 查找 dqai

    //
    // Step 2: Initialize the queue
    //

    if (legacy) {
        // if any of these attributes is specified, use non legacy classes
        if (dqai.dqai_inactive || dqai.dqai_autorelease_frequency) {
            legacy = false;
        }
    }

    const void *vtable;
    dispatch_queue_flags_t dqf = legacy ? DQF_MUTABLE : 0;

/*----------------- 这区间里面是在做什么? ------------------------*/  
    if (dqai.dqai_concurrent) {
        // OS_dispatch_queue_concurrent
        vtable = DISPATCH_VTABLE(queue_concurrent);
    } else {
        vtable = DISPATCH_VTABLE(queue_serial);
    }
/*---------------------------------------------------------------*/

    switch (dqai.dqai_autorelease_frequency) {
    case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
        dqf |= DQF_AUTORELEASE_NEVER;
        break;
    case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
        dqf |= DQF_AUTORELEASE_ALWAYS;
        break;
    }
    if (label) {
        const char *tmp = _dispatch_strdup_if_mutable(label);
        if (tmp != label) {
            dqf |= DQF_LABEL_NEEDS_FREE;
            label = tmp;
        }
    }
//    开辟内存空间
    dispatch_lane_t dq = _dispatch_object_alloc(vtable,
            sizeof(struct dispatch_lane_s)); // alloc
/// 区分 串行 还是 并发
    _dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
            DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
            (dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0)); // init

继续搜索 DISPATCH_VTABLE

#define DISPATCH_VTABLE(name) DISPATCH_OBJC_CLASS(name)

继续搜 DISPATCH_OBJC_CLASS

#define DISPATCH_OBJC_CLASS(name)   (&DISPATCH_CLASS_SYMBOL(name))

继续搜 DISPATCH_CLASS_SYMBOL

#define DISPATCH_CLASS_SYMBOL(name) OS_dispatch_##name##_class

#define DISPATCH_CLASS(name) OS_dispatch_##name

继续回到这

/*----------------- 这区间里面是在做什么? ------------------------*/  
    if (dqai.dqai_concurrent) {
        // OS_dispatch_queue_concurrent
        vtable = DISPATCH_VTABLE(queue_concurrent);
    } else {
        vtable = DISPATCH_VTABLE(queue_serial);
    }
/*---------------------------------------------------------------*/

既然创建了一个对象 在 创建完之前 它isa ;类的东西 已经确认完毕,那他在哪里?跟任何一个类 都是一摸摸一样样的。一个对象alloc的时候它前面必然会做,它的父类啊,它的isa的都会指向完毕。

回到 _dispatch_object_alloc的地方

    ///设置 类的 信息
    if (dqai.dqai_concurrent) {
        // OS_dispatch_queue_concurrent
        vtable = DISPATCH_VTABLE(queue_concurrent);
    } else {
        vtable = DISPATCH_VTABLE(queue_serial);
    }
    switch (dqai.dqai_autorelease_frequency) {......}
    if (label) {......}
//    开辟内存空间
    dispatch_lane_t dq = _dispatch_object_alloc(vtable,
            sizeof(struct dispatch_lane_s)); // alloc
    
/// 区分 串行 还是 并发
    _dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
            DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
            (dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0)); // init
    /// 参数赋值

搜索 _dispatch_object_alloc

#pragma mark dispatch_object_t

void *
_dispatch_object_alloc(const void *vtable, size_t size)
{
#if OS_OBJECT_HAVE_OBJC1
    const struct dispatch_object_vtable_s *_vtable = vtable;
    dispatch_object_t dou;
    dou._os_obj = _os_object_alloc_realized(_vtable->_os_obj_objc_isa, size);
    dou._do->do_vtable = vtable;
    return dou._do;
#else
    return _os_object_alloc_realized(vtable, size);
#endif
}

搜索 _os_object_alloc_realized

_os_object_alloc_realized(const void *cls, size_t size)
{
    _os_object_t obj;
    dispatch_assert(size >= sizeof(struct _os_object_s));
    while (unlikely(!(obj = calloc(1u, size)))) {
        _dispatch_temporary_resource_shortage();
    }
    obj->os_obj_isa = cls;
    return obj;
}
队列底层探索总结
#define DISPATCH_QUEUE_WIDTH_FULL           0x1000ull
#define DISPATCH_QUEUE_WIDTH_MAX  (DISPATCH_QUEUE_WIDTH_FULL - 2)
/// 0x1000ull - 2 =  4094

以上就是对队列的底层探究 下面来到 函数底层探究篇章

dispatch_async 异步函数底层探究

void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    uintptr_t dc_flags = DC_FLAG_CONSUME;
    dispatch_qos_t qos;

    // 任务包装器 - 接受 - 保存 - 函数式
    // 保存 block 
    qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
    _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}

进入 _dispatch_continuation_init 我们看做了什么操作

static inline dispatch_qos_t
_dispatch_continuation_init(dispatch_continuation_t dc,
        dispatch_queue_class_t dqu, dispatch_block_t work,
        dispatch_block_flags_t flags, uintptr_t dc_flags)
{
    ///将work进行copy获取 得到 *ctxt指针
    void *ctxt = _dispatch_Block_copy(work);

    dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
    ///大多数不会走这里
    if (unlikely(_dispatch_block_has_private_data(work))) {
        dc->dc_flags = dc_flags;
        ///将 work副本存入到了 dc包装器里
        dc->dc_ctxt = ctxt;
        // will initialize all fields but requires dc_flags & dc_ctxt to be set
        return _dispatch_continuation_init_slow(dc, dqu, flags);
    }
   ///  根据任务 获取 调用 func
    dispatch_function_t func = _dispatch_Block_invoke(work);
    
    if (dc_flags & DC_FLAG_CONSUME) {
        ///指定执行函数 为 _dispatch_call_block_and_release;
        func = _dispatch_call_block_and_release;
    }
    return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
}

我们搜索 _dispatch_continuation_init_f

static inline dispatch_qos_t
_dispatch_continuation_init_f(dispatch_continuation_t dc,
        dispatch_queue_class_t dqu, void *ctxt, dispatch_function_t f,
        dispatch_block_flags_t flags, uintptr_t dc_flags)
{
    pthread_priority_t pp = 0;
    dc->dc_flags = dc_flags | DC_FLAG_ALLOCATED;
    dc->dc_func = f; /// 将func方法存进 dc包装器
    dc->dc_ctxt = ctxt;/// 将 ctxt  block 的上下文指针。
    // in this context DISPATCH_BLOCK_HAS_PRIORITY means that the priority
    // should not be propagated, only taken from the handler if it has one
    ///优先级的处理
    if (!(flags & DISPATCH_BLOCK_HAS_PRIORITY)) {
        pp = _dispatch_priority_propagate();
    }
    /// 存入处理过的 dc
    _dispatch_continuation_voucher_set(dc, flags);
    return _dispatch_continuation_priority_set(dc, dqu, pp, flags);
}

通过上面的操作 我们了解 任务包装器大概是如何 将 外部传进的block进行保存的。 通过堆栈也知道了 正是 任务包装器保存的 _dispatch_call_block_and_release函数 发起的调用 。虽说我们看到了堆栈调用流程,但是具体到每个函数干什么 我还需要据需研究。

下面继续回到dispatch_async;函数 继续研究 下面函数

dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    uintptr_t dc_flags = DC_FLAG_CONSUME;
    dispatch_qos_t qos;

    // 任务包装器 - 接受 - 保存 - 函数式
    // 保存 block 
    qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
    
    /// 研究它
    _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}

搜索 _dispatch_continuation_async

_dispatch_continuation_async(dispatch_queue_class_t dqu,
        dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
    if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
        _dispatch_trace_item_push(dqu, dc);
    }
#else
    (void)dc_flags;
#endif
    return dx_push(dqu._dq, dc, qos);
}

那 dx_push函数究竟做了什么?搜索它

#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)
#define dx_vtable(x) (&(x)->do_vtable->_os_obj_vtable)

搜索 dq_push

截屏2020-11-08 下午9.58.29.png

将_dispatch_lane_concurrent_push函数打个符号断点 并运行

截屏2020-11-08 下午10.05.39.png

搜索_dispatch_lane_concurrent_push 看他的实现

DISPATCH_NOINLINE
void
_dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
        dispatch_qos_t qos)
{
    // <rdar://problem/24738102&24743140> reserving non barrier width
    // doesn't fail if only the ENQUEUED bit is set (unlike its barrier
    // width equivalent), so we have to check that this thread hasn't
    // enqueued anything ahead of this call or we can break ordering
/*如果只设置了排队位,保留非栅栏宽度不会失败(不像它的栅栏)
宽度相等),所以我们必须检查这个线程没有
在这个调用之前排队,或者我们可以打破顺序*/
    if (dq->dq_items_tail == NULL &&
            !_dispatch_object_is_waiter(dou) &&
            !_dispatch_object_is_barrier(dou) &&
            _dispatch_queue_try_acquire_async(dq)) {
        return _dispatch_continuation_redirect_push(dq, dou, qos);
    }

    _dispatch_lane_push(dq, dou, qos);
}

我们延续上面符号断点再次将_dispatch_continuation_redirect_push 打入一个符号断点并运行

截屏2020-11-08 下午10.19.54.png

搜索_dispatch_continuation_redirect_push 看它的实现

_dispatch_continuation_redirect_push(dispatch_lane_t dl,
        dispatch_object_t dou, dispatch_qos_t qos)
{
    ///_dispatch对象是不是重定向
      if (likely(!_dispatch_object_is_redirection(dou))) {
        dou._dc = _dispatch_async_redirect_wrap(dl, dou);
    } else if (!dou._dc->dc_ctxt) {
        // find first queue in descending target queue order that has
        // an autorelease frequency set, and use that as the frequency for
        // this continuation./*找到降序目标队列中设置了自动释放频率的第一个队列,并使用该频率作为此后续操作的频率*/
        dou._dc->dc_ctxt = (void *)
        (uintptr_t)_dispatch_queue_autorelease_frequency(dl);
    }

    dispatch_queue_t dq = dl->do_targetq;
    if (!qos) qos = _dispatch_priority_qos(dq->dq_priority);
    dx_push(dq, dou, qos);//咦?怎么又调用了dx_push在进行递归
}

首先分析 `if (likely(!_dispatch_object_is_redirection(dou)))
1. 拓展一下 likely

 #define likely(x) __builtin_expect(!!(x), 1) 
 #define unlikely(x) __builtin_expect(!!(x), 0)
  1. 我们看 _dispatch_object_is_redirection 实现代码
 _dispatch_object_is_redirection(dispatch_object_t dou)
 {
  return _dispatch_object_has_type(dou,
          DISPATCH_CONTINUATION_TYPE(ASYNC_REDIRECT));
 }
  
 _dispatch_object_has_type(dispatch_object_t dou, unsigned long type)
 {
  return _dispatch_object_has_vtable(dou) && dx_type(dou._do) == type;
 }

进入到 if判断 里面 搜索 _dispatch_async_redirect_wrap 看其实现

  _dispatch_async_redirect_wrap(dispatch_lane_t dq, dispatch_object_t dou)
      {
      dispatch_continuation_t dc = _dispatch_continuation_alloc();//创建新的 任务包装器

       dou._do->do_next = NULL;
       dc->do_vtable = DC_VTABLE(ASYNC_REDIRECT); // 
       dc->dc_func = NULL;
       dc->dc_ctxt = (void *)(uintptr_t)_dispatch_queue_autorelease_frequency(dq);
       dc->dc_data = dq;
       dc->dc_other = dou._do;
       dc->dc_voucher = DISPATCH_NO_VOUCHER;
       dc->dc_priority = DISPATCH_NO_PRIORITY;
       _dispatch_retain_2(dq); // released in _dispatch_async_redirect_invoke
      return dc;
     }

回到_dispatch_async_redirect_wrap

_dispatch_continuation_redirect_push(dispatch_lane_t dl,
        dispatch_object_t dou, dispatch_qos_t qos)
{
    if (likely(!_dispatch_object_is_redirection(dou))) {
     ///将  创建新的任务包装器 并指定给当前  dispatch_object_t 的任务包装,
    ///  重定向的.do_invoke函数为 _dispatch_async_redirect_invoke  
          dou._dc = _dispatch_async_redirect_wrap(dl, dou);
    } else if (!dou._dc->dc_ctxt) {
        // find first queue in descending target queue order that has
        // an autorelease frequency set, and use that as the frequency for
        // this continuation.
        dou._dc->dc_ctxt = (void *)
        (uintptr_t)_dispatch_queue_autorelease_frequency(dl);
    }
    ///看这里  -----------------------------------------------*/
    dispatch_queue_t dq = dl->do_targetq;
    if (!qos) qos = _dispatch_priority_qos(dq->dq_priority);
    dx_push(dq, dou, qos);
    /* -----------------------------------------------*/
}

再次进入 到 dx_push

#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)

搜索 dq_push 找到关于 主队列的vtable

截屏2020-11-09 上午12.49.31.png

老规矩 打符号断点验证

截屏2020-11-09 上午12.53.53.png

来到 _dispatch_root_queue_push

_dispatch_root_queue_push(dispatch_queue_global_t rq, dispatch_object_t dou,
        dispatch_qos_t qos)
{
///调度使用中的队列
#if DISPATCH_USE_KEVENT_WORKQUEUE
    dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
    if (unlikely(ddi && ddi->ddi_can_stash)) {
        dispatch_object_t old_dou = ddi->ddi_stashed_dou;
        dispatch_priority_t rq_overcommit;
        rq_overcommit = rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
        if (likely(!old_dou._do || rq_overcommit)) {
            dispatch_queue_global_t old_rq = ddi->ddi_stashed_rq;
            dispatch_qos_t old_qos = ddi->ddi_stashed_qos;
            ddi->ddi_stashed_rq = rq;
            ddi->ddi_stashed_dou = dou;
            ddi->ddi_stashed_qos = qos;
            _dispatch_debug("deferring item %p, rq %p, qos %d",
                    dou._do, rq, qos);
            if (rq_overcommit) {
                ddi->ddi_can_stash = false;
            }
            if (likely(!old_dou._do)) {
                return;
            }
            // push the previously stashed item
            qos = old_qos;
            rq = old_rq;
            dou = old_dou;
        }
    }
#endif

#if HAVE_PTHREAD_WORKQUEUE_QOS
// 一般情况下,无论自定义还是非自定义都会走进这个条件(比如:dispatch_get_global_queue)
// 里面主要是对比的是 qos与root队列的qos是否 一直。基本上都不一致,如果不一致走进这个 if语句
    if (_dispatch_root_queue_push_needs_override(rq, qos)) {
        return _dispatch_root_queue_push_override(rq, dou, qos);
    }
#else
    (void)qos;
#endif
    _dispatch_root_queue_push_inline(rq, dou, dou, 1);
}

进入 _dispatch_root_queue_push_override

DISPATCH_NOINLINE
static void
_dispatch_root_queue_push_override(dispatch_queue_global_t orig_rq,
        dispatch_object_t dou, dispatch_qos_t qos)
{
    bool overcommit = orig_rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
    dispatch_queue_global_t rq = _dispatch_get_root_queue(qos, overcommit);
    dispatch_continuation_t dc = dou._dc;
// 这个_dispatch_object_is_redirection函数其实就是return _dispatch_object_has_type(dou,DISPATCH_CONTINUATION_TYPE(ASYNC_REDIRECT));
// 所有自定义队列会走if语句,如果是dispatch_get_global_queue不会走if语句    
if (_dispatch_object_is_redirection(dc)) {
        // no double-wrap is needed, _dispatch_async_redirect_invoke will do
        // the right thing
        dc->dc_func = (void *)orig_rq;
    } else {
        // dispatch_get_global_queue来到这里
               dc = _dispatch_continuation_alloc();
              // 相当于 下面,也就指定了执行函数为 _dispatch_queue_override_innvoke,所以有别与自定义队列的invoke函数。
//DC_VTABLE_ENTRY(OVERRIDE_OWNING,.do_invoke = _dispatch_queue_override_invoke),

        dc->do_vtable = DC_VTABLE(OVERRIDE_OWNING);
        dc->dc_ctxt = dc;
        dc->dc_other = orig_rq;
        dc->dc_data = dou._do;
        dc->dc_priority = DISPATCH_NO_PRIORITY;
        dc->dc_voucher = DISPATCH_NO_VOUCHER;
    }
    _dispatch_root_queue_push_inline(rq, dc, dc, 1);
}

来到_dispatch_root_queue_push_inline 进行分析

_dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
        dispatch_object_t _head, dispatch_object_t _tail, int n)
{
    struct dispatch_object_s *hd = _head._do, *tl = _tail._do;
// 把任务装进队列,大多数不走近 if语句。 
  if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) {
        return _dispatch_root_queue_poke(dq, n, 0);
    }
}

搜索 _dispatch_root_queue_poke

_dispatch_root_queue_poke(dispatch_queue_global_t dq, int n, int floor)
{
    if (!_dispatch_queue_class_probe(dq)) {
        return;
    }
#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_POOL
    if (likely(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE))
#endif
    {
        if (unlikely(!os_atomic_cmpxchg2o(dq, dgq_pending, 0, n, relaxed))) {
            _dispatch_root_queue_debug("worker thread request still pending "
                    "for global queue: %p", dq);
            return;
        }
    }
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
    return _dispatch_root_queue_poke_slow(dq, n, floor);
}

继续搜索重点_dispatch_root_queue_poke_slow(有省略)

DISPATCH_NOINLINE
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
    int remaining = n;
    int r = ENOSYS;

    _dispatch_root_queues_init();//重点
    
    ...
    //do-while循环创建线程
    do {
        _dispatch_retain(dq); // released in _dispatch_worker_thread
        while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
            if (r != EAGAIN) {
                (void)dispatch_assume_zero(r);
            }
            _dispatch_temporary_resource_shortage();
        }
    } while (--remaining);
    
    ...
}

搜索 _dispatch_root_queues_init

_dispatch_root_queues_init(void)
{
    dispatch_once_f(&_dispatch_root_queues_pred, NULL,
            _dispatch_root_queues_init_once);
}

我们继续搜索 _dispatch_root_queues_init_once 看其实现

2251862-de6a9414407814d3.jpg

小总结
1 、看到这里 我们 先 小总结 一下 ,异步函数 ,首先 将任务进行包装 ,然后 通过 dxpush 函数 递归 的去重定向到根队列 并执行 根队列 的 dopush 也就是 _dispatch_root_queue_push,并注册了 其执行句柄 _dispatch_worker_thread2 并看到了 线程的创建。
分析:
此时此刻 ,我们需要整体的去分析一下 队列 是什么?它是一种数据结构,并发队列+异步函数,可以支持同一时间多条线程同时执行任务。那任务在什么时候开始执行?它是就绪状态,等待着cpu来进行调度。 所以
_dispatch_root_queue_push 个人认为 它就是在为任务分配线程。及丢到 并发队列中。等待cpu的调度执行。

2 、打印任务 调用栈 我们也看到 任务的执行流程。流程 start_wqthread -> _pthread_wqthread -> _dispatch_worker_thread2 ->_dispatch_root_queue_drain ->_dispatch_async_redirect_invoke ->_dispatch_continuation_pop -> _dispatch_client_callout -> _dispatch_call_block_and_release。

3 、其中start_wqthread ->_pthread_wqthread并不在 libdispatch.dylib 源码之中。我们这里也不过多分析。我们就将其想象成,cpu来进行调度即可。流程已经熟悉。那每个函数之间的调用又发生了什么 我们下面进入到调用栈到的流程底层源码实现分析。

从 _dispatch_worker_thread2 开始查看

static void
_dispatch_worker_thread2(pthread_priority_t pp)
{
    bool overcommit = pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
    dispatch_queue_global_t dq;

    pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
    _dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);
    dq = _dispatch_get_root_queue(_dispatch_qos_from_pp(pp), overcommit);
//自省线程添加
    _dispatch_introspection_thread_add();
//跟踪运行时事件
    _dispatch_trace_runtime_event(worker_unpark, dq, 0);

    int pending = os_atomic_dec2o(dq, dgq_pending, relaxed);
    dispatch_assert(pending >= 0);
/*-----------------------------------调用这里 主队列 线程----------------------------------------------*/
    _dispatch_root_queue_drain(dq, dq->dq_priority,
            DISPATCH_INVOKE_WORKER_DRAIN | DISPATCH_INVOKE_REDIRECTING_DRAIN);
 /*------------------------------------------------------------------------------------*/
    _dispatch_voucher_debug("root queue clear", NULL);
    _dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
    _dispatch_trace_runtime_event(worker_park, NULL, 0);
}

搜索 _dispatch_root_queue_drain

static void
_dispatch_root_queue_drain(dispatch_queue_global_t dq,
        dispatch_priority_t pri, dispatch_invoke_flags_t flags)
{
#if DISPATCH_DEBUG
    dispatch_queue_t cq;
    if (unlikely(cq = _dispatch_queue_get_current())) {
        DISPATCH_INTERNAL_CRASH(cq, "Premature thread recycling");
    }
#endif
    _dispatch_queue_set_current(dq);
    _dispatch_init_basepri(pri);
    _dispatch_adopt_wlh_anon();

    struct dispatch_object_s *item;
    bool reset = false;
    dispatch_invoke_context_s dic = { };
#if DISPATCH_COCOA_COMPAT
    _dispatch_last_resort_autorelease_pool_push(&dic);
#endif // DISPATCH_COCOA_COMPAT
    _dispatch_queue_drain_init_narrowing_check_deadline(&dic, pri);
    _dispatch_perfmon_start();
// 循环取出任务   
while (likely(item = _dispatch_root_queue_drain_one(dq))) {
        if (reset) _dispatch_wqthread_override_reset();
/*---------------------------------------------------------------------------*/ 
///调度出任务的执行函数   
             _dispatch_continuation_pop_inline(item, &dic, flags, dq);
/*-------------------------------------------------------------------------*/
        reset = _dispatch_reset_basepri_override();
        if (unlikely(_dispatch_queue_drain_should_narrow(&dic))) {
            break;
        }
    }

    // overcommit or not. worker thread
    if (pri & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) {
        _dispatch_perfmon_end(perfmon_thread_worker_oc);
    } else {
        _dispatch_perfmon_end(perfmon_thread_worker_non_oc);
    }

#if DISPATCH_COCOA_COMPAT
    _dispatch_last_resort_autorelease_pool_pop(&dic);
#endif // DISPATCH_COCOA_COMPAT
    _dispatch_reset_wlh();
    _dispatch_clear_basepri();
    _dispatch_queue_set_current(NULL);
}

搜索 _dispatch_continuation_pop_inline

static inline void
_dispatch_continuation_pop_inline(dispatch_object_t dou,
        dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
        dispatch_queue_class_t dqu)
{
    dispatch_pthread_root_queue_observer_hooks_t observer_hooks =
            _dispatch_get_pthread_root_queue_observer_hooks();
    ///监听处理方便调试
    if (observer_hooks) observer_hooks->queue_will_execute(dqu._dq);
    
    flags &= _DISPATCH_INVOKE_PROPAGATE_MASK;
    
    /*-------------------研究重点----------------------------*/
    if (_dispatch_object_has_vtable(dou)) {
        dx_invoke(dou._dq, dic, flags);
    } else {
        _dispatch_continuation_invoke_inline(dou, flags, dqu);
    }
    /*-----------------------------------------------*/
    
    if (observer_hooks) observer_hooks->queue_did_execute(dqu._dq);
}

搜索dx_invoke

#define dx_invoke(x, y, z) dx_vtable(x)->do_invoke(x, y, z)

搜索 _dispatch_async_redirect_invoke

void
_dispatch_async_redirect_invoke(dispatch_continuation_t dc,
        dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags)
{
    dispatch_thread_frame_s dtf;
    struct dispatch_continuation_s *other_dc = dc->dc_other;
    dispatch_invoke_flags_t ctxt_flags = (dispatch_invoke_flags_t)dc->dc_ctxt;
    // if we went through _dispatch_root_queue_push_override,
    // the "right" root queue was stuffed into dc_func
    dispatch_queue_global_t assumed_rq = (dispatch_queue_global_t)dc->dc_func;
    dispatch_lane_t dq = dc->dc_data;
    dispatch_queue_t rq, old_dq;
    dispatch_priority_t old_dbp;

    if (ctxt_flags) {
        flags &= ~_DISPATCH_INVOKE_AUTORELEASE_MASK;
        flags |= ctxt_flags;
    }
    old_dq = _dispatch_queue_get_current();
    if (assumed_rq) {
        old_dbp = _dispatch_root_queue_identity_assume(assumed_rq);
        _dispatch_set_basepri(dq->dq_priority);
    } else {
        old_dbp = _dispatch_set_basepri(dq->dq_priority);
    }

    uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_NO_INTROSPECTION;
    _dispatch_thread_frame_push(&dtf, dq);
    _dispatch_continuation_pop_forwarded(dc, dc_flags, NULL, {
        _dispatch_continuation_pop(other_dc, dic, flags, dq);
    });
    _dispatch_thread_frame_pop(&dtf);
    if (assumed_rq) _dispatch_queue_set_current(old_dq);
    _dispatch_reset_basepri(old_dbp);

    rq = dq->do_targetq;
    while (unlikely(rq->do_targetq && rq != old_dq)) {
        _dispatch_lane_non_barrier_complete(upcast(rq)._dl, 0);
        rq = rq->do_targetq;
    }

    // pairs with _dispatch_async_redirect_wrap
    _dispatch_lane_non_barrier_complete(dq, DISPATCH_WAKEUP_CONSUME_2);
}


搜索 _dispatch_continuation_pop

_dispatch_continuation_pop(dispatch_object_t dou, dispatch_invoke_context_t dic,
        dispatch_invoke_flags_t flags, dispatch_queue_class_t dqu)
{
    _dispatch_continuation_pop_inline(dou, dic, flags, dqu._dq);
}

搜索_dispatch_continuation_pop_inline

static inline void
_dispatch_continuation_pop_inline(dispatch_object_t dou,
        dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
        dispatch_queue_class_t dqu)
{
    dispatch_pthread_root_queue_observer_hooks_t observer_hooks =
            _dispatch_get_pthread_root_queue_observer_hooks();
    ///监听处理方便调试
    if (observer_hooks) observer_hooks->queue_will_execute(dqu._dq);
    
    flags &= _DISPATCH_INVOKE_PROPAGATE_MASK;
    
    /*-------------------研究重点----------------------------*/
    if (_dispatch_object_has_vtable(dou)) {
        dx_invoke(dou._dq, dic, flags);
    } else {
        _dispatch_continuation_invoke_inline(dou, flags, dqu);
    }
    /*-----------------------------------------------*/
    
    if (observer_hooks) observer_hooks->queue_did_execute(dqu._dq);
}

搜索_dispatch_continuation_invoke_inline

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_invoke_inline(dispatch_object_t dou,
        dispatch_invoke_flags_t flags, dispatch_queue_class_t dqu)
{
    dispatch_continuation_t dc = dou._dc, dc1;
    dispatch_invoke_with_autoreleasepool(flags, {
        uintptr_t dc_flags = dc->dc_flags;
        // Add the item back to the cache before calling the function. This
        // allows the 'hot' continuation to be used for a quick callback.
        //
        // The ccache version is per-thread.
        // Therefore, the object has not been reused yet.
        // This generates better assembly.
        _dispatch_continuation_voucher_adopt(dc, dc_flags);
        if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
            _dispatch_trace_item_pop(dqu, dou);
        }
        if (dc_flags & DC_FLAG_CONSUME) {
            dc1 = _dispatch_continuation_free_cacheonly(dc);
        } else {
            dc1 = NULL;
        }
        if (unlikely(dc_flags & DC_FLAG_GROUP_ASYNC)) {
            _dispatch_continuation_with_group_invoke(dc);
        } else {
            _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
            _dispatch_trace_item_complete(dc);
        }
        if (unlikely(dc1)) {
            _dispatch_continuation_free_to_cache_limit(dc1);
        }
    });
    _dispatch_perfmon_workitem_inc();
}

搜索 _dispatch_client_callout

_dispatch_client_callout(void *ctxt, dispatch_function_t f)
{
    return f(ctxt);
}
上一篇 下一篇

猜你喜欢

热点阅读