iOS随笔iOS 底层探索之路

iOS 底层探索:多线程GCD底层探索(上)

2020-11-11  本文已影响0人  欧德尔丶胡

iOS 底层探索: 学习大纲 OC篇

前言

准备

一 、查找 GCD 源码

来到工程,我们跳转查看dispatch_async如下:只是提供了一个对外的接口。

#ifdef __BLOCKS__
API_AVAILABLE(macos(10.6), ios(4.0))
DISPATCH_EXPORT DISPATCH_NONNULL_ALL DISPATCH_NOTHROW
void
dispatch_async(dispatch_queue_t queue, dispatch_block_t block);
#endif

所以我们再去断点调试:


我们在dispatch_async下断点,查看堆栈下的dispatch_async,就会来到汇编调用情况,其中我们可以看到libdispatch.dylib 调用了dispatch_async:,所以根据以前的经验,GCD的库就在libdispatch.dylib 里面.所以我们去源码中查看. 下载地址上面已给出。

二 、libdispatch源码分析

当我翻开源码的时候,首先我发现,没法编译这份源码,其次源码中结构体、方法等的命名不能从字面理解,各种宏定义的嵌套错综复杂,所以看了半天有点读不下去了。但是,虽然不能像以前探索objc源码那样通过LLDB调试验证,但是我们可以从我们使用GCD流程下手,刨根问底,抱着探索求真的目的,顺着思路往下慢慢的找关键代码,然后进行分析其流程。

所以我们开始第一阶段的探索:

在上一篇讲解了GCD的使用步骤:

队列类型 dispatch_queue_t
队列的创建方法:dispatch_queue_create

创建串行队列
dispatch_queue_t queue = 
dispatch_queue_create("com.lgcooci.seial", DISPATCH_QUEUE_SERIAL);
创建并发队列
dispatch_queue_t queue = 
dispatch_queue_create("com.lgcooci.concurrent", DISPATCH_QUEUE_CONCURRENT);

首先我们在源码中全局去搜索找找dispatch_queue_t是什么?

1. 队列类型dispatch_queue_t

首先可以看到dispatch_queue_t本身只是dispatch_queue_s这个结构体指针

typedef struct dispatch_queue_s *dispatch_queue_t; 

继续查找dispatch_queue_s定义

struct dispatch_queue_s {
    DISPATCH_QUEUE_CLASS_HEADER(queue, void *__dq_opaque1);
    /* 32bit hole on LP64 */
} DISPATCH_ATOMIC64_ALIGN;

继续查找DISPATCH_QUEUE_CLASS_HEADER ,发现是个如下

#define DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__) \
    _DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__); \
    /* LP64 global queue cacheline boundary */ \
    unsigned long dq_serialnum; \// queue的编号
    const char *dq_label; \   //标签
    DISPATCH_UNION_LE(uint32_t volatile dq_atomic_flags, \
        const uint16_t dq_width, \
        const uint16_t __dq_opaque2 \
    ); \
    dispatch_priority_t dq_priority; \//优先级
    union { \
        struct dispatch_queue_specific_head_s *dq_specific_head; \
        struct dispatch_source_refs_s *ds_refs; \
        struct dispatch_timer_source_refs_s *ds_timer_refs; \
        struct dispatch_mach_recv_refs_s *dm_recv_refs; \
        struct dispatch_channel_callbacks_s const *dch_callbacks; \
    }; \
    int volatile dq_sref_cnt

在这个宏里我们找到相关的方法_DISPATCH_QUEUE_CLASS_HEADER(x, pointer_sized_field); 继续展开搜索查看里面的内容如下:

// 展开_DISPATCH_QUEUE_CLASS_HEADER

#define _DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__) \
    DISPATCH_OBJECT_HEADER(x); \
    DISPATCH_UNION_LE(uint64_t volatile dq_state, \
            dispatch_lock dq_state_lock, \
            uint32_t dq_state_bits \
    ); \

// 持续展开DISPATCH_OBJECT_HEADER

#define DISPATCH_OBJECT_HEADER(x) \
    struct dispatch_object_s _as_do[0]; \
    _DISPATCH_OBJECT_HEADER(x)
    
// 进一步查看 _DISPATCH_OBJECT_HEADER

