pthreads-tutorial

2022-04-24  本文已影响0人  my_passion
<< ELEG652-pthreads-tutorial >>

1   First PTHREAD Example: hello world

    n 个 workerRoutine thread 均 print caller 分配给 它 的 id + "hello world"
    返回 分配给 它的 id, 使得 caller (main 线程) 知道 哪个 workerRoutine thread 返回
    
2   Incrementing Global Counter: mutex

    n 个 workerRoutine thread 均 并发 增加 1个 global counter
    
3   Reacting on Specific Events & selfDefined barrierWithCv 
    
    Barrier 
        思想: 直到 the last thread 到达 Barrier, 才 唤醒 所有 thread 继续行进
        
        内部结构 
            typedef struct BarrierTag 
            {
                pthread_mutex_t *pMutex;
                pthread_cond_t  *pCond;
                ulong_t         *pCount; // threadNumOnBarrier: number of threads the barrier still has to hold back
            } Barrier;
        
        Barrier 3 函数 
            // set up *pBarrier so that it holds back `count` threads
            void barrier_init(Barrier* pBarrier, void *, ulong_t count);
            // destroy the barrier's condition variable and mutex
            void barrier_destroy(Barrier* pBarrier);
            // block the caller until the last expected thread has arrived
            void barrier_wait(Barrier* pBarrier);
        
        workRoutine() context 
            typedef struct context_s 
            {
                Barrier* pBarrier;  // barrier the worker synchronises on
                ulong_t id;         // worker number used in its log lines
            } Context;
                
            pthread_create(&threads[i], NULL, workerRoutine, &contexts[i]);
        
4   Creating Barriers More Easily 

    理解了 part3 中 Barrier 的实现, 再 用 pthread 中的 pthread_barrier_t 就很容易 
        
        相当于 将 part3 中 Barrier 相关部分封装

4.1 重构 3: 封装 -> 实现出 简化版 barrier 的 数据结构 + 3个函数

    4.2 用 pthread 提供的 barrier 
        
        只需把 4.1 中 pthreadBarrier.h /pthreadBarrier.c 去掉, 改用 pthread 提供的 barrier 
// ===

1 First PTHREAD Example: hello world

    n 个 workerRoutine thread 均 print caller 分配给 它 的 id + "hello world"
    返回 分配给 它的 id, 使得 caller (main 线程) 知道 哪个 workerRoutine thread 返回
    
    Hello, World! . . . Output
    
    // === 1 hello.c
    /* $ gcc hello.c -o a -pthread
       $ ./a
    */
    #include <stdio.h>      // for snprintf(), fprintf(), printf(), puts()
    #include <stdlib.h>     // exit()
    #include <errno.h>      // errno (duh!)
    #include <pthread.h>    // pthread_* 

    #define MAX_NUM_WORKERS 4UL

    /* Per-thread argument: carries the id that main() assigns to one worker. */
    typedef struct WorkerIdTag
    {
        unsigned long id;   /* worker index handed out by the creating thread */
    } WorkerId; /* a single member today, wrapped in a struct so fields can be added later */

    /*
     * Thread start routine: prints "[id]\t Hello, World!" for the WorkerId that
     * arg points to and returns arg unchanged, so the joining thread can tell
     * which worker has finished.
     */
    void* workerRoutine(void* arg)
    {
        /* arg is the WorkerId slot filled in by the creating thread
         * (a pthread_t is an opaque descriptor, not a printable id). */
        WorkerId* me = arg;
        char line[100];

        /* Build the whole greeting first so it is written with one call. */
        if (snprintf(line, sizeof(line), "[%lu]\t Hello, World!\n", me->id) < 0)
        {
            perror("snprintf");
            exit(errno);
        }

        /* puts() appends its own newline after the one already in the text. */
        puts(line);

        return arg;
    }

    // Variadic macro: the arguments after `prefix` fill the two %lu conversions
    // (thread index, then total number of threads). `prefix` has to be a string
    // literal because it is joined to the format by literal concatenation.
    // The message is written to stderr without a trailing newline.
    #define ERR_MSG(prefix, ...) \
        fprintf(stderr, prefix " %lu out of %lu threads", __VA_ARGS__)

    /*
     * Starts MAX_NUM_WORKERS threads running workerRoutine(), each with its own
     * WorkerId, then joins them in creation order and reports the id that every
     * worker handed back.
     *
     * Returns 0 when done; exits with the pthread error code if a thread cannot
     * be created. A failed join is reported and the remaining threads are still
     * joined.
     */
    int main(void) 
    {
        pthread_t workers [ MAX_NUM_WORKERS ];
        WorkerId workersIds [ MAX_NUM_WORKERS ];
        
        // (1) create n workerRoutine threads
        puts("[main]\tCreating workers...\n");
        for (unsigned long i = 0; i < MAX_NUM_WORKERS; ++i) 
        {
            workersIds[i].id = i;

            // pthread_create() reports failure through its return value and does
            // not set errno, so that value (not errno) becomes the exit status.
            int rc = pthread_create(&workers[i], NULL, workerRoutine, &workersIds[i]);
            if (0 != rc)
            { 
                ERR_MSG("Could not create thread", i, MAX_NUM_WORKERS);
                exit(rc); 
            }
        }
        
        // (2) join threads 0..n-1 one by one
        puts("[main]\tJoining the workers...\n");
        for (unsigned long i = 0; i < MAX_NUM_WORKERS; ++i) 
        {       
            void *pRetVal = NULL;

            if (0 != pthread_join(workers[i], &pRetVal) ) 
            {
                ERR_MSG("Could not join thread", i, MAX_NUM_WORKERS);
                continue;
            }

            // (3) the worker returned the WorkerId pointer it was started with
            const WorkerId* wid = pRetVal;
            printf("[main]\tWorker N.%lu has returned!\n", wid->id);
        }
        return 0;
    }

    // one possible print
    [main]  Creating workers...
    [main]  Joining the workers...
    [1]  Hello, World!
    [0]  Hello, World!
    [3]  Hello, World!
    [main]  Worker N.0 has returned!
    [main]  Worker N.1 has returned!
    [2]  Hello, World!
    [main]  Worker N.2 has returned!
    [main]  Worker N.3 has returned!

