底层原理

GCD 底层源码分析(一)

2021-08-15  本文已影响0人  晨曦的简书

GCD 初探函数和队列

GCD 简介

什么是GCD?

全称是 Grand Central Dispatch,纯 C 语言,提供了非常多强大的函数。

GCD的优势

函数

队列

队列分为串行队列和并发队列(队列也是一种数据结构),队列只有调度没有执行,执行需要线程来操作,而线程依赖于线程池。

函数与队列

主队列与全局队列

主队列

全局并发队列

死锁现象

这里我们通过两个案例来看一下。

  1. 案例 1


这里先创建一个并发队列,因为 2, 3, 4 是在异步函数中执行的,且不管是异步函数还是同步函数都是耗时操作,因为这里是并发队列,所以异步函数中的任务并不会影响后面任务的执行,所以 5 的打印大概率比异步函数中的任务先执行。又因为 3 的打印是在同步函数中执行的,所以同步函数中的任务执行完才会打印 4,所以最后的打印顺序是 1, 5, 2, 3, 4。这里是并发队列执行的情况,那么当我们改成同步队列会是什么情况呢?下面我们来看一下。

  1. 案例 2



当我们改成同步队列之后可以看到就会报死锁错误,具体原因就是:首先同步队列遵循 FIFO 特性,而 dispatch_sync 的特性就是保证块内的任务先执行完才能执行后面的任务,所以 4 任务在 dispatch_sync 块任务之后,而块内的 3 任务又在 4 任务之后,因为 3 是块内的任务,所以 4 要等待 dispatch_sync 块内的任务执行完才能执行,而 3 任务又要等待 4 任务,所以就造成了循环等待。

串行队列与并发队列底层源码

// 串行队列
dispatch_queue_t serial = dispatch_queue_create("chenxi", DISPATCH_QUEUE_SERIAL);
// 并发队列
dispatch_queue_t conque = dispatch_queue_create("chenxi", DISPATCH_QUEUE_CONCURRENT);

当我们创建并发队列或者串行队列的时候都会调用 dispatch_queue_create 方法,现在我们来探究一下 dispatch_queue_create 方法底层是如何实现的。

dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
    return _dispatch_lane_create_with_target(label, attr,
            DISPATCH_TARGET_QUEUE_DEFAULT, true);
}

_dispatch_lane_create_with_target 方法中我们主要看返回值 _dispatch_trace_queue_create(dq)._dq,因为 _dispatch_trace_queue_create 主要是开启追踪,所以我们重点看 dq

dispatch_lane_t dq = _dispatch_object_alloc(vtable,
            sizeof(struct dispatch_lane_s));
// _dispatch_queue_init 第三个参数会判断 dqai.dqai_concurrent(是否是并发),是的话就传 DISPATCH_QUEUE_WIDTH_MAX,不是的话就传 1
_dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
            DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
            (dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0));

dq 我们重点看这两行代码,申请内存跟初始化,_dispatch_queue_init 方法第三个参数会判断 dqai.dqai_concurrentdqai.dqai_concurrent 有值就传 DISPATCH_QUEUE_WIDTH_MAX,没有值话就传 1,dqai = _dispatch_queue_attr_to_info(dqa),接着我们看 _dispatch_queue_attr_to_info 方法。