#define _DISPATCH_OBJECT_HEADER(x) \
    struct _os_object_s _as_os_obj[0]; \
    OS_OBJECT_STRUCT_HEADER(dispatch_##x); \  // 这个宏,可以理解为dispatch_object_s继承自_os_object_s
    struct dispatch_##x##_s *volatile do_next; \
    struct dispatch_queue_s *do_targetq; \
    void *do_ctxt; \
    void *do_finalizer
    

来到OS_OBJECT_STRUCT_HEADER之后,我们需要注意一个成员变量,记住这个成员变量名字叫做do_vtable。再继续拆解_OS_OBJECT_HEADER发现里面起就是一个isa指针引用计数一些信息。

再查看 OS_OBJECT_STRUCT_HEADER

#define OS_OBJECT_STRUCT_HEADER(x) \
    _OS_OBJECT_HEADER(\
    const void *_objc_isa, \
    do_ref_cnt, \
    do_xref_cnt); \
// 注意这个成员变量,后面将任务Push到队列就是通过这个变量
    const struct x##_vtable_s *do_vtable
    
// 进一步查看 _OS_OBJECT_HEADER

#define _OS_OBJECT_HEADER(isa, ref_cnt, xref_cnt) \
        isa; /* must be pointer-sized */ \ // isa指针
        int volatile ref_cnt; \ // gcd对象内部引用计数
        int volatile xref_cnt// gcd对象外部引用计数(内外部都要减到0时,对象会被释放)

经过一步步相关代码的查找最终来到_OS_OBJECT_HEADER ,可以发现这些代码晦涩难懂,但是我们研究源码不一定要知道每句代码的意思,我们查看其核心内容:dispatch_queue_t 的本质是一个结构体指针对象,里面包含了label(标签)、priority(优先级)等一些信息。dispatch_queue_t类型的指针,指向一个dispatch_queue_s 类型的结构体。

其实GCD源码中的数据结构为dispatch_object_t联合抽象类

typedef union {
    struct _os_object_s *_os_obj;// 基类
    struct dispatch_object_s *_do;// 基类继承os_object
    struct dispatch_queue_s *_dq;// 队列结构
    struct dispatch_queue_attr_s *_dqa;// 队列相关属性
    struct dispatch_group_s *_dg;// group结构
    struct dispatch_source_s *_ds;
    struct dispatch_channel_s *_dch;
    struct dispatch_mach_s *_dm;
    struct dispatch_mach_msg_s *_dmsg;
    struct dispatch_semaphore_s *_dsema;// 信号量
    struct dispatch_data_s *_ddata;
    struct dispatch_io_s *_dchannel;
} dispatch_object_t DISPATCH_TRANSPARENT_UNION;

2. 创建队列 dispatch_queue_create

源码查看如下:

dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
    return _dispatch_lane_create_with_target(label, attr,
            DISPATCH_TARGET_QUEUE_DEFAULT, true);
}

label : 标签,我们平时传入队列的名字
attr :我们知道创建队列时, attr 属性有三个值可选,nil、DISPATCH_QUEUE_SERIAL(实际上就是 nil) 或 DISPATCH_QUEUE_CONCURRENT

_dispatch_queue_create_with_target函数,这里会创建一个root队列,并将自己新建的队列绑定到所对应的root队列上。

DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_lane_create_with_target(const char *label, dispatch_queue_attr_t dqa,
        dispatch_queue_t tq, bool legacy)
{
    dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa);

    //
    // Step 1: Normalize arguments (qos, overcommit, tq) 规范化参数
    //
    // 根据上文代码注释里提到的,作者认为调用者传入DISPATCH_QUEUE_SERIAL和nil的几率要大于传DISPATCH_QUEUE_CONCURRENT。所以这里设置个默认值。
    // 这里怎么理解呢?只要看做if(!dqa)即可
    if (!slowpath(dqa)) {
        // _dispatch_get_default_queue_attr里面会将dqa的dqa_autorelease_frequency指定为DISPATCH_AUTORELEASE_FREQUENCY_INHERIT的,inactive也指定为false。这里就不展开了,只需要知道赋了哪些值。因为后面会用到。
        dqa = _dispatch_get_default_queue_attr();
    } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
        DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
    }

    // 取出优先级
    dispatch_qos_t qos = _dispatch_priority_qos(dqa->dqa_qos_and_relpri);

    // overcommit单纯从英文理解表示过量使用的意思,那这里这个overcommit就是一个标识符,表示是不是就算负荷很高了,但还是得给我新开一个线程出来给我执行任务。
    _dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit;
    if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) {
        if (tq->do_targetq) {
            DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and "
                    "a non-global target queue");
        }
    }

    // 如果overcommit没有被指定
    if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
         // 所以对于overcommit,如果是串行的话默认是开启的,而并行是关闭的
        overcommit = dqa->dqa_concurrent ?
                _dispatch_queue_attr_overcommit_disabled :
                _dispatch_queue_attr_overcommit_enabled;
    }

    // 之前说过初始化队列默认传了DISPATCH_TARGET_QUEUE_DEFAULT,也就是null,所以进入if语句。
    if (!tq) {
        // 获取一个管理自己队列的root队列。
        tq = _dispatch_get_root_queue(
                qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos,
                overcommit == _dispatch_queue_attr_overcommit_enabled);
        if (slowpath(!tq)) {
            DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
        }
    }

