GCD

2019-03-14  CrystalZhu

Grand Central Dispatch (GCD)

Reposted from: https://blog.boolchow.com/2018/04/06/iOS-Concurrency-Programming/
GCD is a set of APIs implemented in C, and it is open source; if you are interested, you can download a copy of the source here and dig in. With GCD the system handles thread scheduling for us, which is very convenient, and it is the most heavily used of the concurrency APIs. This chapter covers how GCD works and how to use it.

Before diving in, let's take a quick look at what GCD provides:

(Figure: overall structure of GCD)

The APIs the system provides cover essentially all of our day-to-day needs. The sections below walk through each module in turn.

1. Dispatch Queue

GCD gives us two kinds of queues, serial queues and concurrent queues. The difference between them: a serial queue runs one task at a time, in the order the tasks were submitted, while a concurrent queue can start several tasks at once and let them run simultaneously.

Beyond that, there is a pair of concepts that is easy to mix up: concurrency vs. parallelism. Concurrency means multiple tasks are in progress over the same period of time, possibly interleaved on a single core; parallelism means they are literally executing at the same instant on different cores.

Finally, one more pair: synchronous vs. asynchronous. A synchronous call does not return until its block has finished executing; an asynchronous call returns immediately and the block runs some time later, as the sketch below illustrates.
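To make these pairs concrete, here is a minimal sketch (not from the original post; the queue labels are made up) showing how the combinations behave:

dispatch_queue_t serialQueue = dispatch_queue_create("com.example.serial", DISPATCH_QUEUE_SERIAL);
dispatch_queue_t concurrentQueue = dispatch_queue_create("com.example.concurrent", DISPATCH_QUEUE_CONCURRENT);

// async: returns right away, the block runs later
dispatch_async(serialQueue, ^{ NSLog(@"serial 1"); });
dispatch_async(serialQueue, ^{ NSLog(@"serial 2"); });         // always after "serial 1"

dispatch_async(concurrentQueue, ^{ NSLog(@"concurrent 1"); });
dispatch_async(concurrentQueue, ^{ NSLog(@"concurrent 2"); }); // may overlap with "concurrent 1"

// sync: does not return until the block has finished
dispatch_sync(concurrentQueue, ^{ NSLog(@"sync, caller waits"); });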

Queues

In everyday use, these are the queues we usually reach for:

- (void)viewDidLoad {
    [super viewDidLoad];
    dispatch_async(dispatch_get_main_queue(), ^{
        // UI-related work
    });
}

dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0);
dispatch_async(queue, ^{
    // some work
});

dispatch_queue_t queue = dispatch_queue_create("com.bool.dispatch", DISPATCH_QUEUE_SERIAL);

In other words, the system gives us five kinds of queues: the main queue, which runs on the main thread; three global queues with different priorities; and a background queue with an even lower priority. Beyond those, developers can create custom serial and concurrent queues; every block dispatched to a custom queue is ultimately scheduled onto the system's global root queues and their thread pool, which is covered later on. Borrowing a classic diagram:

(Figure: gcd-queues)
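For reference, here is a small sketch (labels made up) of how each of those queues is obtained, plus dispatch_set_target_queue() to show a custom queue being pointed at a particular root queue, which is the retargeting the diagram depicts:

dispatch_queue_t mainQueue = dispatch_get_main_queue();
dispatch_queue_t high = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0);
dispatch_queue_t def  = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
dispatch_queue_t low  = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0);
dispatch_queue_t bg   = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0);

// a custom queue; blocks dispatched to it are ultimately run by the root queues / thread pool
dispatch_queue_t custom = dispatch_queue_create("com.example.custom", DISPATCH_QUEUE_SERIAL);
dispatch_set_target_queue(custom, low);   // optional: retarget it at a specific root queue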
Synchronous vs. Asynchronous

Most of the time we use dispatch_async() for asynchronous work, since a program already runs sequentially and genuine synchronous dispatch is needed far less often. Occasionally we use dispatch_sync() as a lock, to protect shared state.
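A hedged sketch of that lock-style usage (queue label and variable are made up): a private serial queue serializes every access, so dispatch_sync behaves like acquiring and releasing a lock.

dispatch_queue_t lockQueue = dispatch_queue_create("com.example.lock", DISPATCH_QUEUE_SERIAL);
__block NSInteger counter = 0;

dispatch_sync(lockQueue, ^{
    counter += 1;    // only one block at a time can touch counter
});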

The system maintains a queue and executes the tasks dispatched to it one by one, in FIFO order. Sometimes we want to push a task back a little; for example, during app launch we might want an expensive piece of main-thread work to run later, and dispatch_async() does the trick, effectively re-appending the task to the end of the queue.

dispatch_async(dispatch_get_main_queue(), ^{
    // the task we want to defer
});

In normal use, dispatch_async() does not cause deadlocks; deadlocks usually come from dispatch_sync(). For example:

dispatch_sync(dispatch_get_main_queue(), ^{
   NSLog(@"dead lock");
});

Written like the above, the program reports an error as soon as it runs. The same goes for the following case:

dispatch_queue_t queue = dispatch_queue_create("com.bool.dispatch", DISPATCH_QUEUE_SERIAL);
dispatch_async(queue, ^{
    NSLog(@"dispatch async");
    dispatch_sync(queue, ^{
        NSLog(@"dispatch async -> dispatch sync");
    });
});

In the code above, dispatch_async() appends its entire block (call it block_async) to the tail of the serial queue and starts executing it. Inside block_async we call dispatch_sync(), which wants to run block_sync. Because the queue is serial, the previous task (block_async) must finish before the next one (block_sync) can start; but block_async cannot finish until the block_sync inside it has run. Each waits on the other, and we get a deadlock.

Real projects have even more convoluted deadlock scenarios. Fortunately the system is quite helpful nowadays, and cases like the ones above are caught as soon as the code runs.

How it works

Let's take these few lines of code and walk through what happens underneath:

- (void)viewDidLoad {
    [super viewDidLoad];
    dispatch_queue_t queue = dispatch_queue_create("com.bool.dispatch", DISPATCH_QUEUE_SERIAL);
    dispatch_async(queue, ^{
        NSLog(@"dispatch asyn test");
    });
}

Creating a queue

The source is long, but it is really just one function, and the logic is fairly clear:

/** The function the developer calls */
dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
    return _dispatch_queue_create_with_target(label, attr,
            DISPATCH_TARGET_QUEUE_DEFAULT, true);
}

/** The function actually called internally */
DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
        dispatch_queue_t tq, bool legacy)
{
    // 1. Initial validation
    if (!slowpath(dqa)) {
        dqa = _dispatch_get_default_queue_attr();
    } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
        DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
    }

    // 2. Configure queue attributes
    dispatch_qos_t qos = _dispatch_priority_qos(dqa->dqa_qos_and_relpri);
#if !HAVE_PTHREAD_WORKQUEUE_QOS
    if (qos == DISPATCH_QOS_USER_INTERACTIVE) {
        qos = DISPATCH_QOS_USER_INITIATED;
    }
    if (qos == DISPATCH_QOS_MAINTENANCE) {
        qos = DISPATCH_QOS_BACKGROUND;
    }
#endif // !HAVE_PTHREAD_WORKQUEUE_QOS

    _dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit;
    if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) {
        if (tq->do_targetq) {
            DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and "
                    "a non-global target queue");
        }
    }

    if (tq && !tq->do_targetq &&
            tq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
        // Handle discrepancies between attr and target queue, attributes win
        if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
            if (tq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) {
                overcommit = _dispatch_queue_attr_overcommit_enabled;
            } else {
                overcommit = _dispatch_queue_attr_overcommit_disabled;
            }
        }
        if (qos == DISPATCH_QOS_UNSPECIFIED) {
            dispatch_qos_t tq_qos = _dispatch_priority_qos(tq->dq_priority);
            tq = _dispatch_get_root_queue(tq_qos,
                    overcommit == _dispatch_queue_attr_overcommit_enabled);
        } else {
            tq = NULL;
        }
    } else if (tq && !tq->do_targetq) {
        // target is a pthread or runloop root queue, setting QoS or overcommit
        // is disallowed
        if (overcommit != _dispatch_queue_attr_overcommit_unspecified) {
            DISPATCH_CLIENT_CRASH(tq, "Cannot specify an overcommit attribute "
                    "and use this kind of target queue");
        }
        if (qos != DISPATCH_QOS_UNSPECIFIED) {
            DISPATCH_CLIENT_CRASH(tq, "Cannot specify a QoS attribute "
                    "and use this kind of target queue");
        }
    } else {
        if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
             // Serial queues default to overcommit!
            overcommit = dqa->dqa_concurrent ?
                    _dispatch_queue_attr_overcommit_disabled :
                    _dispatch_queue_attr_overcommit_enabled;
        }
    }
    if (!tq) {
        tq = _dispatch_get_root_queue(
                qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos,
                overcommit == _dispatch_queue_attr_overcommit_enabled);
        if (slowpath(!tq)) {
            DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
        }
    }

    // 3. Initialize the queue
    if (legacy) {
        // if any of these attributes is specified, use non legacy classes
        if (dqa->dqa_inactive || dqa->dqa_autorelease_frequency) {
            legacy = false;
        }
    }

    const void *vtable;
    dispatch_queue_flags_t dqf = 0;
    if (legacy) {
        vtable = DISPATCH_VTABLE(queue);
    } else if (dqa->dqa_concurrent) {
        vtable = DISPATCH_VTABLE(queue_concurrent);
    } else {
        vtable = DISPATCH_VTABLE(queue_serial);
    }
    switch (dqa->dqa_autorelease_frequency) {
    case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
        dqf |= DQF_AUTORELEASE_NEVER;
        break;
    case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
        dqf |= DQF_AUTORELEASE_ALWAYS;
        break;
    }
    if (legacy) {
        dqf |= DQF_LEGACY;
    }
    if (label) {
        const char *tmp = _dispatch_strdup_if_mutable(label);
        if (tmp != label) {
            dqf |= DQF_LABEL_NEEDS_FREE;
            label = tmp;
        }
    }

    dispatch_queue_t dq = _dispatch_object_alloc(vtable,
            sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
    _dispatch_queue_init(dq, dqf, dqa->dqa_concurrent ?
            DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
            (dqa->dqa_inactive ? DISPATCH_QUEUE_INACTIVE : 0));

    dq->dq_label = label;
#if HAVE_PTHREAD_WORKQUEUE_QOS
    dq->dq_priority = dqa->dqa_qos_and_relpri;
    if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
        dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
    }
#endif
    _dispatch_retain(tq);
    if (qos == QOS_CLASS_UNSPECIFIED) {
        // legacy way of inherithing the QoS from the target
        _dispatch_queue_priority_inherit_from_target(dq, tq);
    }
    if (!dqa->dqa_inactive) {
        _dispatch_queue_inherit_wlh_from_target(dq, tq);
    }
    dq->do_targetq = tq;
    _dispatch_object_debug(dq, "%s", __func__);
    return _dispatch_introspection_queue_create(dq);
}

Here is a flow chart generated from the code; if you would rather not read the code, just look at the chart (the same goes for the later sections):

(Figure: Create_Queue)

Based on the flow chart, the function goes through these steps: first it validates the attribute argument, falling back to the default attributes if none was passed; then it resolves the QoS and overcommit settings and picks the target queue, defaulting to one of the global root queues; finally it chooses the appropriate vtable (legacy, serial, or concurrent), allocates and initializes the dispatch_queue_s structure, sets the label and priority, and links the new queue to its target queue.

Asynchronous execution

In this version of the source, the async path is split across many small functions, which makes it look messy. Here it is:

/** Called by the developer */
void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;

    _dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
    _dispatch_continuation_async(dq, dc);
}

/** Internal call: wraps one more level, then goes deeper */
DISPATCH_NOINLINE
void
_dispatch_continuation_async(dispatch_queue_t dq, dispatch_continuation_t dc)
{
    _dispatch_continuation_async2(dq, dc,
            dc->dc_flags & DISPATCH_OBJ_BARRIER_BIT);
}

/** Branches on the barrier flag: serial path vs. concurrent path */
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_async2(dispatch_queue_t dq, dispatch_continuation_t dc,
        bool barrier)
{
    if (fastpath(barrier || !DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
        // serial
        return _dispatch_continuation_push(dq, dc);
    }
    
    // concurrent
    return _dispatch_async_f2(dq, dc);
}

/** The concurrent path goes through one more level: this function */
DISPATCH_NOINLINE
static void
_dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc)
{
    if (slowpath(dq->dq_items_tail)) { // slow path
        return _dispatch_continuation_push(dq, dc);
    }

    if (slowpath(!_dispatch_queue_try_acquire_async(dq))) { // slow path
        return _dispatch_continuation_push(dq, dc);
    }
    // common (fast) path
    return _dispatch_async_f_redirect(dq, dc,
            _dispatch_continuation_override_qos(dq, dc));
}

/** Mainly handles redirection to the target queue */
DISPATCH_NOINLINE
static void
_dispatch_async_f_redirect(dispatch_queue_t dq,
        dispatch_object_t dou, dispatch_qos_t qos)
{
    if (!slowpath(_dispatch_object_is_redirection(dou))) {
        dou._dc = _dispatch_async_redirect_wrap(dq, dou);
    }
    dq = dq->do_targetq;

    // Find the queue to redirect to
    while (slowpath(DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
        if (!fastpath(_dispatch_queue_try_acquire_async(dq))) {
            break;
        }
        if (!dou._dc->dc_ctxt) {
            dou._dc->dc_ctxt = (void *)
                    (uintptr_t)_dispatch_queue_autorelease_frequency(dq);
        }
        dq = dq->do_targetq;
    }

    // Both sync and async ultimately call this to push the task onto the queue
    dx_push(dq, dou, qos);
}

... (some intermediate call levels omitted) ...

/** Core function: dc_flags tells group, serial and concurrent work apart */
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_invoke_inline(dispatch_object_t dou, voucher_t ov,
        dispatch_invoke_flags_t flags)
{
    dispatch_continuation_t dc = dou._dc, dc1;
    dispatch_invoke_with_autoreleasepool(flags, {
        uintptr_t dc_flags = dc->dc_flags;
        _dispatch_continuation_voucher_adopt(dc, ov, dc_flags);
        if (dc_flags & DISPATCH_OBJ_CONSUME_BIT) { // concurrent
            dc1 = _dispatch_continuation_free_cacheonly(dc);
        } else {
            dc1 = NULL;
        }
        if (unlikely(dc_flags & DISPATCH_OBJ_GROUP_BIT)) { // group
            _dispatch_continuation_with_group_invoke(dc);
        } else { // serial
            _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
            _dispatch_introspection_queue_item_complete(dou);
        }
        if (unlikely(dc1)) {
            _dispatch_continuation_free_to_cache_limit(dc1);
        }
    });
    _dispatch_perfmon_workitem_inc();
}

If you would rather not read the code, here is the chart:

(Figure: Dispatch_Asyn)

Following the flow chart: dispatch_async() allocates a dispatch_continuation_t and wraps the block in it; _dispatch_continuation_async2() then decides, from the barrier flag and the queue width, whether to push the continuation directly (the serial path) or go through _dispatch_async_f2() (the concurrent path); the concurrent path may redirect the continuation to the queue's target queue, ultimately a global root queue, via _dispatch_async_f_redirect(); in every case the task ends up being pushed with dx_push(), and when it is later dequeued it is invoked inside _dispatch_continuation_invoke_inline(), wrapped in an autorelease pool.

Synchronous execution

Synchronous execution is comparatively simple. The source:

/** Called by the developer */
void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
    if (unlikely(_dispatch_block_has_private_data(work))) {
        return _dispatch_sync_block_with_private_data(dq, work, 0);
    }
    dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
}

/** Internal call */
DISPATCH_NOINLINE
void
dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
    if (likely(dq->dq_width == 1)) {
        return dispatch_barrier_sync_f(dq, ctxt, func);
    }

    // Global concurrent queues and queues bound to non-dispatch threads
    // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
    if (unlikely(!_dispatch_queue_try_reserve_sync_width(dq))) {
        return _dispatch_sync_f_slow(dq, ctxt, func, 0);
    }

    _dispatch_introspection_sync_begin(dq);
    if (unlikely(dq->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dq, ctxt, func, 0);
    }
    _dispatch_sync_invoke_and_complete(dq, ctxt, func);
}

Synchronous execution is simpler, and the overall logic is similar, so I'll skip the diagram and just describe it: if the queue is serial (dq_width == 1), the call is forwarded to dispatch_barrier_sync_f(); otherwise it tries to reserve width on the queue, falling back to the slow path _dispatch_sync_f_slow() for global concurrent queues and queues bound to non-dispatch threads; if the target queue itself has a target queue, it recurses via _dispatch_sync_recurse(); otherwise the block is invoked and completed with _dispatch_sync_invoke_and_complete().
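One detail worth calling out: as an optimization, dispatch_sync usually runs the block directly on the calling thread instead of waking a worker thread, which the following sketch (illustrative only; the label is made up) makes visible:

dispatch_queue_t queue = dispatch_queue_create("com.example.sync", DISPATCH_QUEUE_CONCURRENT);
NSLog(@"caller thread: %@", [NSThread currentThread]);
dispatch_sync(queue, ^{
    // typically prints the same thread as the caller
    NSLog(@"block thread:  %@", [NSThread currentThread]);
});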

dispatch_after

dispatch_after is usually used to defer a task, and it can stand in for NSTimer, which has more than its share of problems. I will cover multithreading pitfalls as a whole in a later chapter, so I won't go into them here. We typically use dispatch_after like this:

- (void)viewDidLoad {
    [super viewDidLoad];
    dispatch_queue_t queue = dispatch_queue_create("com.bool.dispatch", DISPATCH_QUEUE_SERIAL);
    dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(NSEC_PER_SEC * 2.0f)), queue, ^{
        // executes after 2.0 seconds
    });
}

When transitioning between pages, we often do not update certain views the moment the new page appears; to draw the user's attention we update them a moment later, and this API handles that nicely.

The source:

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
        void *ctxt, void *handler, bool block)
{
    dispatch_timer_source_refs_t dt;
    dispatch_source_t ds;
    uint64_t leeway, delta;

    if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
        DISPATCH_CLIENT_CRASH(0, "dispatch_after called with 'when' == infinity");
#endif
        return;
    }

    delta = _dispatch_timeout(when);
    if (delta == 0) {
        if (block) {
            return dispatch_async(queue, handler);
        }
        return dispatch_async_f(queue, ctxt, handler);
    }
    leeway = delta / 10; // <rdar://problem/13447496>

    if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
    if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;

    // this function can and should be optimized to not use a dispatch source
    ds = dispatch_source_create(&_dispatch_source_type_after, 0, 0, queue);
    dt = ds->ds_timer_refs;

    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    if (block) {
        _dispatch_continuation_init(dc, ds, handler, 0, 0, 0);
    } else {
        _dispatch_continuation_init_f(dc, ds, ctxt, handler, 0, 0, 0);
    }
    // reference `ds` so that it doesn't show up as a leak
    dc->dc_data = ds;
    _dispatch_trace_continuation_push(ds->_as_dq, dc);
    os_atomic_store2o(dt, ds_handler[DS_EVENT_HANDLER], dc, relaxed);

    if ((int64_t)when < 0) {
        // wall clock
        when = (dispatch_time_t)-((int64_t)when);
    } else {
        // absolute clock
        dt->du_fflags |= DISPATCH_TIMER_CLOCK_MACH;
        leeway = _dispatch_time_nano2mach(leeway);
    }
    dt->dt_timer.target = when;
    dt->dt_timer.interval = UINT64_MAX;
    dt->dt_timer.deadline = when + leeway;
    dispatch_activate(ds);
}

dispatch_after() calls _dispatch_after() internally, which first checks the delay. If it is DISPATCH_TIME_FOREVER (never execute), the call is rejected (it crashes in debug builds and simply returns otherwise); if it is 0, the block is dispatched immediately; otherwise a dispatch_timer_source_refs_t is created and the context information is attached to it. The dispatch_source machinery then ties the timer to the block task; when the timer fires, the block is taken out and executed.
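The negative when branch in the source corresponds to wall-clock times. A hedged sketch of the two ways to build the when argument:

// relative to now on the mach (uptime) clock; the countdown pauses while the device sleeps
dispatch_time_t t1 = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2.0 * NSEC_PER_SEC));

// relative to the wall clock; keeps counting even if the device sleeps
dispatch_time_t t2 = dispatch_walltime(NULL, (int64_t)(2.0 * NSEC_PER_SEC));

dispatch_after(t2, dispatch_get_main_queue(), ^{
    // runs roughly 2 seconds later, measured in wall-clock time
});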

dispatch_once

If we have a piece of code that should be initialized only once during the app's lifetime, dispatch_once is the perfect fit. For example, it shows up in almost every singleton:

+ (instancetype)sharedManager {
    static BLDispatchManager *sharedInstance = nil;
    static dispatch_once_t onceToken;
    dispatch_once(&onceToken, ^{
        sharedInstance = [[BLDispatchManager alloc] initPrivate];
    });
    
    return sharedInstance;
}

It is also used when creating an NSDateFormatter:

- (NSString *)todayDateString {
    static NSDateFormatter *formatter = nil;
    static dispatch_once_t onceToken;
    dispatch_once(&onceToken, ^{
        formatter = [NSDateFormatter new];
        formatter.locale = [NSLocale localeWithLocaleIdentifier:@"en_US_POSIX"];
        formatter.timeZone = [NSTimeZone timeZoneForSecondsFromGMT:8 * 3600];
        formatter.dateFormat = @"yyyyMMdd";
    });
    
    return [formatter stringFromDate:[NSDate date]];
}

Because this is such a common snippet, it is included among Xcode's built-in code snippets.

Its source:

/** A struct holding a thread event (semaphore), the thread's mach port, and a pointer to the next waiter node */
typedef struct _dispatch_once_waiter_s {
    volatile struct _dispatch_once_waiter_s *volatile dow_next;
    dispatch_thread_event_s dow_event;
    mach_port_t dow_thread;
} *_dispatch_once_waiter_t;

/** The function we call */
void
dispatch_once(dispatch_once_t *val, dispatch_block_t block)
{
    dispatch_once_f(val, block, _dispatch_Block_invoke(block));
}

/** The function that actually does the work */
DISPATCH_NOINLINE
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
#if !DISPATCH_ONCE_INLINE_FASTPATH
    if (likely(os_atomic_load(val, acquire) == DLOCK_ONCE_DONE)) {
        return;
    }
#endif // !DISPATCH_ONCE_INLINE_FASTPATH
    return dispatch_once_f_slow(val, ctxt, func);
}

DISPATCH_ONCE_SLOW_INLINE
static void
dispatch_once_f_slow(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
#if DISPATCH_GATE_USE_FOR_DISPATCH_ONCE
    dispatch_once_gate_t l = (dispatch_once_gate_t)val;

    if (_dispatch_once_gate_tryenter(l)) {
        _dispatch_client_callout(ctxt, func);
        _dispatch_once_gate_broadcast(l);
    } else {
        _dispatch_once_gate_wait(l);
    }
#else
    _dispatch_once_waiter_t volatile *vval = (_dispatch_once_waiter_t*)val;
    struct _dispatch_once_waiter_s dow = { };
    _dispatch_once_waiter_t tail = &dow, next, tmp;
    dispatch_thread_event_t event;

    if (os_atomic_cmpxchg(vval, NULL, tail, acquire)) {
        dow.dow_thread = _dispatch_tid_self();
        _dispatch_client_callout(ctxt, func);

        next = (_dispatch_once_waiter_t)_dispatch_once_xchg_done(val);
        while (next != tail) {
            tmp = (_dispatch_once_waiter_t)_dispatch_wait_until(next->dow_next);
            event = &next->dow_event;
            next = tmp;
            _dispatch_thread_event_signal(event);
        }
    } else {
        _dispatch_thread_event_init(&dow.dow_event);
        next = *vval;
        for (;;) {
            if (next == DISPATCH_ONCE_DONE) {
                break;
            }
            if (os_atomic_cmpxchgv(vval, next, tail, &next, release)) {
                dow.dow_thread = next->dow_thread;
                dow.dow_next = next;
                if (dow.dow_thread) {
                    pthread_priority_t pp = _dispatch_get_priority();
                    _dispatch_thread_override_start(dow.dow_thread, pp, val);
                }
                _dispatch_thread_event_wait(&dow.dow_event);
                if (dow.dow_thread) {
                    _dispatch_thread_override_end(dow.dow_thread, val);
                }
                break;
            }
        }
        _dispatch_thread_event_destroy(&dow.dow_event);
    }
#endif
}

If you would rather not read the code, look at the chart (emmm… after drawing it I realized the chart itself is pretty messy, so the two main branches are marked in different colors):

(Figure: Dispatch_Once)

Based on this chart, here are the main steps. The first branch hinges on an atomic compare-and-swap, which behaves roughly like this:

if (*vval == NULL) {
    *vval = tail = &dow;
    return true;
} else {
    return false;
}

The once_token we declare, i.e. *vval, starts out as 0, so the first call returns true here. The check inside the if() is an atomic operation, so even if several threads call it at the same time, only one of them enters the true branch; all the others fall into the else branch and wait on their thread events. When the winning thread's block finishes, it marks the token as done and walks the waiter list to wake everyone up:

next = (_dispatch_once_waiter_t)_dispatch_once_xchg_done(val);
// which effectively does:
next = *vval;
*vval = DISPATCH_ONCE_DONE;

That is the whole execution flow. As the source shows, an atomic operation plus a thread event (essentially a semaphore) guarantees that the block is executed only once, even with multiple threads racing to call it.

With that, the well-known deadlock caused by calling dispatch_once recursively is easy to explain. Look at the following code:

- (void)dispatchOnceTest {
    static dispatch_once_t onceToken;
    dispatch_once(&onceToken, ^{
        [self dispatchOnceTest];
    });
}

From the analysis above, until the block has finished and *vval has been set to DISPATCH_ONCE_DONE, every other call falls into the else branch. The second, recursive call therefore waits on its event and can only be woken once the first block finishes; but all the first block does is make that second call, which is now waiting, so the block can never finish. And so we have a deadlock.

dispatch_apply

When the iterations have no ordering dependency between them, we sometimes use dispatch_apply in place of a for loop. For example, downloading a set of images:

/** Using a for loop */
- (void)downloadImages:(NSArray <NSURL *> *)imageURLs {
    for (NSURL *imageURL in imageURLs) {
        [self downloadImageWithURL:imageURL];
    }
}

/** Using dispatch_apply */
- (void)downloadImages:(NSArray <NSURL *> *)imageURLs {
    dispatch_queue_t downloadQueue = dispatch_queue_create("com.bool.download", DISPATCH_QUEUE_CONCURRENT);
    dispatch_apply(imageURLs.count, downloadQueue, ^(size_t index) {
        NSURL *imageURL = imageURLs[index];
        [self downloadImageWithURL:imageURL];
    });
}

A few things to watch when making this replacement: dispatch_apply does not return until every iteration has finished, the iterations may run in any order on a concurrent queue, and the block must be safe to execute from several threads at once.

As for how it works, I won't spend much space on it. Roughly: the call is synchronous and blocks the current thread until all of the blocks have completed, and when it is submitted to a concurrent queue, the execution order of the iterations is not guaranteed.
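If you want to keep dispatch_apply but cannot afford to block the caller, one common workaround (sketched here with the same hypothetical download helper as above; the method name is made up) is to move the whole call onto a background queue:

- (void)downloadImagesWithoutBlocking:(NSArray <NSURL *> *)imageURLs {
    dispatch_queue_t downloadQueue = dispatch_queue_create("com.example.download", DISPATCH_QUEUE_CONCURRENT);
    dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
        // dispatch_apply still blocks, but now it blocks a worker thread instead of the caller
        dispatch_apply(imageURLs.count, downloadQueue, ^(size_t index) {
            [self downloadImageWithURL:imageURLs[index]];
        });
        dispatch_async(dispatch_get_main_queue(), ^{
            // every iteration has finished
        });
    });
}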

More often, though, when we kick off downloads we do not want to block the current thread at all, and for that we can use dispatch_group.

dispatch_group

When dealing with a batch of asynchronous tasks, dispatch_group is a great choice. For the image-download example above, we could do this:

- (void)downloadImages:(NSArray <NSURL *> *)imageURLs {
    dispatch_group_t taskGroup = dispatch_group_create();
    dispatch_queue_t queue = dispatch_queue_create("com.bool.group", DISPATCH_QUEUE_CONCURRENT);
    for (NSURL *imageURL in imageURLs) {
        dispatch_group_enter(taskGroup);
        // the download method itself is asynchronous
        [self downloadImageWithURL:imageURL withQueue:queue completeHandler:^{
            dispatch_group_leave(taskGroup);
        }];
    }
    
    dispatch_group_notify(taskGroup, queue, ^{
        // all tasks have finished
    });
    
    /** If the work inside dispatch_group_async is itself asynchronous, dispatch_group_notify
        fires almost immediately: the async call returns right away, so the group considers
        the task already finished. That's why this method isn't used as often here.
     */
    dispatch_group_async(taskGroup, queue, ^{
        
    });
}

As for how it works, it is much like dispatch_async(), which was covered earlier. Only one piece of code is worth singling out:

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_continuation_t dc)
{
    dispatch_group_enter(dg);
    dc->dc_data = dg;
    _dispatch_continuation_async(dq, dc);
}

This code calls dispatch_group_enter(dg) to mark the group, then goes down the same path as dispatch_async(), ending up in _dispatch_continuation_invoke_inline(). There the continuation is recognized as group work, the task is executed, and once it finishes dispatch_group_leave((dispatch_group_t)dou) is called to balance the earlier enter.
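Besides dispatch_group_notify, you can also block the current thread until the group finishes (or a timeout expires) with dispatch_group_wait; a minimal sketch:

dispatch_group_t group = dispatch_group_create();
dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);

dispatch_group_async(group, queue, ^{ /* task 1 */ });
dispatch_group_async(group, queue, ^{ /* task 2 */ });

// blocks the calling thread for at most 5 seconds
long timedOut = dispatch_group_wait(group, dispatch_time(DISPATCH_TIME_NOW, (int64_t)(5.0 * NSEC_PER_SEC)));
if (timedOut == 0) {
    // every task in the group finished within the timeout
} else {
    // the wait timed out
}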

That wraps up Dispatch Queues; in everyday GCD usage, roughly 60% of what we do is covered by the material above.

2. Dispatch Block

In iOS 8, Apple added a new family of APIs around Dispatch Block. We could always pass blocks to dispatch calls as tasks, but these are different. It used to be said that tasks created with NSOperation can be cancelled while GCD tasks cannot; since iOS 8, GCD tasks can be cancelled as well.

Basic usage

- (void)dispatchBlockTest {
    // without specifying a QoS
    dispatch_block_t dsBlock = dispatch_block_create(0, ^{
        NSLog(@"test block");
    });
    
    // with an explicit QoS class
    dispatch_block_t dsQosBlock = dispatch_block_create_with_qos_class(0, QOS_CLASS_USER_INITIATED, -1, ^{
        NSLog(@"test block");
    });
    
    dispatch_async(dispatch_get_main_queue(), dsBlock);
    dispatch_async(dispatch_get_main_queue(), dsQosBlock);
    
    // create and execute in one step
    dispatch_block_perform(0, ^{
        NSLog(@"test block");
    });
}

- (void)dispatchBlockTest {
    dispatch_queue_t queue = dispatch_queue_create("com.bool.block", DISPATCH_QUEUE_SERIAL);
    dispatch_block_t dsBlock = dispatch_block_create(0, ^{
        NSLog(@"test block");
    });
    dispatch_async(queue, dsBlock);
    // wait until the block has finished
    dispatch_block_wait(dsBlock, DISPATCH_TIME_FOREVER);
    NSLog(@"block was finished");
}

- (void)dispatchBlockTest {
    dispatch_queue_t queue = dispatch_queue_create("com.bool.block", DISPATCH_QUEUE_SERIAL);
    dispatch_block_t dsBlock = dispatch_block_create(0, ^{
        NSLog(@"test block");
    });
    dispatch_async(queue, dsBlock);
    // get notified once the block has finished
    dispatch_block_notify(dsBlock, queue, ^{
        NSLog(@"block was finished, do other thing");
    });
    NSLog(@"execute first");
}

- (void)dispatchBlockTest {
    dispatch_queue_t queue = dispatch_queue_create("com.bool.block", DISPATCH_QUEUE_SERIAL);
    dispatch_block_t dsBlock1 = dispatch_block_create(0, ^{
        NSLog(@"test block1");
    });
    dispatch_block_t dsBlock2 = dispatch_block_create(0, ^{
        NSLog(@"test block2");
    });
    dispatch_async(queue, dsBlock1);
    dispatch_async(queue, dsBlock2);
    
    // the second block will be cancelled and never run
    dispatch_block_cancel(dsBlock2);
}

3. Dispatch Barriers

Dispatch barriers can be thought of as scheduling fences, and they are commonly used for concurrent reads and writes from multiple threads. For example:

@interface ViewController ()
@property (nonatomic, strong) dispatch_queue_t imageQueue;
@property (nonatomic, strong) NSMutableArray *imageArray;
@end

@implementation ViewController

- (void)viewDidLoad {
    [super viewDidLoad];
    
    self.imageQueue = dispatch_queue_create("com.bool.image", DISPATCH_QUEUE_CONCURRENT);
    self.imageArray = [NSMutableArray array];
}

/** Guarantees no other work runs on the queue while we write; once the write is done, hop to the main thread to update the UI */
- (void)addImage:(UIImage *)image {
    dispatch_barrier_async(self.imageQueue, ^{
        [self.imageArray addObject:image];
        dispatch_async(dispatch_get_main_queue(), ^{
            // update UI
        });
    });
}

/** dispatch_sync here acts as a lock */
- (NSArray <UIImage *> *)images {
    __block NSArray *imagesArray = nil;
    dispatch_sync(self.imageQueue, ^{
        imagesArray = [self.imageArray mutableCopy];
    });
    return imagesArray;
}
@end

It may be easier to understand as a picture:

(Figure: Dispatch_Barrier)

dispatch_barrier_async() works in much the same way as dispatch_async(); the only difference is the flags that get set:

void
dispatch_barrier_async(dispatch_queue_t dq, dispatch_block_t work)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    // dispatch_async() only sets DISPATCH_OBJ_CONSUME_BIT
    uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_BARRIER_BIT;

    _dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
    _dispatch_continuation_push(dq, dc);
}

After that, the continuation is pushed onto the queue like any other. When the queue is drained, a loop pops tasks off and executes them one by one; when it reaches a task whose flags mark it as a barrier, the loop stops and that task is executed on its own. The tasks behind it are held back until the barrier task has finished.

DISPATCH_ALWAYS_INLINE
static dispatch_queue_wakeup_target_t
_dispatch_queue_drain(dispatch_queue_t dq, dispatch_invoke_context_t dic,
        dispatch_invoke_flags_t flags, uint64_t *owned_ptr, bool serial_drain)
{
    ...
    
    for (;;) {
        ...
first_iteration:
        dq_state = os_atomic_load(&dq->dq_state, relaxed);
        if (unlikely(_dq_state_is_suspended(dq_state))) {
            break;
        }
        if (unlikely(orig_tq != dq->do_targetq)) {
            break;
        }

        if (serial_drain || _dispatch_object_is_barrier(dc)) {
            if (!serial_drain && owned != DISPATCH_QUEUE_IN_BARRIER) {
                if (!_dispatch_queue_try_upgrade_full_width(dq, owned)) {
                    goto out_with_no_width;
                }
                owned = DISPATCH_QUEUE_IN_BARRIER;
            }
            next_dc = _dispatch_queue_next(dq, dc);
            if (_dispatch_object_is_sync_waiter(dc)) {
                owned = 0;
                dic->dic_deferred = dc;
                goto out_with_deferred;
            }
        } else {
            if (owned == DISPATCH_QUEUE_IN_BARRIER) {
                // we just ran barrier work items, we have to make their
                // effect visible to other sync work items on other threads
                // that may start coming in after this point, hence the
                // release barrier
                os_atomic_xor2o(dq, dq_state, owned, release);
                owned = dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
            } else if (unlikely(owned == 0)) {
                if (_dispatch_object_is_sync_waiter(dc)) {
                    // sync "readers" don't observe the limit
                    _dispatch_queue_reserve_sync_width(dq);
                } else if (!_dispatch_queue_try_acquire_async(dq)) {
                    goto out_with_no_width;
                }
                owned = DISPATCH_QUEUE_WIDTH_INTERVAL;
            } 
            
            next_dc = _dispatch_queue_next(dq, dc);
            if (_dispatch_object_is_sync_waiter(dc)) {
                owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
                _dispatch_sync_waiter_redirect_or_wake(dq,
                        DISPATCH_SYNC_WAITER_NO_UNLOCK, dc);
                continue;
            }
            
            ...
    }

4. Dispatch Source

dispatch_source is something we use very rarely. It is a wrapper around BSD kernel functionality and is typically used to monitor certain events, such as detecting when a breakpoint is hit and then resumed. The Apple documentation at https://developer.apple.com/documentation/dispatch/dispatch_source_type_constants?language=objc lists the kinds of events that can be monitored.

For example, the code below detects when a breakpoint is hit and then continued:

@interface ViewController ()
@property (nonatomic, strong) dispatch_source_t signalSource;
@property (nonatomic, assign) dispatch_once_t signalOnceToken;
@end

@implementation ViewController

- (void)viewDidLoad {
    dispatch_once(&_signalOnceToken, ^{
        dispatch_queue_t queue = dispatch_get_main_queue();
        self.signalSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_SIGNAL, SIGSTOP, 0, queue);
        
        if (self.signalSource) {
            dispatch_source_set_event_handler(self.signalSource, ^{
                // Hit a breakpoint and then continue, and this handler runs.
                NSLog(@"debug test");
            });
            dispatch_resume(self.signalSource);
        }
    });
}

dispatch_after() itself is implemented on top of dispatch_source. We can build a similar timer ourselves:

- (void)customTimer {
    dispatch_source_t timerSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, DISPATCH_TARGET_QUEUE_DEFAULT);
    dispatch_source_set_timer(timerSource, dispatch_time(DISPATCH_TIME_NOW, 5.0 * NSEC_PER_SEC), 2.0 * NSEC_PER_SEC, 5);
    dispatch_source_set_event_handler(timerSource, ^{
        NSLog(@"dispatch source timer");
    });
    
    self.signalSource = timerSource;
    dispatch_resume(self.signalSource);
}

How it works

Using dispatch_source goes roughly like this: we create a source, attach it to a queue, and call dispatch_resume(); the source is then woken from the queue and its block is executed. Below is a detailed flow chart; let's walk through it:

(Figure: Dispatch_Source)

dispatch_source_t
dispatch_source_create(dispatch_source_type_t dst, uintptr_t handle,
        unsigned long mask, dispatch_queue_t dq)
{
    dispatch_source_refs_t dr;
    dispatch_source_t ds;

    dr = dux_create(dst, handle, mask)._dr;
    if (unlikely(!dr)) {
        return DISPATCH_BAD_INPUT;
    }
    
    // allocate memory
    ds = _dispatch_object_alloc(DISPATCH_VTABLE(source),
            sizeof(struct dispatch_source_s));
    // initialize it as a queue and configure it: a source is treated entirely as a queue
    _dispatch_queue_init(ds->_as_dq, DQF_LEGACY, 1,
            DISPATCH_QUEUE_INACTIVE | DISPATCH_QUEUE_ROLE_INNER);
    ds->dq_label = "source";
    ds->do_ref_cnt++; // the reference the manager queue holds
    ds->ds_refs = dr;
    dr->du_owner_wref = _dispatch_ptr2wref(ds);

    if (slowpath(!dq)) {
        dq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, true);
    } else {
        _dispatch_retain((dispatch_queue_t _Nonnull)dq);
    }
    ds->do_targetq = dq;
    if (dr->du_is_timer && (dr->du_fflags & DISPATCH_TIMER_INTERVAL)) {
        _dispatch_source_set_interval(ds, handle);
    }
    _dispatch_object_debug(ds, "%s", __func__);
    return ds;
}

void
dispatch_resume(dispatch_object_t dou)
{
    DISPATCH_OBJECT_TFB(_dispatch_objc_resume, dou);
    if (dx_vtable(dou._do)->do_suspend) {
        dx_vtable(dou._do)->do_resume(dou._do, false);
    }
}

DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_wakeup_target_t
_dispatch_source_invoke2(dispatch_object_t dou, dispatch_invoke_context_t dic,
        dispatch_invoke_flags_t flags, uint64_t *owned)
{
    dispatch_source_t ds = dou._ds;
    dispatch_queue_wakeup_target_t retq = DISPATCH_QUEUE_WAKEUP_NONE;
    // get the current queue
    dispatch_queue_t dq = _dispatch_queue_get_current();
    dispatch_source_refs_t dr = ds->ds_refs;
    dispatch_queue_flags_t dqf;

    ...
    
    // timer handling
    if (dr->du_is_timer &&
            os_atomic_load2o(ds, ds_timer_refs->dt_pending_config, relaxed)) {
        dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
        if (!(dqf & (DSF_CANCELED | DQF_RELEASED))) {
            // timer has to be configured on the kevent queue
            if (dq != dkq) {
                return dkq;
            }
            _dispatch_source_timer_configure(ds);
        }
    }

    // has the source been installed?
    if (!ds->ds_is_installed) {
        // The source needs to be installed on the kevent queue.
        if (dq != dkq) {
            return dkq;
        }
        _dispatch_source_install(ds, _dispatch_get_wlh(),
                _dispatch_get_basepri());
    }

    // suspended? this was checked earlier, so we rarely get here
    if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
        // Source suspended by an item drained from the source queue.
        return ds->do_targetq;
    }

    // is there a registration handler to deliver?
    if (_dispatch_source_get_registration_handler(dr)) {
        // The source has been registered and the registration handler needs
        // to be delivered on the target queue.
        if (dq != ds->do_targetq) {
            return ds->do_targetq;
        }
        // clears ds_registration_handler
        _dispatch_source_registration_callout(ds, dq, flags);
    }

    ...
        
    if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) &&
            os_atomic_load2o(ds, ds_pending_data, relaxed)) {
        // Some sources have pending data that must be delivered via a callout on the target queue; others need to hop over to the manager queue.
        if (dq == ds->do_targetq) {
            _dispatch_source_latch_and_call(ds, dq, flags);
            dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
            prevent_starvation = dq->do_targetq ||
                    !(dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT);
            if (prevent_starvation &&
                    os_atomic_load2o(ds, ds_pending_data, relaxed)) {
                retq = ds->do_targetq;
            }
        } else {
            return ds->do_targetq;
        }
    }

    if ((dqf & (DSF_CANCELED | DQF_RELEASED)) && !(dqf & DSF_DEFERRED_DELETE)) {
        // A cancelled source must be unregistered from the manager queue; once that's done, the cancellation handler is delivered on the target queue.
        if (!(dqf & DSF_DELETED)) {
            if (dr->du_is_timer && !(dqf & DSF_ARMED)) {
                // timers can cheat if not armed because there's nothing left
                // to do on the manager queue and unregistration can happen
                // on the regular target queue
            } else if (dq != dkq) {
                return dkq;
            }
            _dispatch_source_refs_unregister(ds, 0);
            dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
            if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
                if (!(dqf & DSF_ARMED)) {
                    goto unregister_event;
                }
                // we need to wait for the EV_DELETE
                return retq ? retq : DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT;
            }
        }
        if (dq != ds->do_targetq && (_dispatch_source_get_event_handler(dr) ||
                _dispatch_source_get_cancel_handler(dr) ||
                _dispatch_source_get_registration_handler(dr))) {
            retq = ds->do_targetq;
        } else {
            _dispatch_source_cancel_callout(ds, dq, flags);
            dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
        }
        prevent_starvation = false;
    }

    if (_dispatch_unote_needs_rearm(dr) &&
            !(dqf & (DSF_ARMED|DSF_DELETED|DSF_CANCELED|DQF_RELEASED))) {
        // sources that need to be rearmed on the manager queue
        if (dq != dkq) {
            return dkq;
        }
        if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
            // if we can unregister directly, no resume is needed
            goto unregister_event;
        }
        if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
            // if the source is suspended, there's no need to rearm on the manager queue
            return ds->do_targetq;
        }
        if (prevent_starvation && dr->du_wlh == DISPATCH_WLH_ANON) {
            return ds->do_targetq;
        }
        if (unlikely(!_dispatch_source_refs_resume(ds))) {
            goto unregister_event;
        }
        if (!prevent_starvation && _dispatch_wlh_should_poll_unote(dr)) {
            _dispatch_event_loop_drain(KEVENT_FLAG_IMMEDIATE);
        }
    }
    return retq;
}

5. Dispatch I/O

We can use Dispatch I/O to read files quickly, like this:

- (void)readFile {
    NSString *filePath = @"/.../青花瓷.m";
    dispatch_queue_t queue = dispatch_queue_create("com.bool.readfile", DISPATCH_QUEUE_SERIAL);
    dispatch_fd_t fd = open(filePath.UTF8String, O_RDONLY,0);
    dispatch_io_t fileChannel = dispatch_io_create(DISPATCH_IO_STREAM, fd, queue, ^(int error) {
        close(fd);
    });
    
    NSMutableData *fileData = [NSMutableData new];
    dispatch_io_set_low_water(fileChannel, SIZE_MAX);
    dispatch_io_read(fileChannel, 0, SIZE_MAX, queue, ^(bool done, dispatch_data_t  _Nullable data, int error) {
        if (error == 0 && dispatch_data_get_size(data) > 0) {
            [fileData appendData:(NSData *)data];
        }
        
        if (done) {
            NSString *str = [[NSString alloc] initWithData:fileData encoding:NSUTF8StringEncoding];
            NSLog(@"read file completed, string is :\n %@",str);
        }
    });
}

Output:

ConcurrencyTest[41479:5357296] read file completed, string is :
 天青色等烟雨 而我在等你
月色被打捞起 晕开了结局

For a large file we can read it in slices: split the file into several chunks and read them concurrently on background threads, which is usually faster.
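A hedged sketch of that idea (chunk size and method name are made up), using a random-access channel so each read can specify its own offset:

- (void)readLargeFileAtPath:(NSString *)filePath {
    dispatch_queue_t queue = dispatch_queue_create("com.example.bigfile", DISPATCH_QUEUE_CONCURRENT);
    dispatch_fd_t fd = open(filePath.UTF8String, O_RDONLY, 0);
    off_t fileSize = lseek(fd, 0, SEEK_END);       // total size, taken before handing fd to the channel
    dispatch_io_t channel = dispatch_io_create(DISPATCH_IO_RANDOM, fd, queue, ^(int error) {
        close(fd);
    });

    size_t chunkSize = 4 * 1024 * 1024;            // 4 MB per slice, an arbitrary choice
    for (off_t offset = 0; offset < fileSize; offset += chunkSize) {
        dispatch_io_read(channel, offset, chunkSize, queue,
                         ^(bool done, dispatch_data_t data, int error) {
            if (error == 0 && data != NULL) {
                // handle this slice; slices may complete in any order
            }
        });
    }
    dispatch_io_close(channel, 0);                 // lets the reads already submitted finish
}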

I took a quick look at the source: the scheduling logic is similar to the task handling we have already seen, and the actual reads and writes go through low-level system calls, so I'll be lazy and skip the details. In most cases Dispatch I/O is used to read a large file concurrently in order to speed things up.

6. Other

Most of what the overview chart shows has been covered above; the few remaining pieces are smaller and I won't go into them here.

That's it for GCD. The source used this time is the latest version, 912.30.4.tar.gz, which differs a lot from the versions I read before; with the growth in code size the new version is messier, but the underlying principles are much the same. For a while I honestly believed the one listed at the top of the page was the newest version…