2 Incrementing Global Counter: mutex

    n 个 workerRoutine thread 均 并发 增加 1个 global counter

    // ======1 globalSum.h
    #ifndef GLOBAL_SUM_H
    #define GLOBAL_SUM_H

    #include <stdio.h>
    #include <stdlib.h>
    #include <pthread.h>
    #include <pthread.h>
    // #include "utils.h"
    /* Argument bundle for one worker: where the shared counter lives and which
     * mutex guards it. */
    struct globalSumTag
    {
        unsigned long   *pValue;  /* counter that sum() increments */
        pthread_mutex_t *pMutex;  /* locked by sum() around the increment */
    };
    typedef struct globalSumTag globalSum;

    #endif // GLOBAL_SUM_H
    
    // ======2 IncreaseGlobalCounter.c
    #include "globalSum.h"
    #define MAX_NUM_WORKERS 20UL

    typedef unsigned long ulong_t;

    /*
     * Thread start routine: adds one to the shared counter while holding the
     * mutex that guards it. arg points to a globalSum; always returns NULL.
     */
    void* sum(void* arg) 
    {
        globalSum* shared = arg;
        pthread_mutex_t* guard = shared->pMutex;

        pthread_mutex_lock(guard);
        *shared->pValue += 1;       /* the only statement in the critical section */
        pthread_mutex_unlock(guard);

        return NULL;
    }
    int main(void) 
    {
        pthread_t threads [ MAX_NUM_WORKERS ];
        globalSum gSs [ MAX_NUM_WORKERS ];
        
        // (1) Note: "global" counter: calling thread stack var's ptr pass to newly created thread's start routine
        //      => stack var pass by ref => simulate global var 
        ulong_t counter = 0;
        pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
        
        // (2) create n threads 
        for (ulong_t i = 0; i < MAX_NUM_WORKERS; ++i) 
        {
            // 1) every thread's internal ptr point to global counter 
            gSs[i] = (globalSum){ .pValue = &counter, .pMutex = &mut };
            
            pthread_create(&threads[i], NULL, sum, &gSs[i]);
        }
        
        // (3) join
        for (ulong_t i = 0; i < MAX_NUM_WORKERS; ++i)
            pthread_join(threads[i], NULL);
        
        // (4) output global counter's value
        printf("%lu threads were running. Sum final value: %lu\n", MAX_NUM_WORKERS, counter);
        
        return 0;
    }
    
    $ ./a
    20 threads were running. Sum final value: 20