//  -----------  开始进行队列的创建   -----------------

    // legacy默认是true的
    if (legacy) {
        // 之前说过,默认是会给dqa_autorelease_frequency指定为DISPATCH_AUTORELEASE_FREQUENCY_INHERIT,所以这个判断式是成立的
        if (dqa->dqa_inactive || dqa->dqa_autorelease_frequency) {
            legacy = false;
        }
    }
// vtable变量很重要,之后会被赋值到之前说的dispatch_queue_t结构体里的do_vtable变量上
    const void *vtable;
    dispatch_queue_flags_t dqf = legacy ? DQF_MUTABLE : 0;
    if (dqai.dqai_concurrent) {
// 如果创建队列的时候传了DISPATCH_QUEUE_CONCURRENT,就是走这里
        vtable = DISPATCH_VTABLE(queue_concurrent);
    } else {
        // 如果创建线程没有指定为并行队列,无论你传DISPATCH_QUEUE_SERIAL还是nil,都会创建一个串行队列。
        vtable = DISPATCH_VTABLE(queue_serial);
    }
    switch (dqai.dqai_autorelease_frequency) {
             .....
    }
     if (label) {
        // 判断传进来的字符串是否可变的,如果可变的copy成一份不可变的
        const char *tmp = _dispatch_strdup_if_mutable(label);
        if (tmp != label) {
            dqf |= DQF_LABEL_NEEDS_FREE;
            label = tmp;
        }
    }


   // _dispatch_object_alloc里面就将vtable赋值给do_vtable变量上了。
    dispatch_lane_t dq = _dispatch_object_alloc(vtable,
            sizeof(struct dispatch_lane_s));
   // 第三个参数根据是否并行队列,如果不是则最多开一个线程,如果是则最多开0x1000 - 2个线程,这个数量很惊人了已经,换成十进制就是(4096 - 2)个。
    // dqa_inactive之前说串行是false的
    // DISPATCH_QUEUE_ROLE_INNER 也是0,所以这里串行队列的话dqa->dqa_state是0
    _dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
            DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
            (dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0));

    dq->dq_label = label;
    dq->dq_priority = _dispatch_priority_make((dispatch_qos_t)dqai.dqai_qos,
            dqai.dqai_relpri);
    if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
        dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
    }
    if (!dqai.dqai_inactive) {
        _dispatch_queue_priority_inherit_from_target(dq, tq);
        _dispatch_lane_inherit_wlh_from_target(dq, tq);
    }
    _dispatch_retain(tq);
   // 自定义的queue的目标队列是root队列
    dq->do_targetq = tq;
    _dispatch_object_debug(dq, "%s", __func__);
    return _dispatch_trace_queue_create(dq)._dq;
}

源码中大部分代码都不能从字面上了解其意思,但是这里有几个重要的函数我们可以找到,首先是创建一个root队列_dispatch_get_root_queue函数。取root队列

static inline dispatch_queue_global_t
_dispatch_get_root_queue(dispatch_qos_t qos, bool overcommit)
{
    if (unlikely(qos < DISPATCH_QOS_MIN || qos > DISPATCH_QOS_MAX)) {
        DISPATCH_CLIENT_CRASH(qos, "Corrupted priority");
    }
    return &_dispatch_root_queues[2 * (qos - 1) + overcommit];
}

看下这个_dispatch_root_queues数组。我们可以看到,每一个优先级都有对应的root队列,每一个优先级又分为是不是可以过载的队列。

