iOS

iOS-底层原理25:GCD底层原理分析

2021-01-12  本文已影响0人  AcmenL

在上一篇文章iOS-底层原理24:GCD 之 函数与队列中从函数和队列去认识GCD,本文将更深入的通过源码去了解GCD底层原理。

1 查找GCD源码

step1:用一个案例去寻找GCD的源码,在此处打上断点

dispatch_queue_t conque = dispatch_queue_create("lbh", DISPATCH_QUEUE_CONCURRENT);
dispatch_async(conque, ^{
    NSLog(@"异步函数");
});

等程序运行到此处,Debug --> Debug Workflow 勾上Always show Disassembly

step2: 程序会自动跳转到汇编代码

可以看到一些熟悉的符号

step3: 给程序打上符号断点

step4: 继续运行,发现程序程序自动跳转到汇编

已经定位到对应的库libdispatch

step5:苹果开源地址搜索libdispatch

找到一个相关的库将它下载下来

2 队列

队列类型: dispatch_queue_t
队列的创建方法:dispatch_queue_create

2.1 队列类型 dispatch_queue_t

step1: 查找dispatch_queue_t的定义

typedef struct dispatch_queue_s *dispatch_queue_t; 

可以看到dispatch_queue_t本身只是dispatch_queue_s这个结构体指针

step2: 全局搜索dispatch_queue_s {,查找dispatch_queue_s定义

struct dispatch_queue_s {
    DISPATCH_QUEUE_CLASS_HEADER(queue, void *__dq_opaque1);
    /* 32bit hole on LP64 */
} DISPATCH_ATOMIC64_ALIGN;

step3: 查找DISPATCH_QUEUE_CLASS_HEADER

#define DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__) \
    _DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__); \
    /* LP64 global queue cacheline boundary */ \
    unsigned long dq_serialnum; \// queue的编号
    const char *dq_label; \   //标签
    DISPATCH_UNION_LE(uint32_t volatile dq_atomic_flags, \
        const uint16_t dq_width, \
        const uint16_t __dq_opaque2 \
    ); \
    dispatch_priority_t dq_priority; \//优先级
    union { \
        struct dispatch_queue_specific_head_s *dq_specific_head; \
        struct dispatch_source_refs_s *ds_refs; \
        struct dispatch_timer_source_refs_s *ds_timer_refs; \
        struct dispatch_mach_recv_refs_s *dm_recv_refs; \
        struct dispatch_channel_callbacks_s const *dch_callbacks; \
    }; \
    int volatile dq_sref_cnt

在这个宏里我们找到相关的方法_DISPATCH_QUEUE_CLASS_HEADER(x, pointer_sized_field);

step4: 继续展开搜索查看里面的内容如下:

// 展开_DISPATCH_QUEUE_CLASS_HEADER

#define _DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__) \
    DISPATCH_OBJECT_HEADER(x); \
    DISPATCH_UNION_LE(uint64_t volatile dq_state, \
            dispatch_lock dq_state_lock, \
            uint32_t dq_state_bits \
    ); \

// 持续展开DISPATCH_OBJECT_HEADER

#define DISPATCH_OBJECT_HEADER(x) \
    struct dispatch_object_s _as_do[0]; \
    _DISPATCH_OBJECT_HEADER(x)
    
// 进一步查看 _DISPATCH_OBJECT_HEADER

