GCD的dispatch_group_t的原理及使用

2021-08-25  本文已影响0人  希尔罗斯沃德_董

什么是dispatch_group_t

dispatch_group_t是提交给队列用于异步调用的一组block。可以将队列的任务block进行分组,把队列任务关联到当前的组里面,然后还可以监听已关联的任务是否全部执行完成。dispatch_group_t可以通过dispatch_group_create函数创建:

dispatch_group_t group = dispatch_group_create();

dispatch_group_t的使用

dispatch_group_t的使用有两种方式,一种dispatch_group_enter() 和dispatch_group_leave()配合的方式;另一种是使用dispatch_group_async函数。不管哪种方式,都可以通过dispatch_group_notify函数来监听group里面的任务是否都已经执行完毕,或者可以通过dispatch_group_wait函数等待所有任务执行完毕。dispatch_group_wait和dispatch_group_notify的不同是,dispatch_group_notify可以指定所有任务完成之后执行的队列,它是异步的,不会阻塞当前线程;而dispatch_group_wait是同步的,会阻塞当前线程,但是dispatch_group_wait可以设置超时时间。下面就来看看dispatch_group_t使用示例。

dispatch_group_enter()&dispatch_group_leave()

这两个函数必须成对出现,这很类似信号量。dispatch_group_enter在任务添加之前调用,相当于告诉group要添加任务,而dispatch_group_leave则在任务结束的时候调用。以下是demo示例:

    dispatch_queue_t conQueue = dispatch_queue_create("com.hello-world.djx", DISPATCH_QUEUE_CONCURRENT);
    dispatch_group_t group = dispatch_group_create();
    dispatch_group_enter(group);
    dispatch_async(conQueue, ^{
        NSLog(@"1");
        dispatch_group_leave(group);
    });
    dispatch_group_enter(group);
    dispatch_async(conQueue, ^{
        NSLog(@"2");
        dispatch_group_leave(group);
    });
    dispatch_group_enter(group);
    dispatch_async(conQueue, ^{
        NSLog(@"3");
        dispatch_group_leave(group);
    });

    dispatch_group_notify(group, dispatch_get_main_queue(), ^{
        NSLog(@"任务完成,返回主线程");
    });
使用dispatch_group_async

这个函数直接实现了dispatch_group_enter和dispatch_group_leave的作用,以下是demo示例:

    dispatch_queue_t conQueue = dispatch_queue_create("com.hello-world.djx", DISPATCH_QUEUE_CONCURRENT);
    dispatch_group_t group = dispatch_group_create();
    dispatch_group_async(group, conQueue, ^{
        NSLog(@"1");
    });
    dispatch_group_async(group, conQueue, ^{
        NSLog(@"2");
    });
    dispatch_group_async(group, conQueue, ^{
        NSLog(@"3");
    });
    dispatch_group_notify(group, dispatch_get_main_queue(), ^{
        NSLog(@"任务完成,返回主线程");
    });

到这里的时候我们可能发现它跟栅栏dispatch_barrier_async函数很像,可以等某个队列的某几个任务都执行了在执行我们想要的操作。但还是group跟栅栏函数是不一样的,group不会阻塞当前队列,而且一个group不止可以关联一个队列,可以同时关联多个队列,下面我们就来看看group关联多个队列的用法。

可以关联多个队列任务、两种方式一起使用
    dispatch_queue_t conQueue = dispatch_queue_create("com.hello-world.djx", DISPATCH_QUEUE_CONCURRENT);
    dispatch_queue_t globalQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
    dispatch_group_t group = dispatch_group_create();

    dispatch_group_enter(group);
    dispatch_async(conQueue, ^{
        NSLog(@"1");
        dispatch_group_leave(group);
    });
    dispatch_group_enter(group);
    dispatch_async(globalQueue, ^{
        NSLog(@"2");
        dispatch_group_leave(group);
    });
    dispatch_group_enter(group);
    dispatch_async(conQueue, ^{
        NSLog(@"3");
        dispatch_group_leave(group);
    });
    dispatch_group_enter(group);
    dispatch_async(globalQueue, ^{
        NSLog(@"4");
        dispatch_group_leave(group);
    });
    dispatch_group_enter(group);
    dispatch_async(conQueue, ^{
        NSLog(@"5");
        dispatch_group_leave(group);
    });
    dispatch_group_enter(group);
    dispatch_async(globalQueue, ^{
        NSLog(@"6");
        dispatch_group_leave(group);
    });
    
    dispatch_group_async(group, conQueue, ^{
        NSLog(@"7");
    });

    dispatch_group_async(group, globalQueue, ^{
        NSLog(@"8");
    });
    
    dispatch_group_notify(group, dispatch_get_main_queue(), ^{
        NSLog(@"任务完成,返回主线程");
    });
    NSLog(@"开始等待");
    dispatch_time_t time = dispatch_walltime(NULL, 5* NSEC_PER_SEC);
    dispatch_group_wait(group, time);
    NSLog(@"等待结束")