// 6618342 Contact the team that owns the Instrument DTrace probe before
//         renaming this symbol
struct dispatch_queue_global_s _dispatch_root_queues[] = {
#define _DISPATCH_ROOT_QUEUE_IDX(n, flags) \
        ((flags & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) ? \
        DISPATCH_ROOT_QUEUE_IDX_##n##_QOS_OVERCOMMIT : \
        DISPATCH_ROOT_QUEUE_IDX_##n##_QOS)
#define _DISPATCH_ROOT_QUEUE_ENTRY(n, flags, ...) \
    [_DISPATCH_ROOT_QUEUE_IDX(n, flags)] = { \
        DISPATCH_GLOBAL_OBJECT_HEADER(queue_global), \
        .dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE, \
        .do_ctxt = _dispatch_root_queue_ctxt(_DISPATCH_ROOT_QUEUE_IDX(n, flags)), \
        .dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL), \
        .dq_priority = flags | ((flags & DISPATCH_PRIORITY_FLAG_FALLBACK) ? \
                _dispatch_priority_make_fallback(DISPATCH_QOS_##n) : \
                _dispatch_priority_make(DISPATCH_QOS_##n, 0)), \
        __VA_ARGS__ \
    }
    _DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, 0,
        .dq_label = "com.apple.root.maintenance-qos",
        .dq_serialnum = 4,
    ),
    _DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
        .dq_label = "com.apple.root.maintenance-qos.overcommit",
        .dq_serialnum = 5,
    ),
    _DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, 0,
        .dq_label = "com.apple.root.background-qos",
        .dq_serialnum = 6,
    ),
    _DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
        .dq_label = "com.apple.root.background-qos.overcommit",
        .dq_serialnum = 7,
    ),
    _DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, 0,
        .dq_label = "com.apple.root.utility-qos",
        .dq_serialnum = 8,
    ),
    _DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
        .dq_label = "com.apple.root.utility-qos.overcommit",
        .dq_serialnum = 9,
    ),
    _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT, DISPATCH_PRIORITY_FLAG_FALLBACK,
        .dq_label = "com.apple.root.default-qos",
        .dq_serialnum = 10,
    ),
    _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT,
            DISPATCH_PRIORITY_FLAG_FALLBACK | DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
        .dq_label = "com.apple.root.default-qos.overcommit",
        .dq_serialnum = 11,
    ),
    _DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, 0,
        .dq_label = "com.apple.root.user-initiated-qos",
        .dq_serialnum = 12,
    ),
    _DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
        .dq_label = "com.apple.root.user-initiated-qos.overcommit",
        .dq_serialnum = 13,
    ),
    _DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, 0,
        .dq_label = "com.apple.root.user-interactive-qos",
        .dq_serialnum = 14,
    ),
    _DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
        .dq_label = "com.apple.root.user-interactive-qos.overcommit",
        .dq_serialnum = 15,
    ),
};

其中DISPATCH_GLOBAL_OBJECT_HEADER(queue_root),解析到最后是OSdispatch##name##_class这样的这样的,对应的实例对象是如下代码,指定了root队列各个操作对应的函数。

可以看到很熟悉的alloc 、init 操作了;
_dispatch_object_alloc: 申请对应类型的内存;
_dispatch_queue_init:初始化。

首先看前者的实现:

// 注意对于iOS并不满足 OS_OBJECT_HAVE_OBJC1
void *
_dispatch_object_alloc(const void *vtable, size_t size)
{
#if OS_OBJECT_HAVE_OBJC1
    const struct dispatch_object_vtable_s *_vtable = vtable;
    dispatch_object_t dou;
    dou._os_obj = _os_object_alloc_realized(_vtable->_os_obj_objc_isa, size);
    dou._do->do_vtable = vtable;
    return dou._do;
#else
    return _os_object_alloc_realized(vtable, size);
#endif
}

// 接着看 _os_object_alloc_realized
inline _os_object_t
_os_object_alloc_realized(const void *cls, size_t size)
{
    _os_object_t obj;
    dispatch_assert(size >= sizeof(struct _os_object_s));
    while (unlikely(!(obj = calloc(1u, size)))) {
        _dispatch_temporary_resource_shortage();
    }
    obj->os_obj_isa = cls;
    return obj;
}

再看一下 _os_objc_alloc


_os_object_t
_os_object_alloc(const void *cls, size_t size)
{
    if (!cls) cls = &_os_object_vtable;
    return _os_object_alloc_realized(cls, size);
}

inline _os_object_t
_os_object_alloc_realized(const void *cls, size_t size)
{
    _os_object_t obj;
    dispatch_assert(size >= sizeof(struct _os_object_s));
    while (unlikely(!(obj = calloc(1u, size)))) {
        _dispatch_temporary_resource_shortage();
    }
    obj->os_obj_isa = cls;
    return obj;
}

返回值类型:结构体_os_object_s

typedef struct _os_object_s {
    _OS_OBJECT_HEADER(
    const _os_object_vtable_s *os_obj_isa,
    os_obj_ref_cnt,
    os_obj_xref_cnt);
} _os_object_s;


// 系统对象头部定义宏
#define _OS_OBJECT_HEADER(isa, ref_cnt, xref_cnt) 
        isa;    // isa指针
        int volatile ref_cnt; // gcd对象内部引用计数
        int volatile xref_cnt // gcd对象外部引用计数(内外部都要减到0时,对象会被释放)

再看下_dispatch_queue_init函数,这里也就是做些初始化工作了