#define _DISPATCH_OBJECT_HEADER(x) \
    struct _os_object_s _as_os_obj[0]; \
    OS_OBJECT_STRUCT_HEADER(dispatch_##x); \  // 这个宏,可以理解为dispatch_object_s继承自_os_object_s
    struct dispatch_##x##_s *volatile do_next; \
    struct dispatch_queue_s *do_targetq; \
    void *do_ctxt; \
    void *do_finalizer
    

step5: 查看OS_OBJECT_STRUCT_HEADER

#define OS_OBJECT_STRUCT_HEADER(x) \
    _OS_OBJECT_HEADER(\
    const void *_objc_isa, \
    do_ref_cnt, \
    do_xref_cnt); \
// 注意这个成员变量,后面将任务Push到队列就是通过这个变量
    const struct x##_vtable_s *do_vtable
    

来到OS_OBJECT_STRUCT_HEADER之后,我们需要注意一个成员变量,记住这个成员变量名字叫做do_vtable

step6: 查看_OS_OBJECT_HEADER

// 进一步查看 _OS_OBJECT_HEADER

#define _OS_OBJECT_HEADER(isa, ref_cnt, xref_cnt) \
        isa; /* must be pointer-sized */ \ // isa指针
        int volatile ref_cnt; \ // gcd对象内部引用计数
        int volatile xref_cnt// gcd对象外部引用计数(内外部都要减到0时,对象会被释放)

_OS_OBJECT_HEADER包含isa指针引用计数等信息。

【总结】

dispatch_queue_t 的本质是一个结构体指针对象,指向一个dispatch_queue_s 类型的结构体,里面包含了label(标签)、priority(优先级)等一些信息。

GCD源码中的数据结构为dispatch_object_t联合抽象类

typedef union {
    struct _os_object_s *_os_obj;// 基类
    struct dispatch_object_s *_do;// 基类继承os_object
    struct dispatch_queue_s *_dq;// 队列结构
    struct dispatch_queue_attr_s *_dqa;// 队列相关属性
    struct dispatch_group_s *_dg;// group结构
    struct dispatch_source_s *_ds;
    struct dispatch_channel_s *_dch;
    struct dispatch_mach_s *_dm;
    struct dispatch_mach_msg_s *_dmsg;
    struct dispatch_semaphore_s *_dsema;// 信号量
    struct dispatch_data_s *_ddata;
    struct dispatch_io_s *_dchannel;
} dispatch_object_t DISPATCH_TRANSPARENT_UNION;

2.2 创建队列 dispatch_queue_create

我们知道队列的创建是通过dispatch_queue_create,让我们看下它在源码中是如何创建的

step1: 打开源码,全局搜索dispatch_queue_create,在queue.c文件中找到源码

dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
    return _dispatch_lane_create_with_target(label, attr,
            DISPATCH_TARGET_QUEUE_DEFAULT, true);
}

label : 标签,我们平时传入队列的名字
attr :我们知道创建队列时, attr 属性有三个值可选,nilDISPATCH_QUEUE_SERIAL(实际上就是 nil) 或 DISPATCH_QUEUE_CONCURRENT

step2: 搜索_dispatch_lane_create_with_target

DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_lane_create_with_target(const char *label, dispatch_queue_attr_t dqa,
        dispatch_queue_t tq, bool legacy)
{
    // dqai 创建 -
    dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa);
    
    //第一步:规范化参数,例如qos, overcommit, tq
    ...
    
    //拼接队列名称
    const void *vtable;
    dispatch_queue_flags_t dqf = legacy ? DQF_MUTABLE : 0;
    if (dqai.dqai_concurrent) { //vtable表示类的类型
        // OS_dispatch_queue_concurrent
        vtable = DISPATCH_VTABLE(queue_concurrent);
    } else {
        vtable = DISPATCH_VTABLE(queue_serial);
    }
    
    ....
    
    //创建队列,并初始化
    dispatch_lane_t dq = _dispatch_object_alloc(vtable,
            sizeof(struct dispatch_lane_s)); // alloc
    //根据dqai.dqai_concurrent的值,就能判断队列 是 串行 还是并发
    _dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
            DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
            (dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0)); // init
    //设置队列label标识符
    dq->dq_label = label;//label赋值
    dq->dq_priority = _dispatch_priority_make((dispatch_qos_t)dqai.dqai_qos, dqai.dqai_relpri);//优先级处理
    
    ...
    
    //类似于类与元类的绑定,不是直接的继承关系,而是类似于模型与模板的关系
    dq->do_targetq = tq;
    _dispatch_object_debug(dq, "%s", __func__);
    return _dispatch_trace_queue_create(dq)._dq;//研究dq
}
2.2.1 _dispatch_lane_create_with_target 分析

part1: 全局搜索_dispatch_queue_attr_to_info

通过_dispatch_queue_attr_to_info方法传入dqa(即队列类型,串行、并发等)创建dispatch_queue_attr_info_t类型的对象dqai,用于存储队列的相关属性信息

part2: 设置队列相关联的属性,例如服务质量qos等

part3: 通过DISPATCH_VTABLE拼接队列名称,即vtable,其中DISPATCH_VTABLE是宏定义,如下所示,所以队列的类型是通过OS_dispatch_+队列类型(queue_concurrent or queue_serial)拼接而成的

//object_internal.h
#define DISPATCH_VTABLE(name) DISPATCH_OBJC_CLASS(name)
⬇️
#define DISPATCH_OBJC_CLASS(name)   (&DISPATCH_CLASS_SYMBOL(name))
⬇️
#define DISPATCH_CLASS_SYMBOL(name) OS_dispatch_##name##_class
⬇️
#define DISPATCH_CLASS(name) OS_dispatch_##name

part4: 通过alloc+init初始化队列,即dq,其中在_dispatch_queue_init传参中根据dqai.dqai_concurrent的布尔值,就能判断队列 是 串行 还是并发,而 vtable表示队列的类型,说明队列也是对象