3 Reacting on Specific Events & selfDefined barrierWithCv

    Barrier 
        思想: 直到 the last thread 到达 Barrier, 才 唤醒 所有 thread 继续行进
        
        内部结构 
            typedef struct BarrierTag 
            {
                pthread_mutex_t *pMutex;
                pthread_cond_t  *pCond;
                ulong_t         *pCount; // threadNumOnBarrier: number of threads the barrier still has to hold back
            } Barrier;

    // ======1 barrier.h
    #ifndef BARRIER_H
    #define BARRIER_H

    #include <pthread.h>

    // Formats a message into a char array named `buffer` that must be visible
    // where the macro is expanded; sizeof(buffer) is taken, so it has to be an
    // array rather than a pointer.
    #define SET_BARRIER_MSG(...) \
        snprintf(buffer, sizeof(buffer), __VA_ARGS__)

    // Format strings for SET_BARRIER_MSG; each takes one unsigned long (the worker id).
    #define NOT_LAST_TO_REACH \
        "[%lu]\tI’m NOT the last one to reach the barrier!"
        
    #define LAST_TO_REACH \
        "[%lu]\tI am the last to reach the barrier! Waking up the others."
        
    // Number of worker threads; main() also uses it as the barrier's initial count.
    #define MAX_NUM_WORKERS 3

    typedef unsigned long ulong_t;

    // Barrier idea: nobody continues until the last thread has arrived; that
    // thread then wakes all the others.
    struct BarrierTag 
    {
        pthread_mutex_t *pMutex;  // held while *pCount is read or changed
        pthread_cond_t  *pCond;   // early arrivals wait on this
        ulong_t         *pCount;  // threadNumOnBarrier: threads still to be held back
    };
    typedef struct BarrierTag Barrier;

    // Per-thread argument handed to workerRoutine().
    struct context_s 
    {
        Barrier *pBarrier;  // barrier this worker synchronises on
        ulong_t  id;        // worker number, printed in its log lines
    };
    typedef struct context_s Context;
        
    #endif // BARRIER_H
    
    // ======2 barrier.c
    #include "barrier.h"
    #include <stdio.h>

    void *workerRoutine(void *pContext) 
    {
        char buffer[81];
        
        // (1) 取 arg
        Context *pC = (Context*) pContext;
        
        printf("[%lu]\tReaching the barrier...\n", pC->id);
        
        // ======= barrier start
        pthread_mutex_lock ( pC->pBarrier->pMutex ); // === (2) lock 
        
        // (3) Barrier 上 阻塞的 线程数 --
        --*pC->pBarrier->pCount;
        
        // (4) current thread is not the last to reach Barrier
        // print + cv.wait
        printf("*pC->pBarrier->pCount = %lu\n", *pC->pBarrier->pCount);
        if (*pC->pBarrier->pCount > 0) 
        {
            SET_BARRIER_MSG(NOT_LAST_TO_REACH, pC->id);
            pthread_cond_wait ( pC->pBarrier->pCond, pC->pBarrier->pMutex ); // === unlock & relock
        } 
        else // ... the last: print
        {
            SET_BARRIER_MSG(LAST_TO_REACH, pC->id);
        }
        
        puts(buffer); // Note: buffer 在 SET_BARRIER_MSG 中被 fill
        
        pthread_mutex_unlock ( pC->pBarrier->pMutex ); // === (5) unlock
        
        // (6) wake up others
        pthread_cond_broadcast( pC->pBarrier->pCond ); 
        // ====== barrier end 
        
        printf("[%lu]\tAfter the barriern\n", pC->id);
        
        return NULL;
    }
        
    
    // ======3 barrierWithCv.c
    #include "barrier.h"
    void *workerRoutine(void *pContext);

    /*
     * Builds one Barrier from a local mutex, condition variable and counter,
     * starts MAX_NUM_WORKERS threads that all share it, and joins them.
     *
     * Returns 0 when every thread was created and joined, 1 if a thread could
     * not be created.
     */
    int main(void) 
    {
        pthread_t threads [ MAX_NUM_WORKERS ];
        Context contexts [ MAX_NUM_WORKERS ];
        
        pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
        pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
        
        ulong_t count = MAX_NUM_WORKERS;
        
        // (1) create and initialise the barrier
        Barrier barrier = {.pMutex = &mut, .pCond = &cond, .pCount = &count};
            
        // (2) create n threads: every thread's argument points at this barrier
        for (ulong_t i = 0; i < MAX_NUM_WORKERS; ++i)
        {
            contexts[i] = (Context){ .pBarrier = &barrier, .id = i };
            
            // If a worker is missing, the count never reaches zero and the
            // others wait forever; joining a thread that was never created is
            // undefined. Stop here instead.
            if (pthread_create(&threads[i], NULL, workerRoutine, &contexts[i]) != 0)
            {
                return 1;
            }
        }
        
        // (3) join
        for (ulong_t i = 0; i < MAX_NUM_WORKERS; ++i)
        {
            pthread_join(threads[i], NULL);
        }
        
        return 0;
    }

    $ gcc barrierWithCv.c barrier.c -o a -pthread
    $ ./a
    [1] Reaching the barrier...
    *pC->pBarrier->pCount = 2
    [0] Reaching the barrier...
    *pC->pBarrier->pCount = 1
    [2] Reaching the barrier...
    *pC->pBarrier->pCount = 0
    [2] I am the last to reach the barrier! Waking up the others.
    [1] I’m NOT the last one to reach the barrier!
    [1] After the barriern
    [0] I’m NOT the last one to reach the barrier!
    [0] After the barriern
    [2] After the barriern