dispatch_queue_attr_info_t
_dispatch_queue_attr_to_info(dispatch_queue_attr_t dqa)
{
    dispatch_queue_attr_info_t dqai = { };

    if (!dqa) return dqai;

#if DISPATCH_VARIANT_STATIC
    if (dqa == &_dispatch_queue_attr_concurrent) {
        dqai.dqai_concurrent = true;
        return dqai;
    }
#endif

    if (dqa < _dispatch_queue_attrs ||
            dqa >= &_dispatch_queue_attrs[DISPATCH_QUEUE_ATTR_COUNT]) {
#ifndef __APPLE__
        if (memcmp(dqa, &_dispatch_queue_attrs[0],
                sizeof(struct dispatch_queue_attr_s)) == 0) {
            dqa = (dispatch_queue_attr_t)&_dispatch_queue_attrs[0];
        } else
#endif // __APPLE__
        DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
    }

    size_t idx = (size_t)(dqa - _dispatch_queue_attrs);

    dqai.dqai_inactive = (idx % DISPATCH_QUEUE_ATTR_INACTIVE_COUNT);
    idx /= DISPATCH_QUEUE_ATTR_INACTIVE_COUNT;

    dqai.dqai_concurrent = !(idx % DISPATCH_QUEUE_ATTR_CONCURRENCY_COUNT);
    idx /= DISPATCH_QUEUE_ATTR_CONCURRENCY_COUNT;

_dispatch_queue_attr_to_info 方法中 dqa 就是我们平时创建并发队列或者串行队列调用 dispatch_queue_create 方法传的第二个参数,当 !dqa 就直接返回 dqai,有值才会往下走 dqai.dqai_concurrent 赋值,我们平时传的 DISPATCH_QUEUE_SERIAL 参数宏定义其实就是 NULL#define DISPATCH_QUEUE_SERIAL NULL),所以 dqai.dqai_concurrent 有值代表是并发队列。接着我们再看一下 _dispatch_queue_init 方法。

static inline dispatch_queue_class_t
_dispatch_queue_init(dispatch_queue_class_t dqu, dispatch_queue_flags_t dqf,
        uint16_t width, uint64_t initial_state_bits)
{
    uint64_t dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(width);
    dispatch_queue_t dq = dqu._dq;

    dispatch_assert((initial_state_bits & ~(DISPATCH_QUEUE_ROLE_MASK |
            DISPATCH_QUEUE_INACTIVE)) == 0);

    if (initial_state_bits & DISPATCH_QUEUE_INACTIVE) {
        dq->do_ref_cnt += 2; // rdar://8181908 see _dispatch_lane_resume
        if (dx_metatype(dq) == _DISPATCH_SOURCE_TYPE) {
            dq->do_ref_cnt++; // released when DSF_DELETED is set
        }
    }

    dq_state |= initial_state_bits;
    dq->do_next = DISPATCH_OBJECT_LISTLESS;
    // 这里 width 为 1 的时候为串行队列
    dqf |= DQF_WIDTH(width);
    os_atomic_store2o(dq, dq_atomic_flags, dqf, relaxed);
    dq->dq_state = dq_state;
    dq->dq_serialnum =
            os_atomic_inc_orig(&_dispatch_queue_serial_numbers, relaxed);
    return dqu;
}

这里可以看到 DQF_WIDTH(width)width 为 1 的时候为串行队列,而 dq_serialnum 就是一个标识。

主队列

dispatch_queue_t mainQueue = dispatch_get_main_queue();

* Because the main queue doesn't behave entirely like a regular serial queue,
 * it may have unwanted side-effects when used in processes that are not UI apps
 * (daemons). For such processes, the main queue should be avoided.
 *
 * @see dispatch_queue_main_t
 *
 * @result
 * Returns the main queue. This queue is created automatically on behalf of
 * the main thread before main() is called.
 */
DISPATCH_INLINE DISPATCH_ALWAYS_INLINE DISPATCH_CONST DISPATCH_NOTHROW
dispatch_queue_main_t
dispatch_get_main_queue(void)
{
    return DISPATCH_GLOBAL_OBJECT(dispatch_queue_main_t, _dispatch_main_q);
}

我们平时调用 dispatch_get_main_queue 方法来获取主队列,但我们点进去可以看到相关注释,讲主队列与普通串行队列不同,代表是一种特殊的串行队列,在 main 函数之前执行,接着我们来探究一下 dispatch_get_main_queue 方法的底层实现。

dispatch_queue_main_t
dispatch_get_main_queue(void)
{
    return DISPATCH_GLOBAL_OBJECT(dispatch_queue_main_t, _dispatch_main_q);
}

// 通过宏定义可以看到,type 只是类型,真正的 对象 object 为第二个参数,也就是 _dispatch_main_q
#define DISPATCH_GLOBAL_OBJECT(type, object) ((type)&(object))
struct dispatch_queue_static_s _dispatch_main_q = {
    DISPATCH_GLOBAL_OBJECT_HEADER(queue_main),
#if !DISPATCH_USE_RESOLVERS
    .do_targetq = _dispatch_get_default_queue(true),
#endif
    .dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(1) |
            DISPATCH_QUEUE_ROLE_BASE_ANON,
    .dq_label = "com.apple.main-thread",
// 上面分析串行队列的时候已经分析过了,DQF_WIDTH(width),width 为 1 的时候为串行队列
    .dq_atomic_flags = DQF_THREAD_BOUND | DQF_WIDTH(1),
    .dq_serialnum = 1,
};

全局并发队列

dispatch_get_global_queue(0, 0)

进入全局队列定义的地方,见下图


创建全局并发队列时可以传参数,根据不同服务质量或者优先等级提供不同的并发队列。那么我们可以得出一个结论:应该有一个全局的集合,去维护这些并发队列。

得到一个队列集合,根据不同的服务质量提供不同的全局队列,见下图:

总结:系统会维护一个全局队列集合,根据不同的服务质量或者优先等级提供不同的全局队列。我们在开发工作中默认使用:dispatch_get_global_queue(0, 0)

GCD的任务执行堆栈(同步)

dispatch_sync(dispatch_get_global_queue(0, 0), ^{
        NSLog(@"gcd 函数分析");
    });

我们平时执行类似这段代码的时候,会自动执行打印,并没有我们平时看到的 block() 调用,那么这里的 block 是什么时候调用的呢?我们再来看一下源码。

void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
    uintptr_t dc_flags = DC_FLAG_BLOCK;
    if (unlikely(_dispatch_block_has_private_data(work))) {
        return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
    }
// _dispatch_Block_invoke(work) 就是一个包装函数
    _dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}