part5: 通过_dispatch_trace_queue_create对创建的队列进行处理,其中_dispatch_trace_queue_create_dispatch_introspection_queue_create封装的宏定义,最后会返回处理过的_dq

【总结】

dispatch_queue_create底层分析流程如下图所示:

3 函数 底层原理

主要是分析 异步函数dispatch_async 和 同步函数dispatch_sync

3.1 异步函数 dispatch_async

进入dispatch_async函数源码

void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)//work 任务
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    uintptr_t dc_flags = DC_FLAG_CONSUME;
    dispatch_qos_t qos;

    // 任务包装器(work在这里才有使用) - 接受work - 保存work - 并函数式编程
    // 保存 block 
    qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
    //并发处理
    _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}

_dispatch_continuation_init:任务包装函数
_dispatch_continuation_async:并发处理函数

3.1.1 _dispatch_continuation_init 任务包装

进入_dispatch_continuation_init源码实现,主要是包装任务,并设置线程的回调函数,相当于初始化

DISPATCH_ALWAYS_INLINE
static inline dispatch_qos_t
_dispatch_continuation_init(dispatch_continuation_t dc,
        dispatch_queue_class_t dqu, dispatch_block_t work,
        dispatch_block_flags_t flags, uintptr_t dc_flags)
{
    void *ctxt = _dispatch_Block_copy(work);//拷贝任务

    dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
    if (unlikely(_dispatch_block_has_private_data(work))) {
        dc->dc_flags = dc_flags;
        dc->dc_ctxt = ctxt;//赋值
        // will initialize all fields but requires dc_flags & dc_ctxt to be set
        return _dispatch_continuation_init_slow(dc, dqu, flags);
    }

    dispatch_function_t func = _dispatch_Block_invoke(work);//封装work - 异步回调
    if (dc_flags & DC_FLAG_CONSUME) {
        func = _dispatch_call_block_and_release;//回调函数赋值 - 同步回调
    }
    return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
}

主要有以下几步:

step1: 通过_dispatch_Block_copy拷贝任务

step2: 通过_dispatch_Block_invoke封装任务,其中_dispatch_Block_invoke是个宏定义,根据以上分析得知是异步回调

#define _dispatch_Block_invoke(bb) \
        ((dispatch_function_t)((struct Block_layout *)bb)->invoke)

step3: 如果是同步的,则回调函数赋值为_dispatch_call_block_and_release

step4: 通过_dispatch_continuation_init_f方法将回调函数赋值,即f就是func,将其保存在属性中

3.1.2 _dispatch_continuation_async 并发处理

这个函数中,主要是执行block回调

step1: 进入_dispatch_continuation_async源码

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
        dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
    if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
        _dispatch_trace_item_push(dqu, dc);//跟踪日志
    }
#else
    (void)dc_flags;
#endif
    return dx_push(dqu._dq, dc, qos);//与dx_invoke一样,都是宏
}

step2: 关键代码是dx_push(dqu._dq, dc, qos)dx_push宏定义,如下所示

#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)

step3: 查看dq_push需要根据队列的类型,执行不同的函数

3.1.2.1 符号断点调试执行函数

part1: 添加如下测试代码,并打上断点

dispatch_queue_t conque = dispatch_queue_create("lbh", DISPATCH_QUEUE_CONCURRENT);
dispatch_async(conque, ^{
    NSLog(@"异步函数");
});
截屏2021-01-15 上午11.08.07.png

part2:运行到断点处,加上符号断点_dispatch_lane_concurrent_push_dispatch_lane_push

part3: 继续运行

跳转到汇编代码,走的是_dispatch_lane_concurrent_push

part3: 进入_dispatch_lane_concurrent_push函数源码

DISPATCH_NOINLINE
void
_dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
        dispatch_qos_t qos)
{
    // <rdar://problem/24738102&24743140> reserving non barrier width
    // doesn't fail if only the ENQUEUED bit is set (unlike its barrier
    // width equivalent), so we have to check that this thread hasn't
    // enqueued anything ahead of this call or we can break ordering
    if (dq->dq_items_tail == NULL &&
            !_dispatch_object_is_waiter(dou) &&
            !_dispatch_object_is_barrier(dou) &&
            _dispatch_queue_try_acquire_async(dq)) {
        return _dispatch_continuation_redirect_push(dq, dou, qos);
    }

    _dispatch_lane_push(dq, dou, qos);
}

有两个重要的函数_dispatch_continuation_redirect_push_dispatch_lane_push

part4:_dispatch_continuation_redirect_push 打上符号断点,_dispatch_lane_push的符号断点已经存在,继续执行

走的是_dispatch_continuation_redirect_push

part5: 进入_dispatch_continuation_redirect_push源码