static inline void _dispatch_queue_init(dispatch_queue_t dq, dispatch_queue_flags_t dqf,
        uint16_t width, uint64_t initial_state_bits)
{
    uint64_t dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(width);

    dispatch_assert((initial_state_bits & ~(DISPATCH_QUEUE_ROLE_MASK |
            DISPATCH_QUEUE_INACTIVE)) == 0);

    if (initial_state_bits & DISPATCH_QUEUE_INACTIVE) {
        dq_state |= DISPATCH_QUEUE_INACTIVE + DISPATCH_QUEUE_NEEDS_ACTIVATION;
        dq_state |= DLOCK_OWNER_MASK;
        dq->do_ref_cnt += 2; 
    }

    dq_state |= (initial_state_bits & DISPATCH_QUEUE_ROLE_MASK);
    // 指向DISPATCH_OBJECT_LISTLESS是优化编译器的作用。只是为了生成更好的指令让CPU更好的编码
    dq->do_next = (struct dispatch_queue_s *)DISPATCH_OBJECT_LISTLESS;
    dqf |= DQF_WIDTH(width);
    // dqf 保存进 dq->dq_atomic_flags
    os_atomic_store2o(dq, dq_atomic_flags, dqf, relaxed);
    dq->dq_state = dq_state;
    dq->dq_serialnum =
            os_atomic_inc_orig(&_dispatch_queue_serial_numbers, relaxed);
}

  最后是_dispatch_introspection_queue_create函数,一个内省函数。

dispatch_queue_t _dispatch_introspection_queue_create(dispatch_queue_t dq)
{
    TAILQ_INIT(&dq->diq_order_top_head);
    TAILQ_INIT(&dq->diq_order_bottom_head);
    _dispatch_unfair_lock_lock(&_dispatch_introspection.queues_lock);
    TAILQ_INSERT_TAIL(&_dispatch_introspection.queues, dq, diq_list);
    _dispatch_unfair_lock_unlock(&_dispatch_introspection.queues_lock);

    DISPATCH_INTROSPECTION_INTERPOSABLE_HOOK_CALLOUT(queue_create, dq);
    if (DISPATCH_INTROSPECTION_HOOK_ENABLED(queue_create)) {
        _dispatch_introspection_queue_create_hook(dq);
    }
    return dq;
}

至此,一个队列的创建过程我们大致了解了。大致可以分为这么几点

    1. 设置队列优先级
    1. 默认创建的是一个串行队列
    1. 设置队列挂载的根队列。优先级不同根队列也不同
    1. 实例化vtable对象,这个对象给不同队列指定了push、wakeup等函数。

3. 任务异步dispatch_async

内部就是两个函数:

dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    uintptr_t dc_flags = DC_FLAG_CONSUME; // 设置标识位
    dispatch_qos_t qos;
 // 任务的包装器  接收 保存 函数式 保存block
    qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
    _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}

dispatch_continuation_init函数只是一个初始化,主要就是保存Block上下文,指定block的执行函数

static inline dispatch_qos_t
_dispatch_continuation_init(dispatch_continuation_t dc,
        dispatch_queue_class_t dqu, dispatch_block_t work,
        dispatch_block_flags_t flags, uintptr_t dc_flags)
{
    // block对象赋值到dc_ctx
    void *ctxt = _dispatch_Block_copy(work); //拷贝任务
    dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
    if (unlikely(_dispatch_block_has_private_data(work))) {
        dc->dc_flags = dc_flags;
        dc->dc_ctxt = ctxt;//赋值
        // will initialize all fields but requires dc_flags & dc_ctxt to be set
        return _dispatch_continuation_init_slow(dc, dqu, flags);
    }

    dispatch_function_t func = _dispatch_Block_invoke(work);/封装work - 异步回调
    if (dc_flags & DC_FLAG_CONSUME) {
        func = _dispatch_call_block_and_release;//回调函数赋值 - 同步回调
    }
    return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
}

_dispatch_call_block_and_release这个函数就是直接执行block了,所以dc->dc_func被调用的话就block会被直接执行了。

void
_dispatch_call_block_and_release(void *block)
{
    void (^b)(void) = block;
    b();
    Block_release(b);
}

上面的初始化过程就是这样,接着看下_dispatch_continuation_async函数

_dispatch_continuation_async(dispatch_queue_class_t dqu,
        dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
// 如果是用barrier插进来的任务或者是串行队列,直接将任务加入到队列
#if DISPATCH_INTROSPECTION
    if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
        _dispatch_trace_item_push(dqu, dc); 
//trace 轨迹 跟踪器的意思  
    }
#else
    (void)dc_flags;
#endif
    return dx_push(dqu._dq, dc, qos);//宏
}

宏:#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z),我们只关心dq_push

全局搜索我们看到:

DISPATCH_VTABLE_INSTANCE(queue,
    // This is the base class for queues, no objects of this type are made
    .do_type        = _DISPATCH_QUEUE_CLUSTER,
    .do_dispose     = _dispatch_object_no_dispose,
    .do_debug       = _dispatch_queue_debug,
    .do_invoke      = _dispatch_object_no_invoke,

    .dq_activate    = _dispatch_queue_no_activate,
);