这里 work 就是传进来的 block,所以重点看一下 work 去哪了。_dispatch_sync_f 函数调用的时候参数传入了 work,所以我们看下 _dispatch_sync_f 函数的调用。

static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
        uintptr_t dc_flags)
{
    _dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
}

这里 *ctxt 就是传进来的 work,因为这里调用了 _dispatch_sync_f_inline 函数,所以我们继续看 _dispatch_sync_f_inline 函数的实现。

_dispatch_sync_f_inline 函数中我们一样应该重点看 ctxt 的调用,这里调用的地方比较多,我们就来通过下符号断点看看具体执行了哪句代码。

通过断点可以看到这里执行了 _dispatch_sync_f_slow,所以继续看 _dispatch_sync_f_slow 函数的实现。

static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
        dispatch_function_t func, uintptr_t top_dc_flags,
        dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
    dispatch_queue_t top_dq = top_dqu._dq;
    dispatch_queue_t dq = dqu._dq;
    if (unlikely(!dq->do_targetq)) {
        return _dispatch_sync_function_invoke(dq, ctxt, func);
    }

    pthread_priority_t pp = _dispatch_get_priority();
    struct dispatch_sync_context_s dsc = {
        .dc_flags    = DC_FLAG_SYNC_WAITER | dc_flags,
        .dc_func     = _dispatch_async_and_wait_invoke,
        .dc_ctxt     = &dsc,
        .dc_other    = top_dq,
        .dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
        .dc_voucher  = _voucher_get(),
        .dsc_func    = func,
        .dsc_ctxt    = ctxt,
        .dsc_waiter  = _dispatch_tid_self(),
    };

    _dispatch_trace_item_push(top_dq, &dsc);
    __DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);

    if (dsc.dsc_func == NULL) {
        // dsc_func being cleared means that the block ran on another thread ie.
        // case (2) as listed in _dispatch_async_and_wait_f_slow.
        dispatch_queue_t stop_dq = dsc.dc_other;
        return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
    }

    _dispatch_introspection_sync_begin(top_dq);
    _dispatch_trace_item_pop(top_dq, &dsc);
    _dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
            DISPATCH_TRACE_ARG(&dsc));
}