DISPATCH_NOINLINE
static void
_dispatch_continuation_redirect_push(dispatch_lane_t dl,
        dispatch_object_t dou, dispatch_qos_t qos)
{
    if (likely(!_dispatch_object_is_redirection(dou))) {
        dou._dc = _dispatch_async_redirect_wrap(dl, dou);
    } else if (!dou._dc->dc_ctxt) {
        // find first queue in descending target queue order that has
        // an autorelease frequency set, and use that as the frequency for
        // this continuation.
        dou._dc->dc_ctxt = (void *)
        (uintptr_t)_dispatch_queue_autorelease_frequency(dl);
    }

    dispatch_queue_t dq = dl->do_targetq;
    if (!qos) qos = _dispatch_priority_qos(dq->dq_priority);
    dx_push(dq, dou, qos); //递归
}

发现又调用了dx_push,即递归了,综合前面队列创建时可知,队列也是一个对象,有父类、根类,所以会递归执行到根类的方法

part6: 将根类_dispatch_root_queue_push打上符号断点,来验证猜想是否正确

从运行结果看,猜想是正确的,队列是一个对象,递归会执行到根类

part7: 进入源码_dispatch_root_queue_push --> _dispatch_root_queue_push_inline --> _dispatch_root_queue_poke --> _dispatch_root_queue_poke_slow,将_dispatch_root_queue_poke_slow打上符号断点,继续运行

part8: 进入_dispatch_root_queue_poke_slow源码

DISPATCH_NOINLINE
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
    int remaining = n;
    int r = ENOSYS;

    _dispatch_root_queues_init();//重点
    
    ...
    //do-while循环创建线程
    do {
        _dispatch_retain(dq); // released in _dispatch_worker_thread
        while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
            if (r != EAGAIN) {
                (void)dispatch_assume_zero(r);
            }
            _dispatch_temporary_resource_shortage();
        }
    } while (--remaining);
    
    ...
}

_dispatch_root_queue_poke_slow源码中主要有两步操作:

3.1.2.2 _dispatch_root_queues_init

part1: 进入_dispatch_root_queues_init源码

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_root_queues_init(void)
{
    dispatch_once_f(&_dispatch_root_queues_pred, NULL, _dispatch_root_queues_init_once);
}

dispatch_once_f是个单例(后面会对单例底层进行分析)

part2: 进入_dispatch_root_queues_init_once源码

内部不同事务的调用句柄都是_dispatch_worker_thread2

part3: 可运行案例打印堆栈信息

_dispatch_root_queues_init回调路径: _dispatch_worker_thread2 --> _dispatch_root_queue_drain --> _dispatch_async_redirect_invoke --> _dispatch_continuation_pop --> _dispatch_client_callout --> _dispatch_call_block_and_release

说明

单例的block回调异步函数的block回调不同

  • 单例的block回调中的func是_dispatch_Block_invoke(block)
  • 异步函数的block回调中的func是dispatch_call_block_and_release

总结

所以,综上所述,异步函数的底层分析如下

*【准备工作】:首先,将异步任务拷贝并封装,并设置回调函数func

*【block回调】:底层通过dx_push递归,会重定向到根队列,然后通过pthread_creat创建线程,最后通过dx_invoke执行block回调(注意dx_push 和 dx_invoke 是成对的)

异步函数的底层分析流程如图所示

3.2 同步函数 dispatch_sync

step1: 进入dispatch_sync源码实现,其底层的实现是通过栅栏函数实现的(后续会进行分析)

DISPATCH_NOINLINE
void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
    uintptr_t dc_flags = DC_FLAG_BLOCK;
    if (unlikely(_dispatch_block_has_private_data(work))) {
        return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
    }
    _dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}

step2: 进入_dispatch_sync_f源码

DISPATCH_NOINLINE
static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
        uintptr_t dc_flags)
{
    _dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
}

step3: 进入_dispatch_sync_f_inline源码

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    if (likely(dq->dq_width == 1)) {//表示是串行队列
        return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);//栅栏
    }

    if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
        DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
    }

    dispatch_lane_t dl = upcast(dq)._dl;
    // Global concurrent queues and queues bound to non-dispatch threads
    // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
    if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
        return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);//死锁
    }

    if (unlikely(dq->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
    }
    _dispatch_introspection_sync_begin(dl);//处理当前信息
    _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
            _dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));//block执行并释放
}

_dispatch_sync_f_inline源码中有两个重要的函数:

3.2.1 死锁 _dispatch_sync_f_slow

part1: 进入_dispatch_sync_f_slow源码