DISPATCH_VTABLE_INSTANCE(workloop,
    .do_type        = DISPATCH_WORKLOOP_TYPE,
    .do_dispose     = _dispatch_workloop_dispose,
    .do_debug       = _dispatch_queue_debug,
    .do_invoke      = _dispatch_workloop_invoke,

    .dq_activate    = _dispatch_queue_no_activate,
    .dq_wakeup      = _dispatch_workloop_wakeup,
    .dq_push        = _dispatch_workloop_push,
);

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_serial, lane,
    .do_type        = DISPATCH_QUEUE_SERIAL_TYPE,
    .do_dispose     = _dispatch_lane_dispose,
    .do_debug       = _dispatch_queue_debug,
    .do_invoke      = _dispatch_lane_invoke,

    .dq_activate    = _dispatch_lane_activate,
    .dq_wakeup      = _dispatch_lane_wakeup,
    .dq_push        = _dispatch_lane_push,
);

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
    .do_type        = DISPATCH_QUEUE_CONCURRENT_TYPE,
    .do_dispose     = _dispatch_lane_dispose,
    .do_debug       = _dispatch_queue_debug,
    .do_invoke      = _dispatch_lane_invoke,

    .dq_activate    = _dispatch_lane_activate,
    .dq_wakeup      = _dispatch_lane_wakeup,
    .dq_push        = _dispatch_lane_concurrent_push,
);

根据队列的类型不同,这里会对应的有不同的方法
比如:
并发队列:.dq_push = _dispatch_lane_concurrent_push,
串行队列:.dq_push = _dispatch_lane_push,
等等
在项目中我们可以添加符号断点_dispatch_lane_concurrent_push验证如下:

image.png

所以我们先研究下:_dispatch_lane_concurrent_push

void
_dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
        dispatch_qos_t qos)
{
    if (dq->dq_items_tail == NULL &&
            !_dispatch_object_is_waiter(dou) &&
            !_dispatch_object_is_barrier(dou) &&
            _dispatch_queue_try_acquire_async(dq)) {
// 进行一系列的判断后,进入该方法。
        return _dispatch_continuation_redirect_push(dq, dou, qos);
    }

    _dispatch_lane_push(dq, dou, qos);
}

符号断点调试可以发现会走_dispatch_continuation_redirect_push 这里:

DISPATCH_NOINLINE
static void
_dispatch_continuation_redirect_push(dispatch_lane_t dl,
        dispatch_object_t dou, dispatch_qos_t qos)
{
    if (likely(!_dispatch_object_is_redirection(dou))) {
        dou._dc = _dispatch_async_redirect_wrap(dl, dou);
    } else if (!dou._dc->dc_ctxt) {
        // find first queue in descending target queue order that has
        // an autorelease frequency set, and use that as the frequency for
        // this continuation.
        dou._dc->dc_ctxt = (void *)
        (uintptr_t)_dispatch_queue_autorelease_frequency(dl);
    }

    dispatch_queue_t dq = dl->do_targetq;
    if (!qos) qos = _dispatch_priority_qos(dq->dq_priority);
    // 递归调用,
    // 原因在于GCD也是对象,也存在继承封装的问题,类似于 类 父类 根类的关系。
    dx_push(dq, dou, qos);
}

这里会发现又走到了dx_push,即递归了,综合前面队列创建时可知,队列也是一个对象,有父类、根类,所以会递归执行到根类的方法

接下来,通过根类的_dispatch_root_queue_push符号断点,来验证猜想是否正确,从运行结果看出,完全是正确的

进入_dispatch_root_queue_push -> _dispatch_root_queue_push_inline ->_dispatch_root_queue_poke -> _dispatch_root_queue_poke_slow源码,经过符号断点验证,确实是走的这里,查看该方法的源码实现,主要有两步操作

通过_dispatch_root_queues_init方法注册回调

通过do-while循环创建线程,使用pthread_create方法

DISPATCH_NOINLINE
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
    int remaining = n;
    int r = ENOSYS;

    _dispatch_root_queues_init();//重点
    
    ...
    //do-while循环创建线程
    do {
        _dispatch_retain(dq); // released in _dispatch_worker_thread
        while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
            if (r != EAGAIN) {
                (void)dispatch_assume_zero(r);
            }
            _dispatch_temporary_resource_shortage();
        }
    } while (--remaining);
    ...
}

首先我们先分析一下:_dispatch_root_queues_init

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_root_queues_init(void)
{
    dispatch_once_f(&_dispatch_root_queues_pred, NULL, _dispatch_root_queues_init_once);
}

进入_dispatch_root_queues_init源码实现,发现是一个dispatch_once_f单例(这里不作说明,以后重点分析),其中传入的func是_dispatch_root_queues_init_once。

_dispatch_root_queues_init_once 开始进入底层os的处理了。通过断点,bt打印堆栈信息如下:

(lldb) bt
* thread #7, queue = 'cooci', stop reason = breakpoint 1.1
  * frame #0: 0x0000000100449144 001---函数与队列`__29-[ViewController viewDidLoad]_block_invoke(.block_descriptor=0x000000010044c0e0) at ViewController.m:48:9
    frame #1: 0x00000001004efb68 libdispatch.dylib`_dispatch_call_block_and_release + 32
    frame #2: 0x00000001004f15f0 libdispatch.dylib`_dispatch_client_callout + 20
    frame #3: 0x00000001004f44e4 libdispatch.dylib`_dispatch_continuation_pop + 584
    frame #4: 0x00000001004f3888 libdispatch.dylib`_dispatch_async_redirect_invoke + 692
    frame #5: 0x00000001005042ec libdispatch.dylib`_dispatch_root_queue_drain + 364
    frame #6: 0x0000000100504c94 libdispatch.dylib`_dispatch_worker_thread2 + 140
    frame #7: 0x00000001d52b48cc libsystem_pthread.dylib`_pthread_wqthread + 216
(lldb) 

综上所述,异步函数的底层分析如下

【准备工作】:首先,将异步任务拷贝并封装,并设置回调函数func

【block回调】:底层通过dx_push递归,会重定向到根队列,然后通过pthread_creat创建线程,最后通过dx_invoke执行block回调(注意dx_push 和 dx_invoke 是成对的)

异步函数的底层分析流程如图所示


4. 任务同步dispatch_sync

dispatch_sync直接调用的是dispatch_sync_f

void dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
    // 很大可能不会走if分支,看做if(_dispatch_block_has_private_data(work))
    if (unlikely(_dispatch_block_has_private_data(work))) {
        return _dispatch_sync_block_with_private_data(dq, work, 0);
    }
    dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
}

static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
        uintptr_t dc_flags)
{
    _dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
}

查看_dispatch_sync_f_inline源码,其中width = 1表示是串行队列,其中有两个重点:

static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
// 串行队列会走到这个if分支
    if (likely(dq->dq_width == 1)) {
        return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
    }

    if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
        DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
    }
        
    dispatch_lane_t dl = upcast(dq)._dl;
    // Global concurrent queues and queues bound to non-dispatch threads
    // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE

// 全局获取的并行队列或者绑定的是非调度线程的队列会走进这个if分支
    if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
        return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);//死锁
    }

    if (unlikely(dq->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
    }
    _dispatch_introspection_sync_begin(dl);

    _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
            _dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}

第一种情况,串行队列:dispatch_barrier_sync_f

DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    _dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    dispatch_tid tid = _dispatch_tid_self();

 // 队列绑定的是非调度线程就会走这里
    if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
        DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
    }

    dispatch_lane_t dl = upcast(dq)._dl;
    
    if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
        return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
                DC_FLAG_BARRIER | dc_flags);
    }

    if (unlikely(dl->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dl, ctxt, func,
                DC_FLAG_BARRIER | dc_flags);
    }
    _dispatch_introspection_sync_begin(dl);
// 一般会走到这里
    _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
            DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
                    dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}


DISPATCH_NOINLINE
static void
_dispatch_lane_barrier_sync_invoke_and_complete(dispatch_lane_t dq,
        void *ctxt, dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
{
// 首先会执行这个函数
    _dispatch_sync_function_invoke_inline(dq, ctxt, func);
    _dispatch_trace_item_complete(dc);
    if (unlikely(dq->dq_items_tail || dq->dq_width > 1)) {
// 内部其实就是唤醒队列
        return _dispatch_lane_barrier_complete(dq, 0, 0);
    }

    const uint64_t fail_unlock_mask = DISPATCH_QUEUE_SUSPEND_BITS_MASK |
            DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_DIRTY |
            DISPATCH_QUEUE_RECEIVED_OVERRIDE | DISPATCH_QUEUE_SYNC_TRANSFER |
            DISPATCH_QUEUE_RECEIVED_SYNC_WAIT;
    uint64_t old_state, new_state;

    // similar to _dispatch_queue_drain_try_unlock
   // 原子锁。检查dq->dq_state与old_state是否相等,如果相等把new_state赋值给dq->dq_state,如果不相等,把dq_state赋值给old_state。
    // 串行队列走到这里,dq->dq_state与old_state是相等的,会把new_state也就是闭包里的赋值的值给dq->dq_state
   
    os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
        new_state  = old_state - DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;
        new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
        new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
        if (unlikely(old_state & fail_unlock_mask)) {
            os_atomic_rmw_loop_give_up({
                return _dispatch_lane_barrier_complete(dq, 0, 0);
            });
        }
    });
    if (_dq_state_is_base_wlh(old_state)) {
        _dispatch_event_loop_assert_not_owned((dispatch_wlh_t)dq);
    }
}

自定义并行队列会走_dispatch_sync_invoke_and_complete函数。