从demo中可以看出,group可以关联多个队列conQueque和globalQueue,而且group的两种用法dispatch_group_enter()&dispatch_group_leave()和dispatch_group_async是可以一起使用的,而且dispatch_group_notify和dispatch_group_wait并不是互斥的,他们也一样可以同时使用。这个给了我们做业务有着很多选择。

Group底层原理

这里先提出几个问题:为什么dispatch_group_enter和dispatch_group_leave要成对出现?它们是怎么关联的?以及dispatch_group_async为什么不需要enter和leave?dispatch_group_notify是如何知道所有任务都执行完成了的?
接下来通过源码分析来解释这些问题。

dispatch_group_create源码解析
dispatch_group_t
dispatch_group_create(void)
{
    return _dispatch_group_create_with_count(0);
}
static inline dispatch_group_t
_dispatch_group_create_with_count(uint32_t n)
{
    dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
            sizeof(struct dispatch_group_s));
    dg->do_next = DISPATCH_OBJECT_LISTLESS;
    dg->do_targetq = _dispatch_get_default_queue(false);
    if (n) {
        os_atomic_store2o(dg, dg_bits,
                (uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
        os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
    }
    return dg;
}

DISPATCH_VTABLE能够获取group的类信息。由于初始值n为0,所以这里的os_atomic_store2o不会被调用,所以这会dg_bits为0。dg_bits用于记录enter和leave的信息。

dispatch_group_enter源码解析

dispatch_group_enter函数:

void
dispatch_group_enter(dispatch_group_t dg)
{
    // The value is decremented on a 32bits wide atomic so that the carry
    // for the 0 -> -1 transition is not propagated to the upper 32bits.
    uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
            DISPATCH_GROUP_VALUE_INTERVAL, acquire);
    uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
    if (unlikely(old_value == 0)) {
        _dispatch_retain(dg); // <rdar://problem/22318411>
    }
    if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
        DISPATCH_CLIENT_CRASH(old_bits,
                "Too many nested calls to dispatch_group_enter()");
    }
}

os_atomic_sub_orig2o调用流程:os_atomic_sub_orig2o->os_atomic_sub_orig->_os_atomic_c11_op_orig((p), (v), m, sub, -)->atomic_fetch_sub_explicit
在enter函数里,首先会对dg->dg_bits进行减一的原子操作,用于统计enter次数。有前面创建group可知,dg->dg_bits初始值应该是为0的,所以减1之后正常情况下不会等于0,所以这里懒得分析_dispatch_retain函数了。

dispatch_group_leave源码解析

dispatch_group_leave函数:

void
dispatch_group_leave(dispatch_group_t dg)
{
    // The value is incremented on a 64bits wide atomic so that the carry for
    // the -1 -> 0 transition increments the generation atomically.
    uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
            DISPATCH_GROUP_VALUE_INTERVAL, release);
    uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);

    if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
        old_state += DISPATCH_GROUP_VALUE_INTERVAL;
        do {
            new_state = old_state;
            if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
                new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
                new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
            } else {
                // If the group was entered again since the atomic_add above,
                // we can't clear the waiters bit anymore as we don't know for
                // which generation the waiters are for
                new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
            }
            if (old_state == new_state) break;
        } while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
                old_state, new_state, &old_state, relaxed)));
        return _dispatch_group_wake(dg, old_state, true);
    }

    if (unlikely(old_value == 0)) {
        DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
                "Unbalanced call to dispatch_group_leave()");
    }
}

os_atomic_add_orig2o调用流程: os_atomic_add_orig2o->os_atomic_add_orig->_os_atomic_c11_op_orig->_os_atomic_c11_op_orig->atomic_fetch_add_explicit
实际上这里是实际上就是对dg->dg_state的原子加1操作,并返回旧值old_value。用于统计leave次数。dg_state操作完成之后会就会判断enter和leave是否已经成对,如果是就会调用_dispatch_group_wake唤醒group(dispatch_group_wait),同时去异步执行notify任务。如果old_value==0,说明leave次数调用过多,会导致崩溃。