part2: 进入_dispatch_trace_item_push源码

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_trace_item_push(dispatch_queue_class_t dqu, dispatch_object_t _tail)
{
    if (unlikely(DISPATCH_QUEUE_PUSH_ENABLED())) {
        _dispatch_trace_continuation(dqu._dq, _tail._do, DISPATCH_QUEUE_PUSH);
    }

    _dispatch_trace_item_push_inline(dqu._dq, _tail._do);
    _dispatch_introspection_queue_push(dqu, _tail);
}

part3:进入__DISPATCH_WAIT_FOR_QUEUE__,判断dq是否为正在等待的队列,然后给出一个状态state,然后将dq的状态和当前任务依赖的队列进行匹配

DISPATCH_NOINLINE
static void
__DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
{
        // 判断qd是否为正在等待的主队列
    uint64_t dq_state = _dispatch_wait_prepare(dq);
    if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
        DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
                "dispatch_sync called on queue "
                "already owned by current thread");
    }

    // Blocks submitted to the main thread MUST run on the main thread, and
    // dispatch_async_and_wait also executes on the remote context rather than
    // the current thread.
    //
    // For both these cases we need to save the frame linkage for the sake of
    // _dispatch_async_and_wait_invoke
    _dispatch_thread_frame_save_state(&dsc->dsc_dtf);

    if (_dq_state_is_suspended(dq_state) ||
            _dq_state_is_base_anon(dq_state)) {
        dsc->dc_data = DISPATCH_WLH_ANON;
    } else if (_dq_state_is_base_wlh(dq_state)) {
        dsc->dc_data = (dispatch_wlh_t)dq;
    } else {
        _dispatch_wait_compute_wlh(upcast(dq)._dl, dsc);
    }

    if (dsc->dc_data == DISPATCH_WLH_ANON) {
        dsc->dsc_override_qos_floor = dsc->dsc_override_qos =
                (uint8_t)_dispatch_get_basepri_override_qos_floor();
        _dispatch_thread_event_init(&dsc->dsc_event);
    }
    dx_push(dq, dsc, _dispatch_qos_from_pp(dsc->dc_priority));
    _dispatch_trace_runtime_event(sync_wait, dq, 0);
    if (dsc->dc_data == DISPATCH_WLH_ANON) {
        _dispatch_thread_event_wait(&dsc->dsc_event); // acquire
    } else {
        _dispatch_event_loop_wait_for_ownership(dsc);
    }
    if (dsc->dc_data == DISPATCH_WLH_ANON) {
        _dispatch_thread_event_destroy(&dsc->dsc_event);
        // If _dispatch_sync_waiter_wake() gave this thread an override,
        // ensure that the root queue sees it.
        if (dsc->dsc_override_qos > dsc->dsc_override_qos_floor) {
            _dispatch_set_basepri_override_qos(dsc->dsc_override_qos);
        }
    }
}

part4: 进入_dq_state_drain_locked_by --> _dispatch_lock_is_locked_by源码

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{
    // equivalent to _dispatch_lock_owner(lock_value) == tid
    //异或操作:相同为0,不同为1,如果相同,则为0,0 &任何数都为0
    //即判断 当前要等待的任务 和 正在执行的任务是否一样,通俗的解释就是 执行和等待的是否在同一队列
    return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}

如果当前等待的和正在执行的是同一个队列,即判断线程ID是否相乘,如果相等,则会造成死锁

同步函数 + 并发队列 顺序执行的原因

_dispatch_sync_invoke_and_complete --> _dispatch_sync_function_invoke_inline源码中,主要有三个步骤:

  • 将任务压入队列: _dispatch_thread_frame_push
  • 执行任务的block回调: _dispatch_client_callout
  • 将任务出队:_dispatch_thread_frame_pop

从实现中可以看出,是先将任务push队列中,然后执行block回调,在将任务pop,所以任务是顺序执行的。

总结

同步函数的底层实现如下:

  • 同步函数的底层实现实际是同步栅栏函数

  • 同步函数中如果当前正在执行的队列和等待的是同一个队列,形成相互等待的局面,则会造成死锁

所以,综上所述,同步函数的底层实现流程如图所示

4 单例

在日常开发中,我们一般使用GCD的dispatch_once来创建单例,如下所示:

static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
    NSLog(@"单例应用");
});

首先对于单例,我们需要了解两点:

  • 执行一次的原因】单例的流程只执行一次,底层是如何控制的,即为什么只能执行一次?

  • block调用时机】单例的block是在什么时候进行调用的?

4.1 单例 底层分析

step1: dispatch_once有两个参数:

void
dispatch_once(dispatch_once_t *val, dispatch_block_t block)
{
    dispatch_once_f(val, block, _dispatch_Block_invoke(block));
}

进入dispatch_once源码,底层是通过dispatch_once_f实现的,

step2: 进入dispatch_once_f源码