4 Creating Barriers More Easily

理解了 part3 中 Barrier 的实现, 再 用 pthread 中的 pthread_barrier_t 就很容易 
    
    相当于 将 part3 中 Barrier 相关部分封装

4.1 重构 3: 封装 -> 实现出 简化版 barrier 的 数据结构 + 3个函数

    // ====== 1 pthreadBarrier.h 
    #ifndef PTHREADBARRIER_H
    #define PTHREADBARRIER_H

    #include <pthread.h>

    /*
    #define SET_BARRIER_MSG(...) \
        snprintf(buffer, sizeof(buffer), __VA_ARGS__)

    #define NOT_LAST_TO_REACH \
        "[%lu]\tI’m NOT the last one to reach the barrier!"
        
    #define LAST_TO_REACH \
        "[%lu]\tI am the last to reach the barrier! Waking up the others."
    */

    typedef unsigned long ulong_t;

    // Barrier idea: nobody continues until the last thread has arrived; that
    // thread then wakes all the others.
    struct BarrierTag 
    {
        pthread_mutex_t *pMutex;  // held while count is read or changed
        pthread_cond_t  *pCond;   // early arrivals wait on this
        ulong_t          count;   // threadNumOnBarrier: threads still to be held back
    };
    typedef struct BarrierTag Barrier;

    // Prepares *pBarrier to hold back `count` threads. When the second argument
    // is NULL the implementation allocates and initialises its own mutex and
    // condition variable.
    void barrier_init(Barrier* pBarrier, void *, ulong_t count);
    // Destroys the barrier's condition variable and mutex.
    void barrier_destroy(Barrier* pBarrier);
    // Blocks the caller until the last of the expected threads has arrived.
    void barrier_wait(Barrier* pBarrier);
        
    #endif // PTHREADBARRIER_H

    // ====== 2 pthreadBarrier.c 
    #include "pthreadBarrier.h"
    #include <stdio.h>
    #include <stdlib.h>

    /*
     * Prepares *pBarrier to hold back `count` threads.
     *
     * pBarrier       barrier to set up; must not be NULL.
     * pMutexCvStruct NULL selects a freshly allocated, default-initialised mutex
     *                and condition variable.
     *                NOTE(review): no layout for a non-NULL argument is defined
     *                anywhere in this file, so it is not interpreted; the same
     *                defaults are used so the barrier never ends up with
     *                uninitialised pointers.
     * count          number of threads that have to reach barrier_wait().
     *
     * The mutex and condition variable live on the heap because the other
     * barrier functions use them after this call has returned. The function
     * cannot report errors through its void return, so an allocation or
     * initialisation failure ends the program.
     */
    void barrier_init(Barrier* pBarrier, void *pMutexCvStruct, ulong_t count)
    {
        (void) pMutexCvStruct; // see the note above

        pBarrier->pMutex = malloc( sizeof *pBarrier->pMutex );
        pBarrier->pCond  = malloc( sizeof *pBarrier->pCond );
        if (pBarrier->pMutex == NULL || pBarrier->pCond == NULL)
        {
            perror("barrier_init: malloc");
            exit(EXIT_FAILURE);
        }

        // (1) run-time initialisation of the heap-allocated mutex
        if (pthread_mutex_init(pBarrier->pMutex, NULL) != 0)
        {
            fprintf(stderr, "barrier_init: pthread_mutex_init failed\n");
            exit(EXIT_FAILURE);
        }

        // (2) run-time initialisation of the heap-allocated condition variable
        if (pthread_cond_init(pBarrier->pCond, NULL) != 0)
        {
            fprintf(stderr, "barrier_init: pthread_cond_init failed\n");
            exit(EXIT_FAILURE);
        }
        
        pBarrier->count = count;
    }

    /*
     * Releases everything barrier_init() set up: destroys the condition variable
     * and the mutex, then frees the heap blocks that hold them. The pointers are
     * cleared afterwards so that a stale use is easier to notice.
     *
     * No thread may still be inside barrier_wait() on this barrier.
     */
    void barrier_destroy(Barrier* pBarrier)
    {
        pthread_cond_destroy(pBarrier->pCond);
        pthread_mutex_destroy(pBarrier->pMutex);

        // barrier_init() obtained both objects with malloc()
        free(pBarrier->pCond);
        free(pBarrier->pMutex);
        pBarrier->pCond = NULL;
        pBarrier->pMutex = NULL;
    }

    /*
     * Blocks the calling thread until the last expected thread has called this
     * function on the same barrier.
     *
     * Each caller decrements the remaining-thread count under the mutex. The
     * caller that brings it to zero wakes all waiters; every other caller waits
     * until the count is zero. The count is not reset, so the barrier can be
     * passed only once.
     */
    void barrier_wait(Barrier* pBarrier)
    {
        // === (1) lock 
        pthread_mutex_lock ( pBarrier->pMutex ); 
        
        // (2) one fewer thread left to arrive
        --pBarrier->count;
        printf("pBarrier->count = %lu\n", pBarrier->count);

        if (pBarrier->count == 0)
        {
            // (3) last arrival: release everybody. The waiters cannot run on
            // until the mutex is released below.
            pthread_cond_broadcast( pBarrier->pCond ); 
        }
        else
        {
            // (4) pthread_cond_wait() may return without a matching broadcast
            // (spurious wakeup), so the predicate is re-checked in a loop.
            // Each wait releases the mutex and re-acquires it before returning.
            while (pBarrier->count > 0)
            {
                pthread_cond_wait ( pBarrier->pCond, pBarrier->pMutex );
            }
        }
        
        // === (5) unlock
        pthread_mutex_unlock ( pBarrier->pMutex ); 
    }

    // ====== 3 barrier.h
    #ifndef BARRIER_H
    #define BARRIER_H

    #include "pthreadBarrier.h"

    #define MAX_NUM_WORKERS 3

    // Per-thread argument handed to workerRoutine().
    struct ContextTag 
    {
        Barrier *pBarrier;  // barrier this worker synchronises on
        ulong_t  id;        // worker number, printed in its log lines
    };
    typedef struct ContextTag Context;
        
    #endif // BARRIER_H

    // ====== 4 barrier.c
    #include "barrier.h"
    #include <stdio.h>

    void *workerRoutine(void *pContext) 
    {
        // char buffer[81];
        
        // (1) 取 arg
        Context *pC = (Context*) pContext;
        
        printf("[%lu]\tReaching the barrier...\n", pC->id);
        
        // ======= barrier start
        barrier_wait(pC->pBarrier);
        // ====== barrier end 
        
        printf("[%lu]\tAfter the barriern\n", pC->id);
        
        return NULL;
    }

    // ====== 5 barrierWithCv.c
    #include "barrier.h"

    void *workerRoutine(void *pContext);

    /*
     * Initialises one Barrier for MAX_NUM_WORKERS threads, starts the workers
     * that share it, joins them, and destroys the barrier.
     *
     * Returns 0 when every thread was created and joined, 1 if a thread could
     * not be created.
     */
    int main(void) 
    {
        pthread_t threads [ MAX_NUM_WORKERS ];
        Context contexts [ MAX_NUM_WORKERS ];
        ulong_t count = MAX_NUM_WORKERS;
        
        // (0) the barrier object
        Barrier barrier;

        // (1) init the barrier; NULL asks for a default mutex / condition variable
        barrier_init(&barrier, NULL, count);
        
        // (2) create n threads: every thread's argument points at this barrier
        for (ulong_t i = 0; i < MAX_NUM_WORKERS; ++i)
        {
            contexts[i] = (Context){ .pBarrier = &barrier, .id = i };
            
            // If a worker is missing, the count never reaches zero and the
            // others wait forever; joining a thread that was never created is
            // undefined. Stop here, and do not destroy a barrier that threads
            // may still be waiting on.
            if (pthread_create(&threads[i], NULL, workerRoutine, &contexts[i]) != 0)
            {
                return 1;
            }
        }
        
        // (3) join
        for (ulong_t i = 0; i < MAX_NUM_WORKERS; ++i)
        {
            pthread_join(threads[i], NULL);
        }
        
        // (4) every worker is done, so the barrier can be released
        barrier_destroy(&barrier);
        
        return 0;
    }
            

    $ gcc barrierWithCv.c barrier.c pthreadBarrier.c -o a -pthread
    $ ./a
    [2] Reaching the barrier...
    pBarrier->count = 2
    [1] Reaching the barrier...
    pBarrier->count = 1
    [0] Reaching the barrier...
    pBarrier->count = 0
    [0] After the barriern
    [2] After the barriern
    [1] After the barriern