在这里我们同样下符号断点会执行 _dispatch_sync_function_invoke,所以看 _dispatch_sync_function_invoke 函数的实现。

static void
_dispatch_sync_function_invoke(dispatch_queue_class_t dq, void *ctxt,
       dispatch_function_t func)
{
   _dispatch_sync_function_invoke_inline(dq, ctxt, func);
}

static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
       dispatch_function_t func)
{
   dispatch_thread_frame_s dtf;
   _dispatch_thread_frame_push(&dtf, dq);
   _dispatch_client_callout(ctxt, func);
   _dispatch_perfmon_workitem_inc();
   _dispatch_thread_frame_pop(&dtf);
}

这里只有 _dispatch_client_callout 函数用了 ctxtfunc。所以继续看 _dispatch_client_callout 函数。

static inline void
_dispatch_client_callout(void *ctxt, dispatch_function_t f)
{
    return f(ctxt);
}

这里给 f 传了一个调用执行 ctxt,跟 block 类似。

GCD的任务执行堆栈(异步)

这里我们继续来看一下异步函数的实现。

void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    uintptr_t dc_flags = DC_FLAG_CONSUME;
    dispatch_qos_t qos;

    qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
    _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
static inline dispatch_qos_t
_dispatch_continuation_init(dispatch_continuation_t dc,
        dispatch_queue_class_t dqu, dispatch_block_t work,
        dispatch_block_flags_t flags, uintptr_t dc_flags)
{
    void *ctxt = _dispatch_Block_copy(work);

    dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
    if (unlikely(_dispatch_block_has_private_data(work))) {
        dc->dc_flags = dc_flags;
        dc->dc_ctxt = ctxt;
        // will initialize all fields but requires dc_flags & dc_ctxt to be set
        return _dispatch_continuation_init_slow(dc, dqu, flags);
    }

    dispatch_function_t func = _dispatch_Block_invoke(work);
    if (dc_flags & DC_FLAG_CONSUME) {
        func = _dispatch_call_block_and_release;
    }
    return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
}
static inline dispatch_qos_t
_dispatch_continuation_init_f(dispatch_continuation_t dc,
        dispatch_queue_class_t dqu, void *ctxt, dispatch_function_t f,
        dispatch_block_flags_t flags, uintptr_t dc_flags)
{
    pthread_priority_t pp = 0;
    dc->dc_flags = dc_flags | DC_FLAG_ALLOCATED;
    dc->dc_func = f;
    dc->dc_ctxt = ctxt;
    // in this context DISPATCH_BLOCK_HAS_PRIORITY means that the priority
    // should not be propagated, only taken from the handler if it has one
    if (!(flags & DISPATCH_BLOCK_HAS_PRIORITY)) {
        pp = _dispatch_priority_propagate();
    }
    _dispatch_continuation_voucher_set(dc, flags);
    return _dispatch_continuation_priority_set(dc, dqu, pp, flags);
}

这里与同步函数不一样的是这里会把 f, ctxt 分别赋值给了 dc->dc_funcdc->dc_ctxt,并在 _dispatch_continuation_priority_set 函数中进行了优先级的处理。在这里进行优先级处理是因为异步函数代表异步调用,会产生无序的情况,优先级就会作为参考衡量的依据;还因为函数是异步调用的,所以要先用 dc 保存函数的调用,需要的时候再取出来进行调用。所以在 dispatch_async 函数中 qos 等于 _dispatch_continuation_init 函数的返回值,所以这里重点还是要要看下 qos 去了哪里,所以要看 _dispatch_continuation_async 函数的实现。

static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
        dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
    if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
        _dispatch_trace_item_push(dqu, dc);
    }
#else
    (void)dc_flags;