DISPATCH_NOINLINE
static void
_dispatch_sync_invoke_and_complete(dispatch_lane_t dq, void *ctxt,
        dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
{
    _dispatch_sync_function_invoke_inline(dq, ctxt, func);
    _dispatch_trace_item_complete(dc);
  // 将自定义队列加入到root队列里去
    // dispatch_async也会调用此方法,之前我们初始化的时候会绑定一个root队列,这里就将我们新建的队列交给root队列进行管理
    
    _dispatch_lane_non_barrier_complete(dq, 0);
}

static inline void _dispatch_sync_function_invoke_inline(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func)
{
    dispatch_thread_frame_s dtf;
    _dispatch_thread_frame_push(&dtf, dq);
    // 执行任务
    _dispatch_client_callout(ctxt, func);
    _dispatch_perfmon_workitem_inc();
    _dispatch_thread_frame_pop(&dtf);
}

_dispatch_sync_f_slow 死锁
进入_dispatch_sync_f_slow,当前的主队列是挂起、阻塞的

DISPATCH_NOINLINE
static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
        dispatch_function_t func, uintptr_t top_dc_flags,
        dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
    dispatch_queue_t top_dq = top_dqu._dq;
    dispatch_queue_t dq = dqu._dq;
    if (unlikely(!dq->do_targetq)) {
        return _dispatch_sync_function_invoke(dq, ctxt, func);
    }

    pthread_priority_t pp = _dispatch_get_priority();
    struct dispatch_sync_context_s dsc = {
        .dc_flags    = DC_FLAG_SYNC_WAITER | dc_flags,
        .dc_func     = _dispatch_async_and_wait_invoke,
        .dc_ctxt     = &dsc,
        .dc_other    = top_dq,
        .dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
        .dc_voucher  = _voucher_get(),
        .dsc_func    = func,
        .dsc_ctxt    = ctxt,
        .dsc_waiter  = _dispatch_tid_self(),
    };

    _dispatch_trace_item_push(top_dq, &dsc);
    __DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);

    if (dsc.dsc_func == NULL) {
        // dsc_func being cleared means that the block ran on another thread ie.
        // case (2) as listed in _dispatch_async_and_wait_f_slow.
        dispatch_queue_t stop_dq = dsc.dc_other;
        return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
    }

    _dispatch_introspection_sync_begin(top_dq);
    _dispatch_trace_item_pop(top_dq, &dsc);
    _dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
            DISPATCH_TRACE_ARG(&dsc));
}

往一个队列中 加入任务,会push加入主队列,进入_dispatch_trace_item_push

static inline void
_dispatch_trace_item_push(dispatch_queue_class_t dqu, dispatch_object_t _tail)
{
    if (unlikely(DISPATCH_QUEUE_PUSH_ENABLED())) {
        _dispatch_trace_continuation(dqu._dq, _tail._do, DISPATCH_QUEUE_PUSH);
    }

    _dispatch_trace_item_push_inline(dqu._dq, _tail._do);
    _dispatch_introspection_queue_push(dqu, _tail);
}

进入DISPATCH_WAIT_FOR_QUEUE,判断dq是否为正在等待的队列,然后给出一个状态state,然后将dq的状态和当前任务依赖的队列进行匹配

static void
__DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
{
    uint64_t dq_state = _dispatch_wait_prepare(dq);
    if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
        DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
                "dispatch_sync called on queue "
                "already owned by current thread");
    }
   .....省略 ....
}

进入_dq_state_drain_locked_by -> _dispatch_lock_is_locked_by源码

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{
    // equivalent to _dispatch_lock_owner(lock_value) == tid
    //异或操作:相同为0,不同为1,如果相同,则为0,0 &任何数都为0
    //即判断 当前要等待的任务 和 正在执行的任务是否一样,通俗的解释就是 执行和等待的是否在同一队列
    return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}

如果当前等待的和正在执行的是同一个队列,即判断线程ID是否相乘,如果相等,则会造成死锁

同步函数 + 并发队列 顺序执行的原因

在_dispatch_sync_invoke_and_complete -> _dispatch_sync_function_invoke_inline源码中,主要有三个步骤:

  • 将任务压入队列: _dispatch_thread_frame_push
  • 执行任务的block回调: _dispatch_client_callout
  • 将任务出队:_dispatch_thread_frame_pop

从实现中可以看出,是先将任务push队列中,然后执行block回调,在将任务pop,所以任务是顺序执行的。

综上所述,同步函数的底层分析如下

  • 同步函数的底层实现实际是同步栅栏函数

  • 同步函数中如果当前正在执行的队列和等待的是同一个队列,形成相互等待的局面,则会造成死锁

步骤如下:


三 、总结

分析了这一阶段的源码之后,我觉得了解下基本流程就行了,最后都来到底层函数封装的调用。接下来我们分析GCD的另外一部分,GCD底层原理下,单例,栅栏函数、信号量等等。

上一篇下一篇

猜你喜欢

热点阅读