static void
_dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
{
    uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>

    if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
        dispatch_continuation_t dc, next_dc, tail;

        // Snapshot before anything is notified/woken <rdar://problem/8554546>
        dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
        do {
            dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
            next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
            _dispatch_continuation_async(dsn_queue, dc,
                    _dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);
            _dispatch_release(dsn_queue);
        } while ((dc = next_dc));

        refs++;
    }

    if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
        _dispatch_wake_by_address(&dg->dg_gen);
    }

    if (refs) _dispatch_release_n(dg, refs);
}

_dispatch_group_wake函数中,会循环遍历当前group里是否有notify任务,如果有则交给_dispatch_continuation_async把block提交到队列中异步调用这个block,同时会判断有没有正在等待的group任务:

if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
        _dispatch_wake_by_address(&dg->dg_gen);
    }
dispatch_group_notify源码分析
void
dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_block_t db)
{
    dispatch_continuation_t dsn = _dispatch_continuation_alloc();
    _dispatch_continuation_init(dsn, dq, db, 0, DC_FLAG_CONSUME);
    _dispatch_group_notify(dg, dq, dsn);
}
static inline void
_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_continuation_t dsn)
{
    uint64_t old_state, new_state;
    dispatch_continuation_t prev;

    dsn->dc_data = dq;
    _dispatch_retain(dq);

    prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
    if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
    os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
    if (os_mpsc_push_was_empty(prev)) {
        os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
            new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
            if ((uint32_t)old_state == 0) {
                os_atomic_rmw_loop_give_up({
                    return _dispatch_group_wake(dg, new_state, false);
                });
            }
        });
    }
}

在这里会首先把当前函数的任务(block)标记位notify,因为一会所有任务执行完之后要调用。会循环(os_atomic_rmw_loop2o)监听状态,状态满足时old_state == 0执行_dispatch_group_wake去执行notify的block和唤醒group的wait。_dispatch_group_wake上面已经有分析过了。

dispatch_group_wait源码解析

dispatch_group_wait函数阻塞当前线程:

long
dispatch_group_wait(dispatch_group_t dg, dispatch_time_t timeout)
{
    uint64_t old_state, new_state;

    os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, relaxed, {
        if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
            os_atomic_rmw_loop_give_up_with_fence(acquire, return 0);
        }
        if (unlikely(timeout == 0)) {
            os_atomic_rmw_loop_give_up(return _DSEMA4_TIMEOUT());
        }
        new_state = old_state | DISPATCH_GROUP_HAS_WAITERS;
        if (unlikely(old_state & DISPATCH_GROUP_HAS_WAITERS)) {
            os_atomic_rmw_loop_give_up(break);
        }
    });

    return _dispatch_group_wait_slow(dg, _dg_state_gen(new_state), timeout);
}
static long
_dispatch_group_wait_slow(dispatch_group_t dg, uint32_t gen,
        dispatch_time_t timeout)
{
    for (;;) {
        int rc = _dispatch_wait_on_address(&dg->dg_gen, gen, timeout, 0);
        if (likely(gen != os_atomic_load2o(dg, dg_gen, acquire))) {
            return 0;
        }
        if (rc == ETIMEDOUT) {
            return _DSEMA4_TIMEOUT();
        }
    }
}

dispatch_group_wait等待的原理是它开启了一个循环,检查group任务相关的状态,如国状态满足就返回0,执行dispatch_group_wait后面的代码;如果超时就返回超时提醒,也会继续执行后面的代码。

dispatch_group_async源码解析

dispatch_group_async之所以能等同于dispatch_group_enter()和dispatch_group_leave()的操作,实际上它内部实现也是依赖这两个方法实现的:

void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_block_t db)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
    dispatch_qos_t qos;

    qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
    _dispatch_continuation_group_async(dg, dq, dc, qos);
}

static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_continuation_t dc, dispatch_qos_t qos)
{
    dispatch_group_enter(dg);
    dc->dc_data = dg;
    _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}

这里是先创建一个dc,然后标记为group类型任务uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC。然后在把任务添加到队列之前调用dispatch_group_enter(dg),到这里我们其实就可以发现了,dispatch_group_async实现原理实际上就是内部也是依赖dispatch_group_enter和dispatch_group_leave函数来管理的。所以我们也很容易猜到在block任务被调用的时候,也会调用dispatch_group_leave函数来保持平衡的:

static inline void
_dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
{
    struct dispatch_object_s *dou = dc->dc_data;
    unsigned long type = dx_type(dou);
    if (type == DISPATCH_GROUP_TYPE) {
        _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
        _dispatch_trace_item_complete(dc);
        dispatch_group_leave((dispatch_group_t)dou);
    } else {
        DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
    }
}
上一篇下一篇

猜你喜欢

热点阅读