4.2 用 pthread 提供的 barrier

    只需把 4.1 中 pthreadBarrier.h /pthreadBarrier.c 去掉, 改用 pthread 提供的 barrier
        效果 与 4.1 相同 
        
    int pthread_barrier_init(pthread_barrier_t *restrict barrier,
                             const pthread_barrierattr_t *restrict attr, unsigned count);
    int pthread_barrier_destroy(pthread_barrier_t *barrier);
    int pthread_barrier_wait(pthread_barrier_t *barrier);
    
    // ====== 1  barrier.h
    #ifndef BARRIER_H
    #define BARRIER_H

    #include <pthread.h> // #include "pthreadBarrier.h"

    #define MAX_NUM_WORKERS 3

    typedef unsigned long ulong_t;

    // Per-thread argument handed to workerRoutine().
    struct ContextTag 
    {
        pthread_barrier_t *pBarrier;  // POSIX barrier this worker synchronises on (was: Barrier*)
        ulong_t            id;        // worker number, printed in its log lines
    };
    typedef struct ContextTag Context;
        
    #endif // BARRIER_H

    // ====== 2  barrier.c
    #include "barrier.h"
    #include <stdio.h>

    void *workerRoutine(void *pContext) 
    {
        // char buffer[81];
        
        // (1) 取 arg
        Context *pC = (Context*) pContext;
        
        printf("[%lu]\tReaching the barrier...\n", pC->id);
        
        // ======= barrier start
        pthread_barrier_wait(pC->pBarrier);
        // ====== barrier end 
        
        printf("[%lu]\tAfter the barriern\n", pC->id);
        
        return NULL;
    }

    // ====== 3  barrierWithCv.c
    #include "barrier.h"

    void *workerRoutine(void *pContext);

    /*
     * Same program as the hand-written barrier version, but built on
     * pthread_barrier_t: initialises a barrier for MAX_NUM_WORKERS threads,
     * starts the workers that share it, joins them, and destroys the barrier.
     *
     * Returns 0 when every thread was created and joined, 1 if the barrier could
     * not be initialised or a thread could not be created.
     */
    int main(void) 
    {
        pthread_t threads [ MAX_NUM_WORKERS ];
        Context contexts [ MAX_NUM_WORKERS ];
        ulong_t count = MAX_NUM_WORKERS;
        
        // (0) the barrier object
        pthread_barrier_t barrier; // Barrier barrier;

        // (1) init the barrier; waiting on one that failed to initialise is undefined
        if (pthread_barrier_init(&barrier, NULL, count) != 0) // barrier_init(&barrier, NULL, count);
        {
            return 1;
        }
        
        // (2) create n threads: every thread's argument points at this barrier
        for (ulong_t i = 0; i < MAX_NUM_WORKERS; ++i)
        {
            contexts[i] = (Context){ .pBarrier = &barrier, .id = i };
            
            // If a worker is missing, the barrier never fills and the others
            // wait forever; joining a thread that was never created is
            // undefined. Stop here, and do not destroy a barrier that threads
            // may still be waiting on.
            if (pthread_create(&threads[i], NULL, workerRoutine, &contexts[i]) != 0)
            {
                return 1;
            }
        }
        
        // (3) join
        for (ulong_t i = 0; i < MAX_NUM_WORKERS; ++i)
        {
            pthread_join(threads[i], NULL);
        }
        
        // (4) every worker is done, so the barrier can be released
        pthread_barrier_destroy(&barrier); // barrier_destroy(&barrier);
        
        return 0;
    }
上一篇下一篇

猜你喜欢

热点阅读