DISPATCH_NOINLINE
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
    dispatch_once_gate_t l = (dispatch_once_gate_t)val;

#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    uintptr_t v = os_atomic_load(&l->dgo_once, acquire);//load
    if (likely(v == DLOCK_ONCE_DONE)) {//已经执行过了,直接返回
        return;
    }
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    if (likely(DISPATCH_ONCE_IS_GEN(v))) {
        return _dispatch_once_mark_done_if_quiesced(l, v);
    }
#endif
#endif
    if (_dispatch_once_gate_tryenter(l)) {//尝试进入
        return _dispatch_once_callout(l, ctxt, func);
    }
    return _dispatch_once_wait(l);//无限次等待
}

其中的val 是外界传入的onceToken静态变量,而func_dispatch_Block_invoke(block),其中单例的底层主要分为以下几步:

  1. val,也就是静态变量转换为dispatch_once_gate_t类型的变量l
  2. 通过os_atomic_load获取此时的任务的标识符v
  • 如果v等于DLOCK_ONCE_DONE,表示任务已经执行过了,直接return

  • 如果任务执行后,加锁失败了,则走到_dispatch_once_mark_done_if_quiesced函数,再次进行存储,将标识符置为DLOCK_ONCE_DONE

  • 反之,则通过_dispatch_once_gate_tryenter尝试进入任务,即解锁,然后执行_dispatch_once_callout执行block回调

  1. 如果此时有任务正在执行,再次进来一个任务2,则通过_dispatch_once_wait函数让任务2进入无限次等待
4.1.1 _dispatch_once_gate_tryenter 解锁

查看其源码,主要是通过底层os_atomic_cmpxchg方法进行对比,如果比较没有问题,则进行加锁,即任务的标识符置为DLOCK_ONCE_UNLOCKED

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_once_gate_tryenter(dispatch_once_gate_t l)
{
    return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
            (uintptr_t)_dispatch_lock_value_for_self(), relaxed);//首先对比,然后进行改变
}
4.1.2 _dispatch_once_callout 回调

step1: 进入_dispatch_once_callout