#endif
    return dx_push(dqu._dq, dc, qos);
}
#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)

搜索可以看到 dx_push 的宏定义,这里 z 就是 qos,所以重点关注 z,这里就看 dq_push(x, y, z)

这里并发 .dq_push = _dispatch_lane_concurrent_push,全局并发 .dq_push = _dispatch_root_queue_push。这里我们全局搜索 _dispatch_root_queue_push 方法的实现:

在此流程中,前面只是做一些判断封装处理,最终会走到最后一行代码 _dispatch_root_queue_push_inline 中,继续跟踪器源码流程:

_dispatch_root_queue_push_inline 中调用了 _dispatch_root_queue_poke 方法,_dispatch_root_queue_poke 中的核心流程为 _dispatch_root_queue_poke_slow,见下图所示:

_dispatch_root_queue_poke_slow 中有一个关键流程,_dispatch_root_queues_init(),见下图:

进入 _dispatch_root_queues_init() 方法,在该方法中采用的单例处理,见下图:

进入 _dispatch_root_queues_init_once 方法,这里做了什么呢?见下图:

在该方法中进行了线程池的初始化处理、工作队列的配置、工作队列的初始化等工作,这也就是解释了为什么 _dispatch_root_queues_init_once 是单例的。单例可以避免重复的初始化。
同时这里有一个关键的设置,执行函数的设置,也就是将任务执行的函数被统一设置成了 _dispatch_worker_thread2,见下面代码:

cfg.workq_cb = _dispatch_worker_thread2;

我们可以通过 bt 打印运行堆栈信息,来验证异步函数最终任务是通过 _dispatch_worker_thread2 调用的。见下图所示:

总结:通过跟踪异步处理流程,系统针对不同的队列类型,执行不同的dq_push的方法,并通过单例的形式完成了线程池的初始化、工作队列的配置等工作,并且底层最终通过_dispatch_worker_thread2完成了异步函数中任务的调用执行。

相关面试题

面试题 1

- (void)wbinterDemo{
    dispatch_queue_t queue = dispatch_queue_create("com.lg.cooci.cn", DISPATCH_QUEUE_SERIAL);

    dispatch_async(queue, ^{
        NSLog(@"1");
    });
    
    dispatch_async(queue, ^{
        NSLog(@"2");
    });

    dispatch_sync(queue, ^{ NSLog(@"3"); });
    
    NSLog(@"0");

    dispatch_async(queue, ^{
        NSLog(@"7");
    });
    dispatch_async(queue, ^{
        NSLog(@"8");
    });
    dispatch_async(queue, ^{
        NSLog(@"9");
    });

    // A: 1230789
    // B: 1237890
    // C: 3120798
    // D: 2137890
}

因为 dispatch_sync 只会阻塞后面的任务,所以 0, 7, 8, 9 只会在 3 之后打印,但是 1 跟 2 是无序的,0, 7, 8, 9 也是没有执行先后顺序的,所以正确答案是 A, C。

面试题 2

- (void)MTDemo{
    self.num = 0;
    while (self.num < 5) {
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            self.num++;
        });
    }
    NSLog(@"end : %d",self.num); 
}

这里输出的结果会是大于等于 5,首先因为这里是 while 循环,所以 self.num 不大于等于 5 的情况下不会执行打印,然后因为是并发队列异步执行,所以这里会开启多条线程,所以就有可能 self.num 刚满足大于等于 5 的条件,跳出循环的时候别的线程正好开始执行任务,然后就会对 self.num 再次进行加加运算,所以最后的打印结果大于等于 5。

面试题 3

- (void)KSDemo {
    for (int i= 0; i<10000; i++) {
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            self.num++;
        });
    }
    NSLog(@"end : %d",self.num); 
}

这里因为判断条件是一个新的变量 i,只会循环一万次,然后因为是多线程,所以当跳出循环的时候有的线程任务并没有执行完成,所以最后的打印结果小于等于 10000。

上一篇 下一篇

猜你喜欢

热点阅读