linux用户空间 - 多线程通讯
- 作者: 雪山肥鱼
- 时间:20210711 22:41z
- 目的:信号量,spinlock,条件变量等
# 信号量
# Mutex vs Spinlock
# 临界区较短
# 临界区较长
# gdb 调试死锁
# 正确加锁
## 降低锁的粒度
# 条件变量
# 优先级反转
信号量
信号量无论在内核还是用户空间都是存在的
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_post(sem_t *sem);
int sem_getvalue(sem_t * sem, int *sval);
int sem_destroy(sem_t *sem);
sem_wait 当 信号量不为0 时 才有效,相当于 p 操作, 信号量 - 1
sem_post 相当于 v 操作,信号量 + 1
初始的信号量为 1 相当于 互斥是 mutex
初始的信号量为 0 相当于 同步,强调先后顺序。
void * prduce( void * arg) {
int i ;
for(i = 0; i < nitems; i ++ {
sem_wait(&shared.nempty); // 等仓库空位
sem_wait(&shared.mutex); // 锁柱临界区,生产者消费者均要访问,且为多核准备
shard.buff[i % NBUFF] = i;
count << "Product" << shared.buff[i % NBUFF] <<endl;
sem_post(&shared.mutex); // 释放临界区
sem_post(shared.nstored); // 库存增加1
}
return (NULL);
}
void * consume ( void * arg) {
int i;
for( i = 0; i < nitems; i ++ ) {
sem_wait(&shared.nstored); // 在等库存
sem_wait(&shared.mutex):
if(shared.buff[i%NUBFF] != i )
cout << "buff [" << i << "] = " << shared.buff[i %NBUFF] << endl;
cout << "Consumer:"<< shared.buff[ i %NBUFF] << endl;
sem_post(&shared.mutex);
sem_post(&shared.nempty); //释放空位
}
return (NULL);
}
int main( int argc, char **argv) {
pthread_t tid_produce, tid_consumer;
if(argc !=2) { cout <<"Usage:prodcons number" << endl; exit(0); }
nitems = atoi(argv[1]);
sem_init(&shared.mutex, 0, 1);
sem_init(&shared.nempty, 0, NBUFF);// 仓库空位
sem_init(&shared.nstored, 0, 0); // 最开始的库存
pthread_create(&tid_produce, NULL, produce, NULL);
pthread_create(&tid_consumer, NULL, consume, NULL);
pthread_join(tid_produce, NULL);
pthread_join(tid_consumer, NULL);
sem_detroy(&shared.mutex);
sem_detroy(&shared.nempty);
sem_detroy(&shared.nstored);
}
信号量 同步,互斥都可以做的。但是 做互斥 最好还是用专门的mutex
Mutex vs Spinlock
pthread_mutex_t mutex;
pthread_mutex_init(&mutex, NULL);
pthread_mutex_lock(&mutex);
pthread_mutex_unlock(&mutex);
int pthread_spin_lock(pthread_spinlock_t * lock);
int pthread_spin_trylock(pthread_spinlock_t * lock);
int pthread_spin_unlock(pthread_spinlock_t *lock);
Mutex: 拿不到 Mutex 线程会谁
Spinlock 拿不到 spinlock, 线程会自选,所在CPU 关闭抢占调度,其他线程根本进不来。
Spinlock 大都用在多核
Spinlock适合场景:
- 锁柱的区间短
- 区间经常发生
- 区间的大小成为性能瓶颈
spinlock 锁柱大区间则可能导致很高的CPU利用率和性能下降
pthread_spinlock 是用户态的,是不会通过系统调用,把CPU 的抢占给关掉的。只是说T1拿到spinlock 后,T2还是一直在去尝试抢。用户是没有办法控制内核把抢占调度关闭
临界区较短
#include <stdio.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <errno.h>
#include <sys/time.h>
#include <list>
#include <pthread.h>
//#define USE_SPINLOCK
#define LOOPS 5000000
using namespace std;
list<int> the_list;
#ifdef USE_SPINLOCK
pthread_spinlock_t spinlock;
#else
pthread_mutex_t mutex;
#endif
//Get the thread id
pid_t gettid() { return syscall( __NR_gettid ); }
void *consumer(void *ptr)
{
int i;
printf("Consumer TID %lun", (unsigned long)gettid());
while (1)
{
#ifdef USE_SPINLOCK
pthread_spin_lock(&spinlock);
#else
pthread_mutex_lock(&mutex);
#endif
if (the_list.empty())
{
#ifdef USE_SPINLOCK
pthread_spin_unlock(&spinlock);
#else
pthread_mutex_unlock(&mutex);
#endif
break;
}
i = the_list.front();
the_list.pop_front();
#ifdef USE_SPINLOCK
pthread_spin_unlock(&spinlock);
#else
pthread_mutex_unlock(&mutex);
#endif
}
return NULL;
}
int main()
{
int i;
pthread_t thr1, thr2;
struct timeval tv1, tv2;
#ifdef USE_SPINLOCK
pthread_spin_init(&spinlock, 0);
#else
pthread_mutex_init(&mutex, NULL);
#endif
// Creating the list content...
for (i = 0; i < LOOPS; i++)
the_list.push_back(i);
// Measuring time before starting the threads...
gettimeofday(&tv1, NULL);
pthread_create(&thr1, NULL, consumer, NULL);
pthread_create(&thr2, NULL, consumer, NULL);
pthread_join(thr1, NULL);
pthread_join(thr2, NULL);
// Measuring time after threads finished...
gettimeofday(&tv2, NULL);
if (tv1.tv_usec > tv2.tv_usec)
{
tv2.tv_sec--;
tv2.tv_usec += 1000000;
}
printf("Result - %ld.%ldn", tv2.tv_sec - tv1.tv_sec,
tv2.tv_usec - tv1.tv_usec);
#ifdef USE_SPINLOCK
pthread_spin_destroy(&spinlock);
#else
pthread_mutex_destroy(&mutex);
#endif
return 0;
}
time ./a.out 进行观察
临界区较长
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/syscall.h>
#define THREAD_NUM 2
//#define USE_SPINLOCK
pthread_t g_thread[THREAD_NUM];
#ifdef USE_SPINLOCK
pthread_spinlock_t g_spin;
#else
pthread_mutex_t g_mutex;
#endif
__uint64_t g_count;
pid_t gettid()
{
return syscall(SYS_gettid);
}
void *run_amuck(void *arg)
{
int i, j;
printf("Thread %lu started.n", (unsigned long)gettid());
for (i = 0; i < 10000; i++) {
#ifdef USE_SPINLOCK
pthread_spin_lock(&g_spin);
#else
pthread_mutex_lock(&g_mutex);
#endif
for (j = 0; j < 100000; j++) {
if (g_count++ == 123456789)
printf("Thread %lu wins!n", (unsigned long)gettid());
}
#ifdef USE_SPINLOCK
pthread_spin_unlock(&g_spin);
#else
pthread_mutex_unlock(&g_mutex);
#endif
}
printf("Thread %lu finished!n", (unsigned long)gettid());
return (NULL);
}
int main(int argc, char *argv[])
{
int i, threads = THREAD_NUM;
printf("Creating %d threads...n", threads);
#ifdef USE_SPINLOCK
pthread_spin_init(&g_spin, 0);
#else
pthread_mutex_init(&g_mutex, NULL);
#endif
for (i = 0; i < threads; i++)
pthread_create(&g_thread[i], NULL, run_amuck, (void *) i);
for (i = 0; i < threads; i++)
pthread_join(g_thread[i], NULL);
printf("Done.n");
return (0);
}
time ./a.out 进行观察
gdb 调试死锁
死锁程序:
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
pthread_mutex_t mutex_1;
pthread_mutex_t mutex_2;
void *child1(void *arg)
{
while(1){
pthread_mutex_lock(&mutex_1);
sleep(3);
pthread_mutex_lock(&mutex_2);
printf("thread 1 get running \n");
pthread_mutex_unlock(&mutex_2);
pthread_mutex_unlock(&mutex_1);
sleep(5);
}
}
void *child2(void *arg)
{
while(1){
pthread_mutex_lock(&mutex_2);
pthread_mutex_lock(&mutex_1);
printf("thread 2 get running \n");
pthread_mutex_unlock(&mutex_1);
pthread_mutex_unlock(&mutex_2);
sleep(5);
}
}
int main(void)
{
pthread_t tid1,tid2;
pthread_mutex_init(&mutex_1,NULL);
pthread_mutex_init(&mutex_2,NULL);
pthread_create(&tid1,NULL,child1,NULL);
pthread_create(&tid2,NULL,child2,NULL);
do{
sleep(2);
}while(1);
pthread_exit(0);
}
使用gdb相关命令:
sudo gdb -q -p 'pidof a.out'
/*gdb:*/
i threads // 所有线程,并示意现在gdb在哪个线程
thread2 // 切换到2 号线程
bt
thread 3
bt
l child1
l child2
p mutex_2
p mutex_1


T2, pid:1475,wait mutex_2, owern: 1476

T3, pid: 1476, wait mutex_1, owner: 1475
非常明显的死锁
//查看所有线程的 bt
thread apply all bt

正确加锁
三要素:
-
同一把锁
-
语义整体(事务的概念)
-
粒度最小
-
可接受
- 我没少, 你没多
- 我少了, 你多了
- 不可接受
a. 我少了,你没多
b. 你多了,我没少
降低锁的粒度
大粒度锁举例:
学生报名,报名到每个老师名下,假如有10个老师,每次报名,都会锁整体数据。这样导致锁的粒度非常大
小粒度锁举例:
学生报名,先报到每个老师名下,由每个老师节点去同步数据,锁的粒度会降低。
条件变量
pthread_mutex_t count_lock;
pthread_cond_t count_nonzero;
unsigned count;
decrement_count()
{
pthread_mutex_lock(&count_lock);
while(count == 0) // 可以把count 换成 list.is_empty
pthread_cond_wait(&count_nonzero, &count_lock);
count = count -1;
pthread_mutex_unlock(&count_lock);
}
increment_count()
{
pthread_mutex_lock(&count_lock);
if(count == 0)
pthread_cond_signal(&count_nonzero);
count = count +1;
pthread_mutex_unlock(&count_lockS);
}
pthread_cond_wait 自带的三步S:
- 把锁丢掉进入sleep
- 等待被schedule - > 因为pthread_cond_signal()所以 进入 调度队列,等待被调度
signal只是 将sleep的线程丢到schedule里,此时线程还不能run,因为还未拿到锁!S - 等待 increment_count 释放锁,继续拿mutex,锁柱临界区
优先级反转s
至少设计3个线程
现象:高优先级线程等待低优先级线程释放锁的过程中,中等优先级线程打断优先级线程。 高优先级线程延迟很大,看起来像高优先级线程在等待低优先级的线程。
int pthread_mutexattr_setprotocol(pthread_mutexattr_t *attr, int protocal);
PTHREAD_PRIO_NONE:
PTHREAD_PRIO_INHERIT:优先级继承
低优先级拿到锁后,高优先级想拿锁,在这一瞬间,linux内核会把低优先级线程提升到和高优先级线程的优先级一样高。
代码范例:
#define _GNU_SOURCE /* See feature_test_macros(7) */
#include <sched.h>
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <time.h>
#include <stdlib.h>
#include <unistd.h>
#include <sched.h>
#define USE_PRI_INHERIT /* high-priority task only wait LOW_SPIN time */
#if 1
#undef USE_PRI_INHERIT /* high-priority task wait LOW+MID_SPIN time */
#endif
/*
A simple demonstration of a 3-thread priority inversion that can
be improved with priority inheritance.
Written by Kevin Dankwardt, k@kcomputing.com
*/
sem_t low_go, mid_go, high_go, high_ready, mid_ready;
pthread_mutex_t shared_mutex;
const int minutes = 60;
const int seconds = 1;
pthread_mutexattr_t mutex_attr;
#define LOW_SPIN 2
#define MID_SPIN 5
int gettime ()
{
return time(NULL);
}
void spin_for (int n)
{
int now = gettime();
int counter=0;
while ( ( gettime() - now) < n)
{ counter++;
//if ((counter % 1000000) == 0) printf("gettime()-now = %d n=%d\n",gettime()-now,n);
}
//printf("done spinning for %d seconds\n",n);
}
void *low (void *n)
{
int now = gettime();
struct sched_param the_priority;
the_priority.sched_priority = 1;
pthread_setschedparam(pthread_self(), SCHED_FIFO, &the_priority);
sem_wait(&low_go);
pthread_mutex_lock(&shared_mutex);
sem_wait(&mid_ready);
sem_wait(&high_ready);
sem_post(&high_go);
sem_post(&mid_go);
spin_for(LOW_SPIN*seconds);
pthread_mutex_unlock(&shared_mutex);
printf ("low took %d seconds wanted about %d (critical section + mid time)\n",gettime() - now,LOW_SPIN+MID_SPIN);
return NULL;
}
void *mid (void *n)
{
struct sched_param the_priority;
int now;
the_priority.sched_priority = 25;
pthread_setschedparam(pthread_self(), SCHED_FIFO, &the_priority);
sem_post(&mid_ready);
sem_wait(&mid_go);
now = gettime();
spin_for(MID_SPIN*seconds);
printf ("mid took %d seconds wanted about %d\n",gettime() - now,MID_SPIN);
return NULL;
}
void *high (void *n)
{
int now ;
struct sched_param the_priority;
the_priority.sched_priority = 50;
pthread_setschedparam(pthread_self(), SCHED_FIFO, &the_priority);
sem_post(&high_ready);
sem_wait(&high_go);
now=gettime();
pthread_mutex_lock(&shared_mutex);
pthread_mutex_unlock(&shared_mutex);
printf ("high took %d seconds wanted about %d (low critical section)\n",gettime() - now,LOW_SPIN);
return NULL;
}
int main ()
{
pthread_t tid1, tid2, tid3;
cpu_set_t thecpus;
CPU_ZERO(&thecpus);
CPU_SET(0, &thecpus);
if (sched_setaffinity(getpid(), sizeof(cpu_set_t), &thecpus)< 0)
perror("set affinity");
if (pthread_mutexattr_init(&mutex_attr))
{perror("mutex init"); exit(1);}
#if defined(_POSIX_THREAD_PRIO_INHERIT) && _POSIX_THREAD_PRIO_INHERIT != -1 && defined(USE_PRI_INHERIT)
printf("Using priority inheritance\n");
if (pthread_mutexattr_setprotocol(&mutex_attr, PTHREAD_PRIO_INHERIT))
{perror("mutex init"); exit(1);}
#else
printf("Not Using priority inheritance\n");
#endif
if (pthread_mutex_init(&shared_mutex, &mutex_attr))
{perror("mutex init"); exit(1);}
// all initialized to zero. Must wait on a post
sem_init (&low_go,0,0);
sem_init (&mid_go,0,0);
sem_init (&high_go,0,0);
sem_init (&high_ready,0,0);
sem_init (&mid_ready,0,0);
pthread_create(&tid1, NULL, low, NULL);
pthread_create(&tid2, NULL, mid, NULL);
pthread_create(&tid3, NULL, high, NULL);
sem_post(&low_go);
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
pthread_join(tid3, NULL);
return 0;
}
代码解读:
- 整体思路, 也是线程之间的同步范例
- 通过信号量控制low优先级线程先跑起来,低优先级先拿锁
low 通过 mid_ready & high_ready 信号量控制 mid 和 high - high 和 mid 通过 mid_ready 和 high_ready 通知 low 自己已经就绪
- low 通过 post mid_go, high_go 让两个线程都进入调度
- 但只有 high是去抢锁的,mid 与 low 进行抢CPU
- 验证了优先级继承
复现了优先级反转
- 通过信号量控制low优先级线程先跑起来,低优先级先拿锁
- main 函数内容解读:
- CPU 系列函数 设置 进程的CPU亲和性
- 初始化 mutex_attr 属性
通过 pthread_mutex_init 将 mutex 与 mutex_attr 绑定 - 所有信号量清0 保证线程运行的时序性
- sem_post(&low_go) 先让 低优先级的线程跑起来
- 各线程动作
- 设置线程调度属性,high mid low
- low 线程 通过 信号量 通知 其他两线程是否进入调度
- low 和 high 互相抢锁
- low 和 mid 存在 race condition 的关系