DISPATCH_NOINLINE
static void
_dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
        dispatch_function_t func)
{
    _dispatch_client_callout(ctxt, func);//block调用执行
    _dispatch_once_gate_broadcast(l);//进行广播:告诉别人有了归属,不要找我了

主要就两步:

  1. _dispatch_client_callout:block回调执行

  2. _dispatch_once_gate_broadcast:进行广播

step2: 进入_dispatch_client_callout源码,主要就是执行block回调,其中的f等于_dispatch_Block_invoke(block),即异步回调

#undef _dispatch_client_callout
void
_dispatch_client_callout(void *ctxt, dispatch_function_t f)
{
    @try {
        return f(ctxt);
    }
    @catch (...) {
        objc_terminate();
    }
}

step3: 进入 _dispatch_once_gate_broadcast --> _dispatch_once_mark_done源码

DISPATCH_ALWAYS_INLINE
static inline uintptr_t
_dispatch_once_mark_done(dispatch_once_gate_t dgo)
{
    //如果不相同,直接改为相同,然后上锁 -- DLOCK_ONCE_DONE
    return os_atomic_xchg(&dgo->dgo_once, DLOCK_ONCE_DONE, release);
}

主要就是给dgo->dgo_once一个值,然后将任务的标识符为DLOCK_ONCE_DONE,即解锁

总结

针对单例的底层实现,主要说明如下:

单例只执行一次的原理】:GCD单例中,有两个重要参数,onceTokenblock,其中onceToken是静态变量,具有唯一性,在底层被封装成了dispatch_once_gate_t类型的变量l,l主要是用来获取底层原子封装性的关联,即变量v,通过v来查询任务的状态,如果此时v等于DLOCK_ONCE_DONE,说明任务已经处理过一次了,直接return

block调用时机】:如果此时任务没有执行过,则会在底层通过C++函数的比较,将任务进行加锁,即任务状态置为DLOCK_ONCE_UNLOCK,目的是为了保证当前任务执行的唯一性,防止在其他地方有多次定义。加锁之后进行block回调函数的执行,执行完成后,将当前任务解锁,将当前的任务状态置为DLOCK_ONCE_DONE,在下次进来时,就不会在执行,会直接返回

多线程影响】:如果在当前任务执行期间,有其他任务进来,会进入无限次等待,原因是当前任务已经获取了锁,进行了加锁,其他任务是无法获取锁的

单例的底层流程分析如下如所示:

5 栅栏函数

GCD中常用的栅栏函数,主要有两种:

名称 作用 缺点
同步栅栏函数dispatch_barrier_sync,在主线程中执行 前面的任务执行完毕才会来到这里 堵塞线程
异步栅栏函数dispatch_barrier_async 前面的任务执行完毕才会来到这里 堵塞队列

栅栏函数最直接的作用就是:控制任务执行顺序,使同步执行

栅栏函数需要注意一下几点:

  • 栅栏函数只能控制同一并发队列

  • 同步栅栏添加进入队列的时候,当前线程会被锁死直到同步栅栏之前的任务和同步栅栏任务本身执行完毕时,当前线程才会打开然后继续执行下一句代码。

  • 使用栅栏函数时,使用自定义队列才有意义,如果用的是串行队列或者系统提供的全局并发队列,这个栅栏函数的作用等同于一个同步函数的作用,没有任何意义

5.1 代码调试

5.1.1 异步栅栏函数
- (void)wbinterDemo1{
    
    dispatch_queue_t queue1 = dispatch_queue_create("com.lbh.com", DISPATCH_QUEUE_CONCURRENT);
    
    dispatch_async(queue1, ^{
        NSLog(@"1-%@",[NSThread currentThread]);
    });
    dispatch_async(queue1, ^{
//        sleep(1);
        NSLog(@"2-%@",[NSThread currentThread]);
    });
    
    dispatch_barrier_async(queue1, ^{
//        sleep(2);
        NSLog(@"3-%@",[NSThread currentThread]);
    });
    dispatch_async(queue1, ^{
        NSLog(@"4-%@",[NSThread currentThread]);
    });
}

运行结果

分析

5.1.2 同步栅栏函数
- (void)wbinterDemo2{
    
    dispatch_queue_t queue1 = dispatch_queue_create("com.lbh.com", DISPATCH_QUEUE_CONCURRENT);
    
    dispatch_async(queue1, ^{
//        sleep(2);
        NSLog(@"1=%@=%@",[NSThread currentThread],[NSDate date]);
    });
    
    dispatch_async(queue1, ^{
        NSLog(@"2=%@=%@",[NSThread currentThread],[NSDate date]);
    });
    
    dispatch_barrier_sync(queue1, ^{
        NSLog(@"3=%@=%@",[NSThread currentThread],[NSDate date]);
    });
    
    dispatch_async(queue1, ^{
        
        NSLog(@"4=%@=%@",[NSThread currentThread],[NSDate date]);
    });
}

运行结果

5.2 使用问题

代码
- (void)interDemo3
{
    dispatch_queue_t queue1 = dispatch_queue_create("com.lbh.com", DISPATCH_QUEUE_CONCURRENT);
    
    NSMutableArray *array = [NSMutableArray array];
    
    
    for (int i = 0; i < 10000; i++) {
        
        dispatch_async(queue1, ^{
            [array addObject:[NSString stringWithFormat:@"%d",i]];
        });
    }
}
运行结果

问题: 为什么会崩溃?

分析

objc源码中找到addObject:源码

- (id)addObject:anObject
{
    return [self insertObject:anObject at:numElements]; 
}

⬇️

- (id)insertObject:anObject at:(unsigned)index
{
    register id *this, *last, *prev;
    if (! anObject) return nil;
    if (index > numElements)
        return nil;
    if ((numElements + 1) > maxElements) {
    volatile id *tempDataPtr;
    /* we double the capacity, also a good size for malloc */
    // 这里在数组超过一定的空间之后就进行了双倍的扩容
    maxElements += maxElements + 1;
    // 这里数组tempDataPtr 进行了realloc操作  所以在多个线程同时访问的时候就会出现问题
    tempDataPtr = (id *) realloc (dataPtr, DATASIZE(maxElements));
    dataPtr = (id*)tempDataPtr;
    }
    this = dataPtr + numElements;
    prev = this - 1;
    last = dataPtr + index;
    while (this > last) 
    *this-- = *prev--;
    *last = anObject;
    numElements++;
    return self;
}

可以看到,当数组的容量超过maxElements的时候就会maxElements += maxElements + 1;,并且进行realloc重新创建了一个新的数组的操作,在多线程的操作,如果数组添加的元素太多就会出现给旧数组添加元素的时候,旧的数组其实已经被替代的情况,这样就出现了崩溃

解决方法

1、数组初始化时给足够大的空间

2、利用栅栏函数

问题:为什么不能使用同步栅栏函数?

3、使用互斥锁 @synchronized (self) {}

5.3 底层原理

5.3.1 异步栅栏函数 底层原理

进入dispatch_barrier_async源码实现

#ifdef __BLOCKS__
void
dispatch_barrier_async(dispatch_queue_t dq, dispatch_block_t work)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_BARRIER;
    dispatch_qos_t qos;

    qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
    _dispatch_continuation_async(dq, dc, qos, dc_flags);
}
#endif

