linux用户空间 - 多线程通讯

2021-07-12  本文已影响0人  404Not_Found
# 信号量
# Mutex vs Spinlock
  # 临界区较短
  # 临界区较长
# gdb 调试死锁
# 正确加锁
  ## 降低锁的粒度
# 条件变量
# 优先级反转

信号量

信号量无论在内核还是用户空间都是存在的

int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_post(sem_t *sem);
int sem_getvalue(sem_t * sem, int *sval);
int sem_destroy(sem_t *sem);

sem_wait 当 信号量不为0 时 才有效,相当于 p 操作, 信号量 - 1
sem_post 相当于 v 操作,信号量 + 1

初始的信号量为 1 相当于 互斥是 mutex
初始的信号量为 0 相当于 同步,强调先后顺序。

void * prduce( void * arg) {
  int i ;
  for(i = 0; i < nitems; i ++ {
    sem_wait(&shared.nempty); // 等仓库空位
    sem_wait(&shared.mutex); // 锁柱临界区,生产者消费者均要访问,且为多核准备
    shard.buff[i % NBUFF] = i;
    count << "Product" << shared.buff[i % NBUFF] <<endl;
    sem_post(&shared.mutex);  // 释放临界区
    sem_post(shared.nstored);  // 库存增加1
  }
  return (NULL);
}
void * consume ( void * arg) {
  int i;
  for( i = 0; i < nitems; i ++ ) {
      sem_wait(&shared.nstored); // 在等库存
      sem_wait(&shared.mutex):      
      if(shared.buff[i%NUBFF] != i )
        cout << "buff [" << i << "] = " << shared.buff[i %NBUFF] << endl;
      cout << "Consumer:"<< shared.buff[ i %NBUFF] << endl;
      sem_post(&shared.mutex);
      sem_post(&shared.nempty); //释放空位
  }
  return (NULL);
}
int main( int argc, char **argv) {
  pthread_t tid_produce, tid_consumer;
  if(argc !=2) { cout <<"Usage:prodcons number" << endl; exit(0); }
  nitems = atoi(argv[1]);
  sem_init(&shared.mutex, 0, 1);
  sem_init(&shared.nempty, 0, NBUFF);// 仓库空位
  sem_init(&shared.nstored, 0, 0); // 最开始的库存
  pthread_create(&tid_produce, NULL, produce, NULL);
  pthread_create(&tid_consumer, NULL, consume, NULL);
  pthread_join(tid_produce, NULL);
  pthread_join(tid_consumer, NULL);
  sem_detroy(&shared.mutex);
  sem_detroy(&shared.nempty);
  sem_detroy(&shared.nstored);
}

信号量 同步,互斥都可以做的。但是 做互斥 最好还是用专门的mutex

Mutex vs Spinlock

pthread_mutex_t mutex;
pthread_mutex_init(&mutex, NULL);

pthread_mutex_lock(&mutex);
pthread_mutex_unlock(&mutex);
int pthread_spin_lock(pthread_spinlock_t * lock);
int pthread_spin_trylock(pthread_spinlock_t * lock);
int pthread_spin_unlock(pthread_spinlock_t *lock);

Mutex: 拿不到 Mutex 线程会谁
Spinlock 拿不到 spinlock, 线程会自选,所在CPU 关闭抢占调度,其他线程根本进不来。
Spinlock 大都用在多核
Spinlock适合场景:

  1. 锁柱的区间短
  2. 区间经常发生
  3. 区间的大小成为性能瓶颈
    spinlock 锁柱大区间则可能导致很高的CPU利用率和性能下降

pthread_spinlock 是用户态的,是不会通过系统调用,把CPU 的抢占给关掉的。只是说T1拿到spinlock 后,T2还是一直在去尝试抢。用户是没有办法控制内核把抢占调度关闭

临界区较短

#include <stdio.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <errno.h>
#include <sys/time.h>
#include <list>
#include <pthread.h>

//#define USE_SPINLOCK
#define LOOPS 5000000
 
using namespace std;
 
list<int> the_list;
 
#ifdef USE_SPINLOCK
pthread_spinlock_t spinlock;
#else
pthread_mutex_t mutex;
#endif
 
//Get the thread id
pid_t gettid() { return syscall( __NR_gettid ); }
 
void *consumer(void *ptr)
{
    int i;
 
    printf("Consumer TID %lun", (unsigned long)gettid());
 
    while (1)
    {
#ifdef USE_SPINLOCK
        pthread_spin_lock(&spinlock);
#else
        pthread_mutex_lock(&mutex);
#endif
 
        if (the_list.empty())
        {
#ifdef USE_SPINLOCK
            pthread_spin_unlock(&spinlock);
#else
            pthread_mutex_unlock(&mutex);
#endif
            break;
        }
 
        i = the_list.front();
        the_list.pop_front();
 
#ifdef USE_SPINLOCK
        pthread_spin_unlock(&spinlock);
#else
        pthread_mutex_unlock(&mutex);
#endif
    }
 
    return NULL;
}
 
int main()
{
    int i;
    pthread_t thr1, thr2;
    struct timeval tv1, tv2;
 
#ifdef USE_SPINLOCK
    pthread_spin_init(&spinlock, 0);
#else
    pthread_mutex_init(&mutex, NULL);
#endif
 
    // Creating the list content...
    for (i = 0; i < LOOPS; i++)
        the_list.push_back(i);
 
    // Measuring time before starting the threads...
    gettimeofday(&tv1, NULL);
 
    pthread_create(&thr1, NULL, consumer, NULL);
    pthread_create(&thr2, NULL, consumer, NULL);
 
    pthread_join(thr1, NULL);
    pthread_join(thr2, NULL);
 
    // Measuring time after threads finished...
    gettimeofday(&tv2, NULL);
 
    if (tv1.tv_usec > tv2.tv_usec)
    {
        tv2.tv_sec--;
        tv2.tv_usec += 1000000;
    }
 
    printf("Result - %ld.%ldn", tv2.tv_sec - tv1.tv_sec,
        tv2.tv_usec - tv1.tv_usec);
 
#ifdef USE_SPINLOCK
    pthread_spin_destroy(&spinlock);
#else
    pthread_mutex_destroy(&mutex);
#endif
 
    return 0;
}

time ./a.out 进行观察

临界区较长

#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/syscall.h>
 
#define        THREAD_NUM     2

//#define USE_SPINLOCK

pthread_t g_thread[THREAD_NUM];
#ifdef USE_SPINLOCK
pthread_spinlock_t g_spin;
#else
pthread_mutex_t g_mutex;
#endif
__uint64_t g_count;
 
pid_t gettid()
{
    return syscall(SYS_gettid);
}
 
void *run_amuck(void *arg)
{
       int i, j;
 
       printf("Thread %lu started.n", (unsigned long)gettid());
 
       for (i = 0; i < 10000; i++) {
#ifdef USE_SPINLOCK
           pthread_spin_lock(&g_spin);
#else
               pthread_mutex_lock(&g_mutex);
#endif
               for (j = 0; j < 100000; j++) {
                       if (g_count++ == 123456789)
                               printf("Thread %lu wins!n", (unsigned long)gettid());
               }
#ifdef USE_SPINLOCK
           pthread_spin_unlock(&g_spin);
#else
               pthread_mutex_unlock(&g_mutex);
#endif
       }
        
       printf("Thread %lu finished!n", (unsigned long)gettid());
 
       return (NULL);
}
 
int main(int argc, char *argv[])
{
       int i, threads = THREAD_NUM;
 
       printf("Creating %d threads...n", threads);
#ifdef USE_SPINLOCK
       pthread_spin_init(&g_spin, 0);
#else
       pthread_mutex_init(&g_mutex, NULL);
#endif
       for (i = 0; i < threads; i++)
               pthread_create(&g_thread[i], NULL, run_amuck, (void *) i);
 
       for (i = 0; i < threads; i++)
               pthread_join(g_thread[i], NULL);
 
       printf("Done.n");
 
       return (0);
}

time ./a.out 进行观察

gdb 调试死锁

死锁程序:

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
pthread_mutex_t mutex_1;
pthread_mutex_t mutex_2;

void *child1(void *arg)
{
    while(1){
        pthread_mutex_lock(&mutex_1);
        sleep(3);
        pthread_mutex_lock(&mutex_2);
        printf("thread 1 get running \n");
        pthread_mutex_unlock(&mutex_2);
        pthread_mutex_unlock(&mutex_1);
        sleep(5);
    }
}

void *child2(void *arg)
{
    while(1){
        pthread_mutex_lock(&mutex_2);
        pthread_mutex_lock(&mutex_1);
        printf("thread 2 get running \n");
        pthread_mutex_unlock(&mutex_1);
        pthread_mutex_unlock(&mutex_2);
        sleep(5);
    }
}

int main(void)
{
    pthread_t tid1,tid2;
    pthread_mutex_init(&mutex_1,NULL);
    pthread_mutex_init(&mutex_2,NULL);
    pthread_create(&tid1,NULL,child1,NULL);
    pthread_create(&tid2,NULL,child2,NULL);
    do{
        sleep(2);
    }while(1);
    pthread_exit(0);
}

使用gdb相关命令:

sudo gdb -q -p 'pidof a.out'
/*gdb:*/
i threads // 所有线程,并示意现在gdb在哪个线程
thread2 // 切换到2 号线程 
bt
thread 3
bt
l child1
l child2
p mutex_2
p mutex_1
gdb 调试.png
mutex_2 信息.png
T2, pid:1475,wait mutex_2, owern: 1476 mutex_1 信息.png

T3, pid: 1476, wait mutex_1, owner: 1475
非常明显的死锁

//查看所有线程的 bt
thread apply all bt
所有线程堆栈调用信息.png

正确加锁

三要素:

  1. 我没少, 你没多
  2. 我少了, 你多了

降低锁的粒度

大粒度锁举例:
学生报名,报名到每个老师名下,假如有10个老师,每次报名,都会锁整体数据。这样导致锁的粒度非常大
小粒度锁举例:
学生报名,先报到每个老师名下,由每个老师节点去同步数据,锁的粒度会降低。

条件变量

pthread_mutex_t count_lock;
pthread_cond_t count_nonzero;
unsigned count;

decrement_count()
{
  pthread_mutex_lock(&count_lock);
  while(count == 0) // 可以把count 换成 list.is_empty
    pthread_cond_wait(&count_nonzero, &count_lock);
  count = count -1;
  pthread_mutex_unlock(&count_lock);
}

increment_count()
{
  pthread_mutex_lock(&count_lock);
  if(count == 0)
    pthread_cond_signal(&count_nonzero);
  count = count +1;
  pthread_mutex_unlock(&count_lockS);
}

pthread_cond_wait 自带的三步S:

  1. 把锁丢掉进入sleep
  2. 等待被schedule - > 因为pthread_cond_signal()所以 进入 调度队列,等待被调度
    signal只是 将sleep的线程丢到schedule里,此时线程还不能run,因为还未拿到锁!S
  3. 等待 increment_count 释放锁,继续拿mutex,锁柱临界区

优先级反转s

至少设计3个线程
现象:高优先级线程等待低优先级线程释放锁的过程中,中等优先级线程打断优先级线程。 高优先级线程延迟很大,看起来像高优先级线程在等待低优先级的线程。

int pthread_mutexattr_setprotocol(pthread_mutexattr_t *attr, int protocal);
PTHREAD_PRIO_NONE:
PTHREAD_PRIO_INHERIT:优先级继承

低优先级拿到锁后,高优先级想拿锁,在这一瞬间,linux内核会把低优先级线程提升到和高优先级线程的优先级一样高。
代码范例:

#define _GNU_SOURCE             /* See feature_test_macros(7) */
#include <sched.h>
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <time.h>
#include <stdlib.h>
#include <unistd.h>
#include <sched.h>

#define USE_PRI_INHERIT /* high-priority task only wait LOW_SPIN time */
#if 1
#undef USE_PRI_INHERIT /* high-priority task wait LOW+MID_SPIN time */
#endif

/*
   A simple demonstration of a 3-thread priority inversion that can
   be improved with priority inheritance.

   Written by Kevin Dankwardt, k@kcomputing.com

 */

sem_t low_go, mid_go, high_go, high_ready, mid_ready;
pthread_mutex_t shared_mutex;
const int minutes = 60;
const int seconds = 1;
pthread_mutexattr_t mutex_attr;

#define LOW_SPIN 2
#define MID_SPIN 5

int gettime ()
{
    return time(NULL);
}


void spin_for (int n)
{
    int now = gettime();
    int counter=0;
    while ( ( gettime() - now) < n)
    { counter++;
        //if ((counter % 1000000) == 0) printf("gettime()-now = %d n=%d\n",gettime()-now,n);
    }
    //printf("done spinning for %d seconds\n",n);
}

void *low (void *n)
{
    int now = gettime();
    struct sched_param the_priority;

    the_priority.sched_priority  = 1;
    pthread_setschedparam(pthread_self(), SCHED_FIFO, &the_priority);
    sem_wait(&low_go);
    pthread_mutex_lock(&shared_mutex);
    sem_wait(&mid_ready);
    sem_wait(&high_ready);
    sem_post(&high_go);
    sem_post(&mid_go);
    spin_for(LOW_SPIN*seconds);
    pthread_mutex_unlock(&shared_mutex);
    printf ("low took %d seconds wanted about %d (critical section + mid time)\n",gettime() - now,LOW_SPIN+MID_SPIN);
    return NULL;
}

void *mid (void *n)
{
    struct sched_param the_priority;
    int now;

    the_priority.sched_priority  = 25;
    pthread_setschedparam(pthread_self(), SCHED_FIFO, &the_priority);
    sem_post(&mid_ready);
    sem_wait(&mid_go);
    now = gettime();
    spin_for(MID_SPIN*seconds);
    printf ("mid took %d seconds wanted about %d\n",gettime() - now,MID_SPIN);
    return NULL;
}

void *high (void *n)
{
    int now ;
    struct sched_param the_priority;

    the_priority.sched_priority  = 50;
    pthread_setschedparam(pthread_self(), SCHED_FIFO, &the_priority);
    sem_post(&high_ready);
    sem_wait(&high_go);
    now=gettime();
    pthread_mutex_lock(&shared_mutex);
    pthread_mutex_unlock(&shared_mutex);
    printf ("high took %d seconds wanted about %d (low critical section)\n",gettime() - now,LOW_SPIN);
    return NULL;
}

int main ()
{
    pthread_t tid1, tid2, tid3;
    cpu_set_t thecpus;


    CPU_ZERO(&thecpus);
    CPU_SET(0, &thecpus);
    if (sched_setaffinity(getpid(), sizeof(cpu_set_t), &thecpus)< 0)
        perror("set affinity");

    if (pthread_mutexattr_init(&mutex_attr)) 
    {perror("mutex init"); exit(1);}

#if defined(_POSIX_THREAD_PRIO_INHERIT) && _POSIX_THREAD_PRIO_INHERIT != -1 && defined(USE_PRI_INHERIT)
    printf("Using priority inheritance\n");
    if (pthread_mutexattr_setprotocol(&mutex_attr, PTHREAD_PRIO_INHERIT))
    {perror("mutex init"); exit(1);}
#else
    printf("Not Using priority inheritance\n");

#endif

    if (pthread_mutex_init(&shared_mutex, &mutex_attr))
    {perror("mutex init"); exit(1);}
    // all initialized to zero. Must wait on a post
    sem_init (&low_go,0,0);
    sem_init (&mid_go,0,0);
    sem_init (&high_go,0,0);
    sem_init (&high_ready,0,0); 
    sem_init (&mid_ready,0,0); 

    pthread_create(&tid1, NULL, low, NULL);
    pthread_create(&tid2, NULL, mid, NULL);
    pthread_create(&tid3, NULL, high, NULL);

    sem_post(&low_go);
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);
    pthread_join(tid3, NULL);

    return 0;
}

代码解读:

优先级继承.png
无优先级继承.png
上一篇 下一篇

猜你喜欢

热点阅读