GCD的信号量的使用及其原理
什么是信号量
信号量(semaphore)是操作系统用来解决并发中的互斥和同步问题的一种方法。
信号量对于允许多个线程并发访问的资源,它是一个很好的选择。一个初始值为N的信号量允许N个线程并发访问。线程访问资源时首先获取信号量,进行如下操作:
将信号量的值减1。如果信号量的值小于0,则进入等待状态,否则继续执行。访问资源之后,线程释放信号量,将信号量的值加1。如果信号量的值不小于1,唤醒一个等待中的线程。
信号量的使用
//初始化信号量,初始值N = 2,表示最多只允许两条线程同时获得信号量
dispatch_semaphore_t semaphore = dispatch_semaphore_create(2);
// 并发队列
dispatch_queue_t conQueue = dispatch_queue_create("com.hello-world.djx", DISPATCH_QUEUE_CONCURRENT);
for (int i = 1; i <= 5; i++) {
dispatch_async(conQueue, ^{
// 线程执行dispatch_semaphore_wait时,会获取当前信号量,将信号量值减一,如果信号量值小于0,则等待dispatch_semaphore_signal唤醒线程;如果信号量值大于或等于0则开始执行任务
dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
NSLog(@"开始任务:%d", i);
sleep(i);
NSLog(@"结束任务:%d", i);
//线程执行完任务之后调用dispatch_semaphore_signal,将信号量值加一,如果此时信号量值大于0,说明没有等待的线程,直接返回,如果此时信号量值小于或等于0,表次此前信号量已用尽,可能有正等待的线程,这时候会试图唤起等待的线程
dispatch_semaphore_signal(semaphore);
});
}
打印结果:
2021-08-25 15:51:47.942058+0800 GCDDemo[31306:26034698] 开始任务:2
2021-08-25 15:51:47.942087+0800 GCDDemo[31306:26034700] 开始任务:1
2021-08-25 15:51:48.945177+0800 GCDDemo[31306:26034700] 结束任务:1
2021-08-25 15:51:48.945402+0800 GCDDemo[31306:26034699] 开始任务:4
2021-08-25 15:51:49.944143+0800 GCDDemo[31306:26034698] 结束任务:2
2021-08-25 15:51:49.944354+0800 GCDDemo[31306:26034697] 开始任务:3
2021-08-25 15:51:52.947185+0800 GCDDemo[31306:26034699] 结束任务:4
2021-08-25 15:51:52.947173+0800 GCDDemo[31306:26034697] 结束任务:3
2021-08-25 15:51:52.947391+0800 GCDDemo[31306:26034696] 开始任务:5
2021-08-25 15:51:57.947824+0800 GCDDemo[31306:26034696] 结束任务:5
可以看出只能同时执行两个任务,一个任务结束之后才能唤醒其他等待的线程继续执行。信号量在这里起到控制队列并发数的作用。其实这里的初始值如果改成1的话,信号量就相当于互斥锁的作用了。
信号量的底层原理
dispatch_semaphore_create源码解读
这是创建信号量并初始化信号量的函数,参数value就是信号量的值。
dispatch_semaphore_t
dispatch_semaphore_create(long value)
{
dispatch_semaphore_t dsema;
// If the internal value is negative, then the absolute of the value is
// equal to the number of waiting threads. Therefore it is bogus to
// initialize the semaphore with a negative value.
if (value < 0) {
return DISPATCH_BAD_INPUT;
}
dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
sizeof(struct dispatch_semaphore_s));
dsema->do_next = DISPATCH_OBJECT_LISTLESS;
dsema->do_targetq = _dispatch_get_default_queue(false);
dsema->dsema_value = value;
_dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
dsema->dsema_orig = value;
return dsema;
}
DISPATCH_VTABLE这里可以获取到信号量的类信息,通过_dispatch_object_alloc创建一个信号量。value值被保留到属性dsema_value和dsema_orig中(为什么这里会有两个变量来接收value?请看后面的dispose方法),后面的调用的dispatch_semaphore_wait和dispatch_semaphore_signal都会对dsema_value进行操作。_dispatch_sema4_init初始化一个信号。
dispatch_semaphore_wait源码解读
long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
long value = os_atomic_dec2o(dsema, dsema_value, acquire);
if (likely(value >= 0)) {
return 0;
}
return _dispatch_semaphore_wait_slow(dsema, timeout);
}
os_atomic_dec2o:os_atomic_dec2o
->os_atomic_sub2o
->os_atomic_sub
->_os_atomic_c11_op((p), (v), m, sub, -)
->atomic_fetch_sub_explicit
这里通过函数os_atomic_dec2o,最终调用atomic_fetch_sub_explicit对value进行减1的原子操作,然后返回当前信号量value。判断如果value大于0,说当前线程可以正常访问,直接返回0表次正常;反之,线程会调用函数_dispatch_semaphore_wait_slow进入等待被唤醒。
static long
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
dispatch_time_t timeout)
{
long orig;
_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
switch (timeout) {
default:
if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
break;
}
// Fall through and try to undo what the fast path did to
// dsema->dsema_value
case DISPATCH_TIME_NOW:
orig = dsema->dsema_value;
while (orig < 0) {
if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1,
&orig, relaxed)) {
return _DSEMA4_TIMEOUT();
}
}
// Another thread called semaphore_signal().
// Fall through and drain the wakeup.
case DISPATCH_TIME_FOREVER:
_dispatch_sema4_wait(&dsema->dsema_sema);
break;
}
return 0;
}
这时候会根据timeout参数设定的时间来处理。如果timeout传的是DISPATCH_TIME_NOW,则表示不等待,这时候会再次尝试获取信号量,如果信号量此时已然小于0,则返回超时提醒;如果传入的是DISPATCH_TIME_FOREVER,表示会一直等待被唤醒,这时候就调用函数_dispatch_sema4_wait进行等待;如果是给定的一段时间,走default逻辑,调用函数_dispatch_sema4_timedwait进入计时等待:
bool
_dispatch_sema4_timedwait(_dispatch_sema4_t *sema, dispatch_time_t timeout)
{
struct timespec _timeout;
int ret;
do {
uint64_t nsec = _dispatch_time_nanoseconds_since_epoch(timeout);
_timeout.tv_sec = (__typeof__(_timeout.tv_sec))(nsec / NSEC_PER_SEC);
_timeout.tv_nsec = (__typeof__(_timeout.tv_nsec))(nsec % NSEC_PER_SEC);
ret = sem_timedwait(sema, &_timeout);
} while (unlikely(ret == -1 && errno == EINTR));
if (ret == -1 && errno == ETIMEDOUT) {
return true;
}
DISPATCH_SEMAPHORE_VERIFY_RET(ret);
return false;
}
等线程被唤醒之后返回执行代码,否则知道时间耗尽返回超时提醒。
dispatch_semaphore_signal源码解读
long
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
long value = os_atomic_inc2o(dsema, dsema_value, release);
if (likely(value > 0)) {
return 0;
}
if (unlikely(value == LONG_MIN)) {
DISPATCH_CLIENT_CRASH(value,
"Unbalanced call to dispatch_semaphore_signal()");
}
return _dispatch_semaphore_signal_slow(dsema);
}
os_atomic_inc2o:os_atomic_inc2o
->os_atomic_add2o
->os_atomic_add
->_os_atomic_c11_op((p), (v), m, add, +)
->atomic_fetch_add_explicit
通过函数os_atomic_inc2o释放资源(release),同时对信号量加1,然后返回加1之后的结果value。判断value大于0,直接返回0;如果value小于或等于0,说明加1之前的值小于0,可能存在等待的线程,然后调用函数_dispatch_semaphore_signal_slow,发送一个信号去唤起等待的线程。
long
_dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema)
{
_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
_dispatch_sema4_signal(&dsema->dsema_sema, 1);
return 1;
}
_dispatch_semaphore_dispose源码解读
_dispatch_semaphore_dispose实际上就是信号量销毁时调用的方法:
void
_dispatch_semaphore_dispose(dispatch_object_t dou,
DISPATCH_UNUSED bool *allow_free)
{
dispatch_semaphore_t dsema = dou._dsema;
if (dsema->dsema_value < dsema->dsema_orig) {
DISPATCH_CLIENT_CRASH(dsema->dsema_orig - dsema->dsema_value,
"Semaphore object deallocated while in use");
}
_dispatch_sema4_dispose(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
}
前面我们提到,在创建信号量时有两个属性dsema_value和dsema_orig接收value。其实dsema_orig是用来保留初始值,dsema_value在信号量使用过程中不断变化。等到dispose时,会判断dsema_value和dsema_orig是否相等,如果dsema_value小于初始值dsema_orig,那么系统就会认为信号量还在使用中,此时销毁就会发生crash。所以这是为什么信号量的dispatch_semaphore_signal和dispatch_semaphore_wait要成对使用的原因。