在分析dispatch_async的底层实现时,已经知道dispatch_async的本质其实就是dispatch_barrier_async,这里就不在进行分析

5.3.2 同步栅栏函数 底层原理

进入dispatch_barrier_sync源码,实现如下

void
dispatch_barrier_sync(dispatch_queue_t dq, dispatch_block_t work)
{
    uintptr_t dc_flags = DC_FLAG_BARRIER | DC_FLAG_BLOCK;
    if (unlikely(_dispatch_block_has_private_data(work))) {
        return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
    }
    _dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}
5.3.2.1 _dispatch_barrier_sync_f_inline

进入 _dispatch_barrier_sync_f --> _dispatch_barrier_sync_f_inline源码

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    dispatch_tid tid = _dispatch_tid_self();//获取线程的id,即线程的唯一标识
    
    ...
    
    //判断线程状态,需不需要等待,是否回收
    if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {//栅栏函数也会死锁
        return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,//没有回收
                DC_FLAG_BARRIER | dc_flags);
    }
    //验证target是否存在,如果存在,加入栅栏函数的递归查找 是否等待
    if (unlikely(dl->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dl, ctxt, func,
                DC_FLAG_BARRIER | dc_flags);
    }
    _dispatch_introspection_sync_begin(dl);
    _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
            DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
                    dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));//执行
}

源码主要有分为以下几部分:

part1: 通过_dispatch_tid_self获取线程ID

part2: 通过_dispatch_queue_try_acquire_barrier_sync判断线程状态

part2.1: 进入_dispatch_queue_try_acquire_barrier_sync_and_suspend源码

part3: 通过_dispatch_sync_recurse递归查找栅栏函数的target

part4: 通过_dispatch_introspection_sync_begin对向前信息进行处理

part5: 通过_dispatch_lane_barrier_sync_invoke_and_complete执行block并释放

6 信号量 dispatch_semaphore_t

信号量的作用一般是用来使任务同步执行,类似于互斥锁,用户可以根据需要控制GCD最大并发数,一般是这样使用的

//信号量
dispatch_semaphore_t sem = dispatch_semaphore_create(1);

dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
dispatch_semaphore_signal(sem);

6.1 底层原理

选择几个重要的函数,对其底层进行分析

6.1.1 dispatch_semaphore_create 创建

该函数的底层实现如下,主要是初始化信号量,并设置GCD的最大并发数,其最大并发数必须大于0

dispatch_semaphore_t
dispatch_semaphore_create(long value)
{
    dispatch_semaphore_t dsema;

    // If the internal value is negative, then the absolute of the value is
    // equal to the number of waiting threads. Therefore it is bogus to
    // initialize the semaphore with a negative value.
    if (value < 0) {
        return DISPATCH_BAD_INPUT;
    }

    dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
            sizeof(struct dispatch_semaphore_s));
    dsema->do_next = DISPATCH_OBJECT_LISTLESS;
    dsema->do_targetq = _dispatch_get_default_queue(false);
    dsema->dsema_value = value;
    _dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
    dsema->dsema_orig = value;
    return dsema;
}
6.1.2 dispatch_semaphore_wait 加锁

step1: 进入dispatch_semaphore_wait源码

long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
    // dsema_value 进行 -- 操作
    long value = os_atomic_dec2o(dsema, dsema_value, acquire);
    if (likely(value >= 0)) {//表示执行操作无效,即执行成功
        return 0;
    }
    return _dispatch_semaphore_wait_slow(dsema, timeout);//长等待
}

该函数的源码实现如下,其主要作用是对信号量dsema通过os_atomic_dec2o进行了--操作,其内部是执行的C++的atomic_fetch_sub_explicit方法

  • 如果value 大于等于0,表示操作无效,即执行成功

  • 如果value 等于LONG_MIN,系统会抛出一个crash

  • 如果value 小于0,则进入长等待

其中os_atomic_dec2o的宏定义转换如下

#define os_atomic_inc2o(p, f, m) \
        os_atomic_add2o(p, f, 1, m)

👇

#define os_atomic_add2o(p, f, v, m) \
        os_atomic_add(&(p)->f, (v), m)
👇

#define os_atomic_add(p, v, m) \
        _os_atomic_c11_op((p), (v), m, add, +)

👇
#define _os_atomic_c11_op(p, v, m, o, op) \
        ({ _os_atomic_basetypeof(p) _v = (v), _r = \
        atomic_fetch_##o##_explicit(_os_atomic_c11_atomic(p), _v, \
        memory_order_##m); (__typeof__(_r))(_r op _v); })

将具体的值代入为

上一篇 下一篇

猜你喜欢

热点阅读