二十一、GCD底层
单例
单例(dispatch_once
),我们溯源最后找到了
// Preprocessor alias: every later use of the name dispatch_once expands to
// _dispatch_once.
#define dispatch_once _dispatch_once
// Front end for dispatch_once: inspects *predicate first and only calls
// dispatch_once(predicate, block) when it is not yet ~0l.
void
_dispatch_once(dispatch_once_t *predicate,
DISPATCH_NOESCAPE dispatch_block_t block)
{
// DISPATCH_EXPECT marks ~0l as the expected value of *predicate.
if (DISPATCH_EXPECT(*predicate, ~0l) != ~0l) {
dispatch_once(predicate, block);
} else {
// *predicate is already ~0l: no call is made, only a compiler barrier.
dispatch_compiler_barrier();
}
// Per the macro name, lets the compiler assume *predicate == ~0l from here on.
DISPATCH_COMPILER_CAN_ASSUME(*predicate == ~0l);
}
/*
 * Block-based entry point. The block itself is passed as the context and
 * the result of _dispatch_Block_invoke(block) as the function, both handed
 * to dispatch_once_f.
 */
void
dispatch_once(dispatch_once_t *val, dispatch_block_t block)
{
    dispatch_function_t invoke = _dispatch_Block_invoke(block);

    dispatch_once_f(val, block, invoke);
}
// Function-pointer core of dispatch_once. Treats the storage at val as a
// once gate, then either returns early, runs func(ctxt) via the callout
// helper, or waits on the gate.
DISPATCH_NOINLINE
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
// The predicate storage is reinterpreted as a once gate.
dispatch_once_gate_t l = (dispatch_once_gate_t)val;
#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
// Gate already marked DLOCK_ONCE_DONE: nothing left to do.
if (likely(v == DLOCK_ONCE_DONE)) {
return;
}
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
if (likely(DISPATCH_ONCE_IS_GEN(v))) {
return _dispatch_once_mark_done_if_quiesced(l, v);
}
#endif
#endif
// The caller whose tryenter succeeds runs the callout ...
if (_dispatch_once_gate_tryenter(l)) {
return _dispatch_once_callout(l, ctxt, func);
}
// ... every other caller goes to _dispatch_once_wait.
return _dispatch_once_wait(l);
}
这里的l
来源于static
静态变量,所以有唯一性,如果发现 DLOCK_ONCE_DONE
处理过就会直接return
/*
 * Attempts to claim the once gate with a compare-and-exchange on dgo_once:
 * expected value DLOCK_ONCE_UNLOCKED, new value the calling thread's lock
 * value. Returns the result of that compare-and-exchange.
 */
static inline bool
_dispatch_once_gate_tryenter(dispatch_once_gate_t l)
{
    uintptr_t self_lock = (uintptr_t)_dispatch_lock_value_for_self();

    return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED, self_lock,
            relaxed);
}
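在这里通过 os_atomic_cmpxchg 对 l->dgo_once 进行比较并交换:如果当前值是 DLOCK_ONCE_UNLOCKED,就写入当前线程的锁值(相当于加锁)并返回 true。
加锁成功的线程会进行 _dispatch_once_callout 调用;
多线程时也只有一个线程能加锁成功并执行一次 block,剩下的线程进入 _dispatch_once_wait 进行等待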
// Runs the client function with its context, then broadcasts on gate l.
static void
_dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
dispatch_function_t func)
{
_dispatch_client_callout(ctxt, func);
// Called only after the callout has returned.
_dispatch_once_gate_broadcast(l);
}
在这里就是调用完block之后进行广播
// Marks gate l (done or quiescing, depending on the build) and takes the
// slow broadcast path unless the mark helper returned this thread's own
// lock value.
static inline void
_dispatch_once_gate_broadcast(dispatch_once_gate_t l)
{
dispatch_lock value_self = _dispatch_lock_value_for_self();
uintptr_t v;
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
v = _dispatch_once_mark_quiescing(l);
#else
v = _dispatch_once_mark_done(l);
#endif
// v is what the mark helper returned.
// NOTE(review): presumably a value other than value_self means other
// threads registered as waiters on the gate - confirm.
if (likely((dispatch_lock)v == value_self)) return;
_dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
}
/*
 * Atomically exchanges dgo_once with DLOCK_ONCE_DONE (release ordering)
 * and returns what os_atomic_xchg yields.
 */
static inline uintptr_t
_dispatch_once_mark_done(dispatch_once_gate_t dgo)
{
    uintptr_t exchanged = os_atomic_xchg(&dgo->dgo_once, DLOCK_ONCE_DONE,
            release);

    return exchanged;
}
block 执行完之后,_dispatch_once_mark_done 通过原子交换把 dgo_once 置为 DLOCK_ONCE_DONE
,下次再过来的时候 if (likely(v == DLOCK_ONCE_DONE)) { return; }
发现之后直接return。
dispatch_async
/*
 * Asynchronous submission of a block to a queue:
 *   1. allocate a continuation,
 *   2. initialise it from the block (flag DC_FLAG_CONSUME), which also
 *      yields the QoS,
 *   3. hand the continuation to _dispatch_continuation_async.
 */
void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();

    // The continuation is the task wrapper: the block is stored into it.
    dispatch_qos_t qos = _dispatch_continuation_init(dc, dq, work, 0,
            DC_FLAG_CONSUME);

    _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
_dispatch_continuation_init
在这里对qos进行初始化
// Prepares continuation dc to run block `work`. Returns the QoS produced by
// whichever init helper it delegates to.
DISPATCH_ALWAYS_INLINE
static inline dispatch_qos_t
_dispatch_continuation_init(dispatch_continuation_t dc,
dispatch_queue_class_t dqu, dispatch_block_t work,
dispatch_block_flags_t flags, uintptr_t dc_flags)
{
// The continuation's context is the result of _dispatch_Block_copy(work).
void *ctxt = _dispatch_Block_copy(work);
dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
// Blocks carrying private data are handled by the slow initializer.
if (unlikely(_dispatch_block_has_private_data(work))) {
dc->dc_flags = dc_flags;
dc->dc_ctxt = ctxt;
// will initialize all fields but requires dc_flags & dc_ctxt to be set
return _dispatch_continuation_init_slow(dc, dqu, flags);
}
dispatch_function_t func = _dispatch_Block_invoke(work);
// With DC_FLAG_CONSUME the function is replaced by
// _dispatch_call_block_and_release.
// NOTE(review): presumably that helper invokes and then releases the
// copied block - confirm.
if (dc_flags & DC_FLAG_CONSUME) {
func = _dispatch_call_block_and_release;
}
return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
}
在这里把block封装了进去
// Stores flags, function and context into dc, sets its voucher, and returns
// the result of _dispatch_continuation_priority_set.
DISPATCH_ALWAYS_INLINE
static inline dispatch_qos_t
_dispatch_continuation_init_f(dispatch_continuation_t dc,
dispatch_queue_class_t dqu, void *ctxt, dispatch_function_t f,
dispatch_block_flags_t flags, uintptr_t dc_flags)
{
// pp stays 0 unless a priority is propagated below.
pthread_priority_t pp = 0;
dc->dc_flags = dc_flags | DC_FLAG_ALLOCATED;
dc->dc_func = f;
dc->dc_ctxt = ctxt;
// in this context DISPATCH_BLOCK_HAS_PRIORITY means that the priority
// should not be propagated, only taken from the handler if it has one
if (!(flags & DISPATCH_BLOCK_HAS_PRIORITY)) {
pp = _dispatch_priority_propagate();
}
_dispatch_continuation_voucher_set(dc, flags);
return _dispatch_continuation_priority_set(dc, dqu, pp, flags);
}
_dispatch_continuation_async
上面对任务信息进行了封装,下面就是创建线程和执行函数了
// Pushes continuation dc onto the queue through dx_push. In introspection
// builds the push is traced first unless DC_FLAG_NO_INTROSPECTION is set.
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
_dispatch_trace_item_push(dqu, dc);
}
#else
// dc_flags is only read in introspection builds.
(void)dc_flags;
#endif
return dx_push(dqu._dq, dc, qos);
}
// Calls the dq_push entry of x's vtable with (x, y, z). Note that x appears
// twice in the expansion.
#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)
因为是从并发队列里找过来的,所以找queue_concurrent
// Vtable instance for queue_concurrent (declared as a subclass of lane).
// Its dq_push entry is _dispatch_lane_concurrent_push, which is what
// dx_push resolves to for these queues.
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
.do_type = DISPATCH_QUEUE_CONCURRENT_TYPE,
.do_dispose = _dispatch_lane_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_lane_invoke,
.dq_activate = _dispatch_lane_activate,
.dq_wakeup = _dispatch_lane_wakeup,
.dq_push = _dispatch_lane_concurrent_push,
);
这里就可以直接去找_dispatch_lane_concurrent_push
// dq_push implementation for concurrent lanes. Redirects the item when the
// lane has no queued items, the item is neither a waiter nor a barrier, and
// the async acquire succeeds. Otherwise falls back to _dispatch_lane_push.
DISPATCH_NOINLINE
void
_dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
dispatch_qos_t qos)
{
// <rdar://problem/24738102&24743140> reserving non barrier width
// doesn't fail if only the ENQUEUED bit is set (unlike its barrier
// width equivalent), so we have to check that this thread hasn't
// enqueued anything ahead of this call or we can break ordering
if (dq->dq_items_tail == NULL &&
!_dispatch_object_is_waiter(dou) &&// item is not a waiter
!_dispatch_object_is_barrier(dou) &&// item is not a barrier
_dispatch_queue_try_acquire_async(dq)) {// try to acquire; evaluated last
return _dispatch_continuation_redirect_push(dq, dou, qos);
}
_dispatch_lane_push(dq, dou, qos);
}
// Forwards an item from lane dl to dl's target queue via dx_push, wrapping
// it in a redirection first when it is not one already.
DISPATCH_NOINLINE
static void
_dispatch_continuation_redirect_push(dispatch_lane_t dl,
dispatch_object_t dou, dispatch_qos_t qos)
{
if (likely(!_dispatch_object_is_redirection(dou))) {
dou._dc = _dispatch_async_redirect_wrap(dl, dou);
} else if (!dou._dc->dc_ctxt) {
// find first queue in descending target queue order that has
// an autorelease frequency set, and use that as the frequency for
// this continuation.
dou._dc->dc_ctxt = (void *)
(uintptr_t)_dispatch_queue_autorelease_frequency(dl);
}
// The push goes to the lane's target queue, not to the lane itself.
dispatch_queue_t dq = dl->do_targetq;
// A qos of 0 is replaced by the QoS taken from the target queue's priority.
if (!qos) qos = _dispatch_priority_qos(dq->dq_priority);
dx_push(dq, dou, qos);
}
之前dq->do_targetq = tq;
,所以在这里的dx_push
其实就是向父级去寻找。
我们添加符号断点发现走的是_dispatch_root_queue_push
// Push entry for root queues. In this excerpt the body is elided ("...")
// except for the final call, which pushes dou as a one-element list.
DISPATCH_NOINLINE
void
_dispatch_root_queue_push(dispatch_queue_global_t rq, dispatch_object_t dou,
dispatch_qos_t qos)
{
...
// dou is passed as both head and tail, with n = 1.
_dispatch_root_queue_push_inline(rq, dou, dou, 1);
}
// Pushes the list [_head, _tail] onto dq's item list and pokes the root
// queue with n when os_mpsc_push_list returns non-zero.
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
dispatch_object_t _head, dispatch_object_t _tail, int n)
{
struct dispatch_object_s *hd = _head._do, *tl = _tail._do;
// NOTE(review): presumably a non-zero result means the list was empty
// before this push - confirm against os_mpsc_push_list.
if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) {
return _dispatch_root_queue_poke(dq, n, 0);
}
}
// Pokes root queue dq. In this excerpt the body is elided ("...") except
// for the final hand-off to the slow path.
DISPATCH_NOINLINE
void
_dispatch_root_queue_poke(dispatch_queue_global_t dq, int n, int floor)
{
...
return _dispatch_root_queue_poke_slow(dq, n, floor);
}
// Requests worker threads for root queue dq. Depending on the build it
// either asks the pthread workqueue for `n` threads, or (pthread-pool
// builds) reserves up to `n` slots from dgq_thread_pool_size, keeping
// `floor` in reserve, and creates that many threads itself. Parts of the
// body are elided ("...") in this excerpt.
DISPATCH_NOINLINE
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
int remaining = n;
int r = ENOSYS;
// one-time initialization of the root queues
_dispatch_root_queues_init();
// debug hook for the root queue
_dispatch_debug_root_queue(dq, __func__);
// trace a runtime event (worker_request)
_dispatch_trace_runtime_event(worker_request, dq, (uint64_t)n);
#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES
if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)// only for queues of GLOBAL_ROOT_TYPE
#endif
{
_dispatch_root_queue_debug("requesting new worker thread for global "
"queue: %p", dq);
r = _pthread_workqueue_addthreads(remaining,// request `remaining` threads
_dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
(void)dispatch_assume_zero(r);
return;
}
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_POOL// built with the pthread pool
...
int can_request, t_count;
// seq_cst with atomic store to tail <rdar://problem/16932833>
t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);// current pool size
do {
// how many threads may still be requested while keeping `floor` in reserve
can_request = t_count < floor ? 0 : t_count - floor;
if (remaining > can_request) {
_dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
remaining, can_request);
os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
remaining = can_request;
}
if (remaining == 0) {// the root queue's pthread pool is full
_dispatch_root_queue_debug("pthread pool is full for root queue: "
"%p", dq);
return;
}
// Retry until the pool size is atomically lowered by `remaining`.
} while (!os_atomic_cmpxchgvw2o(dq, dgq_thread_pool_size, t_count,
t_count - remaining, &t_count, acquire));
#if !defined(_WIN32)
...
#endif
// One retain and one pthread_create per requested thread. A failing
// create is retried after _dispatch_temporary_resource_shortage().
do {
_dispatch_retain(dq); // released in _dispatch_worker_thread
while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
if (r != EAGAIN) {
(void)dispatch_assume_zero(r);
}
_dispatch_temporary_resource_shortage();
}
} while (--remaining);
#else // defined(_WIN32)
#if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES
if (unlikely(dq == &_dispatch_mgr_root_queue)) {
_dispatch_mgr_root_queue_init();
}
#endif
do {
_dispatch_retain(dq); // released in _dispatch_worker_thread
#if DISPATCH_DEBUG
unsigned dwStackSize = 0;
#else
unsigned dwStackSize = 64 * 1024;
#endif
uintptr_t hThread = 0;
while (!(hThread = _beginthreadex(NULL, dwStackSize, _dispatch_worker_thread_thunk, dq, STACK_SIZE_PARAM_IS_A_RESERVATION, NULL))) {// create the thread
if (errno != EAGAIN) {
(void)dispatch_assume(hThread);
}
_dispatch_temporary_resource_shortage();
}
if (_dispatch_mgr_sched.prio > _dispatch_mgr_sched.default_prio) {
// NOTE(review): the asserted expression is non-zero when
// SetThreadPriority returns TRUE, so dispatch_assume_zero looks
// inverted here - confirm.
(void)dispatch_assume_zero(SetThreadPriority((HANDLE)hThread, _dispatch_mgr_sched.prio) == TRUE);
}
CloseHandle((HANDLE)hThread);
} while (--remaining);
#endif // defined(_WIN32)
#else
// floor is only used by the pthread-pool path.
(void)floor;
#endif // DISPATCH_USE_PTHREAD_POOL
}
在这里获取线程池数量,开辟新线程
// Runs _dispatch_root_queues_init_once through dispatch_once_f, guarded by
// the predicate _dispatch_root_queues_pred and with a NULL context.
static inline void
_dispatch_root_queues_init(void)
{
dispatch_once_f(&_dispatch_root_queues_pred, NULL,
_dispatch_root_queues_init_once);
}
这里可以看到root_queues
初始化是个单例
// One-time root queue setup. Internal-workqueue builds initialise a pthread
// pool for every root queue. Other builds register the libdispatch worker
// entry points with the pthread workqueue, choosing the registration call
// from the features reported in wq_supported. Unsupported configurations
// and a non-zero registration result both crash.
static void
_dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
{
_dispatch_fork_becomes_unsafe();
#if DISPATCH_USE_INTERNAL_WORKQUEUE
size_t i;
for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
_dispatch_root_queue_init_pthread_pool(&_dispatch_root_queues[i], 0,
_dispatch_root_queues[i].dq_priority);
}
#else
int wq_supported = _pthread_workqueue_supported();
int r = ENOTSUP;
// The maintenance feature bit is mandatory.
if (!(wq_supported & WORKQ_FEATURE_MAINTENANCE)) {
DISPATCH_INTERNAL_CRASH(wq_supported,
"QoS Maintenance support required");
}
#if DISPATCH_USE_KEVENT_SETUP
// Callbacks start out as 0 and are filled in by the branch taken below.
struct pthread_workqueue_config cfg = {
.version = PTHREAD_WORKQUEUE_CONFIG_VERSION,
.flags = 0,
.workq_cb = 0,
.kevent_cb = 0,
.workloop_cb = 0,
.queue_serialno_offs = dispatch_queue_offsets.dqo_serialnum,
#if PTHREAD_WORKQUEUE_CONFIG_VERSION >= 2
.queue_label_offs = dispatch_queue_offsets.dqo_label,
#endif
};
#endif
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunreachable-code"
if (unlikely(!_dispatch_kevent_workqueue_enabled)) {// kevent workqueue disabled: plain workqueue callback only
#if DISPATCH_USE_KEVENT_SETUP
cfg.workq_cb = _dispatch_worker_thread2;
r = pthread_workqueue_setup(&cfg, sizeof(cfg));
#else
r = _pthread_workqueue_init(_dispatch_worker_thread2,
offsetof(struct dispatch_queue_s, dq_serialnum), 0);
#endif // DISPATCH_USE_KEVENT_SETUP
#if DISPATCH_USE_KEVENT_WORKLOOP
} else if (wq_supported & WORKQ_FEATURE_WORKLOOP) {// workloop feature supported
#if DISPATCH_USE_KEVENT_SETUP
cfg.workq_cb = _dispatch_worker_thread2;
cfg.kevent_cb = (pthread_workqueue_function_kevent_t) _dispatch_kevent_worker_thread;
cfg.workloop_cb = (pthread_workqueue_function_workloop_t) _dispatch_workloop_worker_thread;
r = pthread_workqueue_setup(&cfg, sizeof(cfg));
#else
r = _pthread_workqueue_init_with_workloop(_dispatch_worker_thread2,
(pthread_workqueue_function_kevent_t)
_dispatch_kevent_worker_thread,
(pthread_workqueue_function_workloop_t)
_dispatch_workloop_worker_thread,
offsetof(struct dispatch_queue_s, dq_serialnum), 0);
#endif // DISPATCH_USE_KEVENT_SETUP
#endif // DISPATCH_USE_KEVENT_WORKLOOP
#if DISPATCH_USE_KEVENT_WORKQUEUE
// kevent feature supported: workqueue and kevent callbacks
} else if (wq_supported & WORKQ_FEATURE_KEVENT) {
#if DISPATCH_USE_KEVENT_SETUP
cfg.workq_cb = _dispatch_worker_thread2;
cfg.kevent_cb = (pthread_workqueue_function_kevent_t) _dispatch_kevent_worker_thread;
r = pthread_workqueue_setup(&cfg, sizeof(cfg));
#else
r = _pthread_workqueue_init_with_kevent(_dispatch_worker_thread2,
(pthread_workqueue_function_kevent_t)
_dispatch_kevent_worker_thread,
offsetof(struct dispatch_queue_s, dq_serialnum), 0);
#endif // DISPATCH_USE_KEVENT_SETUP
#endif
} else {
DISPATCH_INTERNAL_CRASH(wq_supported, "Missing Kevent WORKQ support");
}
#pragma clang diagnostic pop
// Any non-zero result from the registration call is fatal.
if (r != 0) {
DISPATCH_INTERNAL_CRASH((r << 16) | wq_supported,
"Root queue initialization failed");
}
#endif // DISPATCH_USE_INTERNAL_WORKQUEUE
}
在这里根据内核的不同,初始化函数
栅栏函数
/**
 Demonstrates the barrier functions: dispatch_barrier_sync/dispatch_barrier_async
 */
- (void)demo2{
// A custom concurrent queue created with dispatch_queue_create.
dispatch_queue_t concurrentQueue = dispatch_queue_create("cooci", DISPATCH_QUEUE_CONCURRENT);
/* 1. Async task */
dispatch_async(concurrentQueue, ^{
sleep(1);
NSLog(@"123");
});
/* 2. Barrier task */
dispatch_barrier_async(concurrentQueue, ^{
NSLog(@"---------------------%@------------------------",[NSThread currentThread]);
});
/* 3. Async task */
dispatch_async(concurrentQueue, ^{
NSLog(@"加载那么多,喘口气!!!");
});
// Logged from the calling thread after the three submissions above.
NSLog(@"**********起来干!!");
}
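这里的barrier
拦截的是队列:同一个并发队列中,栅栏之前的任务执行完才会执行栅栏任务,栅栏之后的任务要等栅栏任务执行完。dispatch_barrier_async
不会阻塞当前线程;如果使用dispatch_barrier_sync
,还会阻塞当前线程(这里是主线程),直到栅栏任务执行完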
// Submits 500 async blocks to one concurrent queue. Each block loads an
// image from the main bundle and appends it to self.mArray.
dispatch_queue_t concurrentQueue = dispatch_queue_create("cooci", DISPATCH_QUEUE_CONCURRENT);
// dispatch_queue_t concurrentQueue = dispatch_get_global_queue(0, 0);
for (int i = 0; i<500; i++) {
dispatch_async(concurrentQueue, ^{
// Image names cycle through 0.jpg ... 9.jpg.
NSString *imageName = [NSString stringWithFormat:@"%d.jpg", (i % 10)];
NSURL *url = [[NSBundle mainBundle] URLForResource:imageName withExtension:nil];
NSData *data = [NSData dataWithContentsOfURL:url];
UIImage *image = [UIImage imageWithData:data];
// The append below is not synchronized (the @synchronized lines are
// commented out), so blocks on the concurrent queue may run it at the
// same time. This is the unsafe case the demo is meant to show.
// @synchronized (self) {
[self.mArray addObject:image];
// }
});
}
多线程中对数据的新值retain,旧值release,这个线程如果还没retain,上个线程就已经release,就会添加个空值。如果这里不做任何处理,就会崩溃。
如果这里换成全局并发,也会崩溃,因为不止这里使用全局并发,还有很多系统方法使用了,全部进行了拦截,就会崩溃
dispatch_sync
/*
 * Synchronous submission of a block. Blocks carrying private data are
 * routed to _dispatch_sync_block_with_privdata. Every other block goes to
 * _dispatch_sync_f with the block as context and its invoke function as
 * the function. Both paths receive the flag DC_FLAG_BLOCK.
 */
void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
    const uintptr_t dc_flags = DC_FLAG_BLOCK;

    if (unlikely(_dispatch_block_has_private_data(work))) {
        _dispatch_sync_block_with_privdata(dq, work, dc_flags);
    } else {
        _dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
    }
}
// Out-of-line wrapper: forwards all arguments unchanged to
// _dispatch_sync_f_inline.
static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
uintptr_t dc_flags)
{
_dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
}
// Core of dispatch_sync. Queues with dq_width == 1 take the barrier-sync
// path. The rest of the body is elided ("...") in this excerpt.
static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
if (likely(dq->dq_width == 1)) {// width 1, i.e. a serial queue
return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
}
...
}
在这里我们可以看到如果是串行队列的话,就会barrier
// Out-of-line wrapper: forwards all arguments unchanged to
// _dispatch_barrier_sync_f_inline.
static void
_dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
_dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
}
// Barrier-sync fast path. Tries to take the queue's barrier lock for the
// calling thread. On failure it goes to _dispatch_sync_f_slow. When the
// target queue itself has a target it recurses. Otherwise it invokes func
// and completes the barrier directly.
static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
// id of the calling thread
dispatch_tid tid = _dispatch_tid_self();
// Only lane-type queues are accepted.
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
// The more correct thing to do would be to merge the qos of the thread
// that just acquired the barrier lock into the queue state.
//
// However this is too expensive for the fast path, so skip doing it.
// The chosen tradeoff is that if an enqueue on a lower priority thread
// contends with this fast path, this thread may receive a useless override.
//
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
// The barrier lock could not be taken on the fast path: go to the slow
// path. That path calls __DISPATCH_WAIT_FOR_QUEUE__, which is where the
// "already owned by current thread" (deadlock) crash is raised.
if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
DC_FLAG_BARRIER | dc_flags);
}
// The target queue has a target of its own: recurse.
if (unlikely(dl->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func,
DC_FLAG_BARRIER | dc_flags);
}
// introspection hook before the invoke
_dispatch_introspection_sync_begin(dl);
_dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}
// Invokes func(ctxt) on behalf of a barrier sync, then releases the barrier.
// The release is an in-place atomic update of dq_state. It is delegated to
// _dispatch_lane_barrier_complete when the queue has queued items, has
// width > 1, or has any bit of fail_unlock_mask set.
static void
_dispatch_lane_barrier_sync_invoke_and_complete(dispatch_lane_t dq,
void *ctxt, dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
{
// the client function is invoked here
_dispatch_sync_function_invoke_inline(dq, ctxt, func);
_dispatch_trace_item_complete(dc);
if (unlikely(dq->dq_items_tail || dq->dq_width > 1)) {
return _dispatch_lane_barrier_complete(dq, 0, 0);
}
// Presence of any of these bits requires more work that only
// _dispatch_*_barrier_complete() handles properly
//
// Note: testing for RECEIVED_OVERRIDE or RECEIVED_SYNC_WAIT without
// checking the role is sloppy, but is a super fast check, and neither of
// these bits should be set if the lock was never contended/discovered.
const uint64_t fail_unlock_mask = DISPATCH_QUEUE_SUSPEND_BITS_MASK |
DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_DIRTY |
DISPATCH_QUEUE_RECEIVED_OVERRIDE | DISPATCH_QUEUE_SYNC_TRANSFER |
DISPATCH_QUEUE_RECEIVED_SYNC_WAIT;
uint64_t old_state, new_state;
// similar to _dispatch_queue_drain_try_unlock
// release the drain ownership recorded in dq_state
os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
new_state = old_state - DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;
new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
if (unlikely(old_state & fail_unlock_mask)) {
os_atomic_rmw_loop_give_up({
// abandon the in-place unlock and let the full completion path finish
return _dispatch_lane_barrier_complete(dq, 0, 0);
});
}
});
if (_dq_state_is_base_wlh(old_state)) {
_dispatch_event_loop_assert_not_owned((dispatch_wlh_t)dq);
}
}
这是正常情况下,如果线程堵塞呢
// Convenience form of _dispatch_queue_try_acquire_barrier_sync_and_suspend
// with a suspend count of 0.
static inline bool
_dispatch_queue_try_acquire_barrier_sync(dispatch_queue_class_t dq, uint32_t tid)
{
return _dispatch_queue_try_acquire_barrier_sync_and_suspend(dq._dl, tid, 0);
}
// Tries to move dq_state from its initial value (plus role bits) to a state
// that is width-full, in-barrier, owned by tid and suspended suspend_count
// times. Gives up without modifying the state when dq_state is anything
// other than the initial value plus role bits.
// NOTE(review): the return value is whatever os_atomic_rmw_loop2o yields,
// presumably true on success and false after give-up - confirm.
static inline bool
_dispatch_queue_try_acquire_barrier_sync_and_suspend(dispatch_lane_t dq,
uint32_t tid, uint64_t suspend_count)
{
// State the queue is expected to be in for the acquire to succeed.
uint64_t init = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);
// State to install on success (role bits are added inside the loop).
uint64_t value = DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER |
_dispatch_lock_value_from_tid(tid) |
(suspend_count * DISPATCH_QUEUE_SUSPEND_INTERVAL);
uint64_t old_state, new_state;
return os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, {
uint64_t role = old_state & DISPATCH_QUEUE_ROLE_MASK;
// Any state other than the initial one: give up.
if (old_state != (init | role)) {
os_atomic_rmw_loop_give_up(break);
}
new_state = value | role;
});
}
这里也调用了os_atomic_rmw_loop2o
,如果队列当前状态不是初始(空闲)状态(old_state != (init | role)),就直接放弃获取
获取失败(队列被占用、线程需要等待)之后就会进行下一步 _dispatch_sync_f_slow
// Slow path of dispatch_sync. Builds a sync context on the stack, waits for
// the queue through __DISPATCH_WAIT_FOR_QUEUE__, and then either finishes
// the recursion bookkeeping (dsc_func was cleared while waiting) or invokes
// func itself. Queues without a target queue invoke func immediately.
static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
dispatch_function_t func, uintptr_t top_dc_flags,
dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
dispatch_queue_t top_dq = top_dqu._dq;
dispatch_queue_t dq = dqu._dq;
// No target queue: invoke directly, nothing to wait for.
if (unlikely(!dq->do_targetq)) {
return _dispatch_sync_function_invoke(dq, ctxt, func);
}
pthread_priority_t pp = _dispatch_get_priority();
// wrap the work in a stack-allocated sync context
struct dispatch_sync_context_s dsc = {
.dc_flags = DC_FLAG_SYNC_WAITER | dc_flags,
.dc_func = _dispatch_async_and_wait_invoke,
.dc_ctxt = &dsc,
.dc_other = top_dq,
.dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
.dc_voucher = _voucher_get(),
.dsc_func = func,
.dsc_ctxt = ctxt,
.dsc_waiter = _dispatch_tid_self(),// tid of the calling thread, i.e. the waiter
};
// trace the push of dsc
// NOTE(review): the enqueue itself presumably happens inside
// __DISPATCH_WAIT_FOR_QUEUE__ (elided in this excerpt) - confirm.
_dispatch_trace_item_push(top_dq, &dsc);
__DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);
if (dsc.dsc_func == NULL) {
// dsc_func being cleared means that the block ran on another thread ie.
// case (2) as listed in _dispatch_async_and_wait_f_slow.
dispatch_queue_t stop_dq = dsc.dc_other;
return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
}
_dispatch_introspection_sync_begin(top_dq);
_dispatch_trace_item_pop(top_dq, &dsc);
_dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
DISPATCH_TRACE_ARG(&dsc));
}
// Waits for queue dq on behalf of sync context dsc. This excerpt shows only
// the deadlock check, and the rest of the body is elided ("..."). The
// return type is also omitted in this excerpt.
__DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
{
uint64_t dq_state = _dispatch_wait_prepare(dq);
// The queue is drain-locked by the very thread that is about to wait on
// it: crash instead of waiting.
if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
"dispatch_sync called on queue "
"already owned by current thread");
}
...
}
判断当前队列是否是一个要等待的队列,再将当前任务依赖的队列和状态进行匹配
// Tests whether the lock part of dq_state is owned by tid. dq_state is
// narrowed to dispatch_lock by the cast before the comparison.
static inline bool
_dq_state_drain_locked_by(uint64_t dq_state, dispatch_tid tid)
{
return _dispatch_lock_is_locked_by((dispatch_lock)dq_state, tid);
}
/*
 * True when lock_value and tid agree on every bit selected by
 * DLOCK_OWNER_MASK. XOR leaves a 1 wherever the two differ, so the owner
 * matches when no masked bit survives.
 */
static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{
    // equivalent to _dispatch_lock_owner(lock_value) == tid
    const bool owner_bits_differ =
            ((lock_value ^ tid) & DLOCK_OWNER_MASK) != 0;

    return !owner_bits_differ;
}
// Mask of the bits compared by _dispatch_lock_is_locked_by: all bits of a
// 32-bit value except the two lowest.
#define DLOCK_OWNER_MASK ((dispatch_lock)0xfffffffc)
异或操作,相同的位结果为0;再与DLOCK_OWNER_MASK
按位与,结果为0说明持有队列锁的线程和当前要等待的线程是同一个,这个时候上面就会进入DISPATCH_CLIENT_CRASH
,也就是检测到死锁后直接崩溃
信号量
dispatch_semaphore_create
// Creates a semaphore whose current value (dsema_value) and original value
// (dsema_orig) both start at `value`. Negative values are rejected with
// DISPATCH_BAD_INPUT.
dispatch_semaphore_t
dispatch_semaphore_create(long value)
{
dispatch_semaphore_t dsema;
// If the internal value is negative, then the absolute of the value is
// equal to the number of waiting threads. Therefore it is bogus to
// initialize the semaphore with a negative value.
if (value < 0) {
return DISPATCH_BAD_INPUT;
}
dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
sizeof(struct dispatch_semaphore_s));
dsema->do_next = DISPATCH_OBJECT_LISTLESS;
dsema->do_targetq = _dispatch_get_default_queue(false);
dsema->dsema_value = value;
_dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
dsema->dsema_orig = value;
return dsema;
}
这里就是进行了 dispatch_semaphore_t
创建
dispatch_semaphore_signal
// Atomically increments dsema_value. Returns 0 when the result is positive.
// A result of LONG_MIN is reported as an unbalanced signal. Otherwise the
// slow path decides the return value.
long
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
long value = os_atomic_inc2o(dsema, dsema_value, release);
if (likely(value > 0)) {
return 0;
}
if (unlikely(value == LONG_MIN)) {
DISPATCH_CLIENT_CRASH(value,
"Unbalanced call to dispatch_semaphore_signal()");
}
// NOTE(review): reached when value <= 0; presumably the slow path wakes a
// thread blocked in dispatch_semaphore_wait - confirm.
return _dispatch_semaphore_signal_slow(dsema);
}
os_atomic_inc2o(dsema, dsema_value, release) 相当于 atomic_fetch_add_explicit(&dsema->dsema_value, 1, \
memory_order_release) + 1
对这个进行原子性操作,就是dsema->dsema_value + 1,并返回加 1 之后的新值
dispatch_semaphore_wait
/*
 * Atomically decrements dsema_value. A non-negative result means no
 * waiting is needed and 0 is returned. A negative result sends the caller
 * to _dispatch_semaphore_wait_slow with the given timeout.
 */
long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
    long remaining = os_atomic_dec2o(dsema, dsema_value, acquire);

    if (unlikely(remaining < 0)) {
        return _dispatch_semaphore_wait_slow(dsema, timeout);
    }
    return 0;
}
同上,进行-1操作
// Slow path of dispatch_semaphore_wait, chosen by timeout:
//  - any other value (default): timed wait; on timeout falls into the NOW case
//  - DISPATCH_TIME_NOW: tries to undo the fast-path decrement and report a
//    timeout; if the value is no longer negative, falls into the FOREVER case
//  - DISPATCH_TIME_FOREVER: waits without a timeout
// Returns 0 unless the timeout is reported through _DSEMA4_TIMEOUT().
static long
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
dispatch_time_t timeout)
{
long orig;
_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
switch (timeout) {
default:
// A zero result from the timed wait leaves the switch and returns 0.
if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
break;
}
// Fall through and try to undo what the fast path did to
// dsema->dsema_value
case DISPATCH_TIME_NOW:
orig = dsema->dsema_value;
// While the value is still negative, try to add 1 back; success
// reports the timeout to the caller.
while (orig < 0) {
if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1,
&orig, relaxed)) {
return _DSEMA4_TIMEOUT();
}
}
// Another thread called semaphore_signal().
// Fall through and drain the wakeup.
case DISPATCH_TIME_FOREVER:
_dispatch_sema4_wait(&dsema->dsema_sema);
break;
}
return 0;
}
- default:主要调用了_dispatch_sema4_timedwait方法,这个方法主要是判断当前的操作是否超过指定的超时时间。
- DISPATCH_TIME_NOW中的while是一定会执行的,如果不满足条件,已经在之前的操作跳出了,不会执行到此。if操作调用os_atomic_cmpxchgvw2o,会将value进行+1,跳出阻塞,并返回_DSEMA4_TIMEOUT超时
- DISPATCH_TIME_FOREVER中即调用_dispatch_sema4_wait,表示会一直阻塞,直到等到signal加1变为0为止,跳出阻塞
将信号量value进行保存为两份
dsema_value
和dsema_orig
,一个是当前的value,一个是记录初始 value。wait进行减1的操作,signal进行加1的操作。如果 value 为0或者小于0 并随后调用 wait 方法,线程将被阻塞直到别的线程调用了 signal 方法
调度组dispatch_group_t
dispatch_group_t的使用
// Basic group usage: one enter/leave pair around an async task, plus a
// notify block targeted at the main queue.
dispatch_group_t group = dispatch_group_create();
dispatch_group_enter(group); // group value goes 0 -> -1
dispatch_async(dispatch_get_global_queue(0, 0), ^{
sleep(1);
NSLog(@"123");
// Balances the enter above.
dispatch_group_leave(group);
});
dispatch_group_notify(group, dispatch_get_main_queue(), ^{
NSLog(@"他们回来了 我准备主线程更新UI");
});
这里简单的调用了dispatch_group_t
// Variation: the notify block is registered BEFORE any enter, followed by
// two enter/leave pairs. The log output listed after this snippet shows the
// resulting order.
dispatch_group_t group = dispatch_group_create();
dispatch_group_notify(group, dispatch_get_main_queue(), ^{
NSLog(@"他们回来了 我准备主线程更新UI");
});
dispatch_group_enter(group); // group value goes 0 -> -1
dispatch_async(dispatch_get_global_queue(0, 0), ^{
sleep(1);
NSLog(@"123");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(dispatch_get_global_queue(0, 0), ^{
// sleep(1);
NSLog(@"4567");
dispatch_group_leave(group);
});
NSLog(@"主线程事务正常执行");
·······
主线程事务正常执行
4567
他们回来了 我准备主线程更新UI
123
- dispatch_group_t
只要有成对的dispatch_group_enter
和dispatch_group_leave
,notify 就会执行。
- 如果dispatch_group_enter
多执行一次,就会一直等待。
- 如果dispatch_group_leave
多执行一次,就会崩溃。
// Submits the block through dispatch_group_async, with no explicit
// enter/leave calls in the caller.
dispatch_group_async(group, dispatch_get_global_queue(0, 0), ^{
NSLog(@"890");
});
dispatch_group_async
相当于自动执行了dispatch_group_enter
和dispatch_group_leave
。
dispatch_group_create
// Creates a group with an initial count of 0.
dispatch_group_t
dispatch_group_create(void)
{
return _dispatch_group_create_with_count(0);
}
// Allocates a group object. For a non-zero n it stores
// (uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL into dg_bits and sets
// do_ref_cnt to 1. For n == 0 neither field is written here.
static inline dispatch_group_t
_dispatch_group_create_with_count(uint32_t n)
{
dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
sizeof(struct dispatch_group_s));
dg->do_next = DISPATCH_OBJECT_LISTLESS;
dg->do_targetq = _dispatch_get_default_queue(false);
if (n) {
// The count is stored negated, scaled by the value interval.
os_atomic_store2o(dg, dg_bits,
(uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
}
return dg;
}
这里就是简单的初始化,通过os_atomic_store2o宏定义,内部维护了一个value的值
dispatch_group_leave
// Adds DISPATCH_GROUP_VALUE_INTERVAL to dg_state. When the value field
// before the add was DISPATCH_GROUP_VALUE_1, it clears the notification bit
// (and the waiters bit, if the value is now 0) and wakes the group. A
// previous value of 0 is reported as an unbalanced leave.
void
dispatch_group_leave(dispatch_group_t dg)
{
// The value is incremented on a 64bits wide atomic so that the carry for
// the -1 -> 0 transition increments the generation atomically.
uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
DISPATCH_GROUP_VALUE_INTERVAL, release);
uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);
if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
// Recompute locally the state produced by the atomic add above.
old_state += DISPATCH_GROUP_VALUE_INTERVAL;
do {
new_state = old_state;
if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
} else {
// If the group was entered again since the atomic_add above,
// we can't clear the waiters bit anymore as we don't know for
// which generation the waiters are for
new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
}
// Nothing to clear: skip the compare-and-exchange.
if (old_state == new_state) break;
} while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
old_state, new_state, &old_state, relaxed)));
return _dispatch_group_wake(dg, old_state, true);
}
if (unlikely(old_value == 0)) {
DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
"Unbalanced call to dispatch_group_leave()");
}
}
os_atomic_add_orig2o
对 dg_state 进行原子性加操作并返回加之前的旧值;如果旧的 value 等于 DISPATCH_GROUP_VALUE_1(也就是最后一次 leave),就清除相应标志位
,然后进行唤醒操作_dispatch_group_wake
。如果旧的 value 为 0,说明 leave 多于 enter,直接崩溃。
dispatch_group_enter
// Subtracts DISPATCH_GROUP_VALUE_INTERVAL from dg_bits. Retains the group
// when the value field before the subtraction was 0, and crashes when it
// was DISPATCH_GROUP_VALUE_MAX.
void
dispatch_group_enter(dispatch_group_t dg)
{
// The value is decremented on a 32bits wide atomic so that the carry
// for the 0 -> -1 transition is not propagated to the upper 32bits.
uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
DISPATCH_GROUP_VALUE_INTERVAL, acquire);
uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
// The value field was 0 before this enter: take a reference on the group.
if (unlikely(old_value == 0)) {
_dispatch_retain(dg); // <rdar://problem/22318411>
}
if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
DISPATCH_CLIENT_CRASH(old_bits,
"Too many nested calls to dispatch_group_enter()");
}
}
这里就是对dg_bits进行-1操作
dispatch_group_notify
// Wraps block db in a continuation (flag DC_FLAG_CONSUME) and registers it
// on group dg as a notification targeted at queue dq.
void
dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_block_t db)
{
dispatch_continuation_t dsn = _dispatch_continuation_alloc();
// The QoS returned by the init call is not used here.
_dispatch_continuation_init(dsn, dq, db, 0, DC_FLAG_CONSUME);
_dispatch_group_notify(dg, dq, dsn);
}
// Appends continuation dsn to the group's dg_notify list, remembering its
// target queue in dc_data. If the list was empty before the push, it also
// sets DISPATCH_GROUP_HAS_NOTIFS in dg_state, or wakes the group right away
// when the low 32 bits of dg_state are already 0.
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_continuation_t dsn)
{
uint64_t old_state, new_state;
dispatch_continuation_t prev;
// The target queue travels with the continuation and is retained here.
dsn->dc_data = dq;
_dispatch_retain(dq);
prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
// First notification on the list: take a reference on the group.
if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
if (os_mpsc_push_was_empty(prev)) {
os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
// Low 32 bits of the state are 0: wake now instead of storing the bit.
if ((uint32_t)old_state == 0) {
os_atomic_rmw_loop_give_up({
return _dispatch_group_wake(dg, new_state, false);
});
}
});
}
}
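通过 os_atomic_rmw_loop2o 读取并更新 dg_state:如果 dg_state 的低 32 位为 0(没有未完成的 enter),就直接进行唤醒操作_dispatch_group_wake;否则只是设置 DISPATCH_GROUP_HAS_NOTIFS 标志,等最后一次 leave 时再唤醒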
// Wakes group dg according to the flag bits in dg_state. Registered
// notifications are submitted asynchronously to their recorded queues,
// waiters are woken through dg_gen, and the accumulated references on the
// group are dropped.
static void
_dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
{
// Number of group references to release at the end.
uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>
if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
dispatch_continuation_t dc, next_dc, tail;
// Snapshot before anything is notified/woken <rdar://problem/8554546>
dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
do {
// dc_data holds the target queue stored by _dispatch_group_notify.
dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
_dispatch_continuation_async(dsn_queue, dc,
_dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);
// Balances the retain taken when the notification was registered.
_dispatch_release(dsn_queue);
} while ((dc = next_dc));
refs++;
}
if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
_dispatch_wake_by_address(&dg->dg_gen);// wake by the address of dg_gen
}
if (refs) _dispatch_release_n(dg, refs);// drop the accumulated references
}
这个通过循环,找到命中的队列,然后_dispatch_continuation_async
执行函数
dispatch_group_async
/*
 * Submits block db to queue dq as part of group dg. The block is wrapped
 * in a freshly allocated continuation initialised with DC_FLAG_CONSUME and
 * DC_FLAG_GROUP_ASYNC, and the group-aware async helper does the rest.
 */
void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_block_t db)
{
    const uintptr_t flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    dispatch_qos_t qos = _dispatch_continuation_init(dc, dq, db, 0, flags);

    _dispatch_continuation_group_async(dg, dq, dc, qos);
}
常规的包装处理
// Enters the group, records the group in the continuation's dc_data, then
// submits the continuation asynchronously to dq.
static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_continuation_t dc, dispatch_qos_t qos)
{
// Enter happens before the continuation is submitted.
dispatch_group_enter(dg);
dc->dc_data = dg;
_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
在这里执行了dispatch_group_enter
操作,然后处理异步函数
/*
 * Runs a continuation that was submitted with a group attached in dc_data.
 * Anything other than a group object in dc_data is an internal error. For
 * a group, the client function runs first, completion is traced, and only
 * then is the group left.
 */
static inline void
_dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
{
    struct dispatch_object_s *dou = dc->dc_data;
    const unsigned long kind = dx_type(dou);

    if (kind != DISPATCH_GROUP_TYPE) {
        DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
    } else {
        _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
        _dispatch_trace_item_complete(dc);
        // leave the group
        dispatch_group_leave((dispatch_group_t)dou);
    }
}
函数执行完毕,执行dispatch_group_leave
Dispatch_Source
- 其 CPU 负荷非常小,尽量不占用资源
- 联结的优势
在任一线程上调用它的一个函数dispatch_source_merge_data
后,会执行
Dispatch Source
事先定义好的句柄(可以把句柄简单理解为一个 block )
这个过程叫Custom event
,用户事件。是dispatch source
支持处理的一种事件
句柄是一种指向指针的指针 它指向的就是一个类或者结构,它和系统有很密切的关系
HINSTANCE(实例句柄),HBITMAP(位图句柄),HDC(设备表述句柄),HICON
(图标句柄)等。这当中还有一个通用的句柄,就是HANDLE
dispatch_source_create
: 创建源
dispatch_source_set_event_handler
: 设置源事件回调
dispatch_source_merge_data
: 源事件设置数据
dispatch_source_get_data
: 获取源事件数据
dispatch_resume
: 继续
dispatch_suspend
: 挂起
dispatch_source
不依赖于runloop
而是依赖于kernel - workloop
,所以使用DISPATCH_SOURCE_TYPE_TIMER
计时器的时候会更加的准确
总结
- dispatch_sync
将任务 block push 到队列中,然后按照 FIFO 去执行。
- dispatch_sync
造成死锁的主要原因是:持有队列锁的线程 tid 和当前要等待的线程 tid 为同一个。
- dispatch_async
会把任务包装并保存,之后就会开辟相应线程去执行已保存的任务。
- semaphore
主要在底层维护一个value的值,使用 signal 进行 +1,wait 进行 -1。wait 减 1 之后如果 value 大于或者等于 0,则直接返回、不阻塞,否则根据 timeout 参数进行等待和超时判断。
- dispatch_group
底层也是维护了一个 value 的值,等待 group 完成实际上就是等待value恢复初始值。而notify的作用是将所有注册的回调组装成一个链表,在 dispatch_group_leave 时判断 value 是不是恢复初始值,如果是则通过 _dispatch_continuation_async 异步执行所有注册的回调。
- dispatch_once
通过一个静态变量来标记 block 是否已被执行,同时使用加锁确保只有一个线程能执行,执行完 block 后会唤醒其他所有等待的线程。