chapter5: 高级线程编程 - Programming

2022-05-21  本文已影响0人  my_passion

chapter5 高级线程编程

    [1] 一次性初始化
        data 初始化
    
    [2] 属性对象
        创建 thread / mutex / cv 时, 控制它们的特征
    
    [3] 线程特定数据
        数据库机制
            允许 a library 
                1] 关联 data 与其遇到的 各 threadBlocks 
                2] retrieve data later
                
    Pthreads 实时调度 工具(facilities)
        可预测 交互 

5.1 一次性(One-time) 初始化

pthread_once_t onceControl = PTHREAD_ONCE_INIT; // 控制变量
int pthread_once (pthread_once_t *onceControl,
                  void (*init_routine) (void) );

(1) 需要 且只能做一次 的 事情, 如 
    1] 初始化 mutexes
    2] 创建 thread-specific data keys 
    然后 才能真正 创建 关联的 thread 

(2) 一次性初始化
 
    顺序编程
        bool 变量 管理
            1] 静态初始化为 0
            2] 依赖 code test bool 
                若为 0, 可初始化 -> 置为 1
            3] 后续 check -> 1 -> 跳过初始化
    
    多线程 
        
        初始化 state 是 shared invariant, 必须用 mutex 保护 
            
        [1] 自定义 一次性初始化 code
            bool 变量 + statically initialized mutex
            
        [2] pthread_once
        
        vs 
            1) 大多 case, [1] 更方便 且 高效 
            
            2) 用 pthread_once 的 主要原因
            
                最初, pthreads `不允许 静态初始化 mutex`
                
                => 要想用 mutex
                        
                oneTimeInitCode <- - -
                            |        |
                必须先调    |        | 为了能 1次性初始化, 又 必须调 
                            |/       |                          
                pthread_mutex_init --
                        初始化 mutex
                    
                =>  死循环 
                        
            解决: pthread_once
            
        静态初始化 mutex 被 pthreads 支持 后,
            pthread_once 保留为 方便的函数  
                        
        Note 
            若 方便, 就 用 pthread_once, 但不必须用 pthread_once
        
(3) pthread_once 思想 
            
    双检测 + memory barrier ( compiler + cpu )
    
        第1次 check 控制变量( 标志 初始化 是否已完成)
            
            1] 已完成, 直接 return 
            
            2] else 
            
                lll_lock: gcc 内嵌宏指令 -> memory barrier => 避免了 双检测 问题 
                
                第2次检测, 若 已完成, 直接 lll_unlock 
                else 
                    调 oneTimeInitCode (不带参数)
                    modify 控制变量( record 初始化 已完成)
            
    => 只有 第1个 获得 once_lock 的 线程 真正执行 oneTimeInitCode
        其他 线程 要么直接 return, 要么 wait 直到 第1个线程 unlock 后 再 unlock -> 完成 
    
    // gcc 内嵌宏指令 
    #define lll_lock(futex, private) \
    __asm __volatile (__lll_lock_asm_start \
                   ...  // 省略了其他指令
                   : "memory");
    
    => 本质是: compiler memory barrier
    
    而 CPU memory barrier 已由 x86/x64 硬件体系 保证 
        
    => 不会存在 
        
        1] 编译器 重排问题: var 放 register / 内嵌汇编 与 之前指令 重排 
        2] CPU 重排问题: new 3 条语句 重排 (等)
        
    int
    pthread_once (pthread_once_t *once_control;
                    void (*init_routine) (void) )
    {
        if (*once_control == PTHREAD_ONCE_INIT)     // 
        {   
            lll_lock (once_lock, LLL_PRIVATE);  

            if (*once_control == PTHREAD_ONCE_INIT) // double check
            {   
                init_routine (); 

                *once_control = !PTHREAD_ONCE_INIT;
            }   

            lll_unlock (once_lock, LLL_PRIVATE); 
        }   

        return 0;
    }

(4) 例 
    /*  ====== 1. pthread_once.c 
        只能执行1次的 func/once_init_routine: 如 mutex init 
            pthread_mutex_init(&mutex, NULL)
        
        calling thread 与 newly created thread 均 
            pthread_once(&once_block, once_init_routine)
    */

    #include <pthread.h>
    #include "errors.h"

    // (0) control var: must statically init
    pthread_once_t once_block = PTHREAD_ONCE_INIT;

    // global mutex
    pthread_mutex_t mutex;

    void once_init_routine(void) // no arg 
    {
        printf("\n === mutex init: only once \n");
        pthread_mutex_init(&mutex, NULL);
    }

    void *thread_routine(void *arg)
    {
        // (2) subThread pthread_once
        pthread_once(&once_block, once_init_routine);

        pthread_mutex_lock(&mutex); // === lock
        pthread_mutex_unlock(&mutex);   // === unlock

        return NULL;
    }

    int main()
    {
        pthread_t threadId;
        
        pthread_create(&threadId, NULL, 
                                thread_routine, NULL);
        
        // (1) callingThread pthread_once
        pthread_once(&once_block, once_init_routine);
        
        pthread_mutex_lock(&mutex);     // === lock 
        pthread_mutex_unlock(&mutex);   // === unlock
        pthread_join(threadId, NULL);   // join 
    }

    // print 
     === mutex init: only once 
    Main ThreadBlock has locked the mutex.
    thread_toutine has locked the mutex.

    // ====== 2. myCallOnce.c
    // 自定义 一次性初始化 code
    #include <pthread.h>
    #include <stdio.h>

    int CallOnceFinished = 0;
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

    void once_init_routine(void) // no arg 
    {
        printf("\n === mutex init: only once \n");
        // ...
    }

    // Note: 适用于 编译器 重排 和 CPU 重排 问题 不存在 或 已被 pthread_mutex_lock 解决
    int myCallOnce( void (*init_routine) (void) )
    {
        if (CallOnceFinished == 0)
        {
            pthread_mutex_lock(&mutex);     
            if (CallOnceFinished == 0)  // double check
            {
                init_routine();
                CallOnceFinished = 1;
            }
            pthread_mutex_unlock(&mutex);   
        }
        return 0;
    }
    void *thread_routine(void *arg)
    {
        // (2) subThread myCallOnce
        myCallOnce(once_init_routine);

        pthread_mutex_lock(&mutex); // === lock
        pthread_mutex_unlock(&mutex);   // === unlock

        return NULL;
    }

    int main()
    {
        pthread_t threadId;
        
        pthread_create(&threadId, NULL, 
                        thread_routine, NULL);
        
        // (1) callingThread myCallOnce
        myCallOnce(once_init_routine);
        
        pthread_mutex_lock(&mutex);     // === lock 
        pthread_mutex_unlock(&mutex);   // === unlock
        pthread_join(threadId, NULL);   // join 
    }

    // print
     === mutex init: only once

5.2 属性 对象

目标 
    [1] 保持 interface 简单
    
    [2] 同时为 "expert" 功能 提供 `复用性`
    
        无需 更改现有代码, 即可 添加选项      
    
        无需创建 非标准参数, 即可提供 专门选项(Pthreads 标准属性之外)

可视为私有结构
    成员读/写 
        调 特殊函数
        
        // 例 
        读/写 线程属性对象 中 stacksize 属性
            pthread_attr_getstacksize 
            pthread_attr_setstacksize
                
(1) Mutex 属性    

    pthread_mutexattr_t attr;
    int pthread_mutexattr_init (pthread_mutexattr_t *attr);
    int pthread_mutexattr_destroy ( pthread_mutexattr_t *attr);
    
    #ifdef _POSIX_THREAD_PROCESS_SHARED // 进程共享 => 扩展到适用于 多进程中 各 线程
    
    int pthread_mutexattr_getpshared (pthread_mutexattr_t *attr, int *pshared); 
    int pthread_mutexattr_setpshared (pthread_mutexattr_t *attr, int pshared);

    #endif
    
(2) Cv 属性 
        
    pthread_condattr_t attr;
    int pthread_condattr_init     (pthread_condattr_t *attr) ;
    in t pthread_condattr_destroy (pthread_condattr_t *attr);
    
    #ifdef _POSIX_THREAD_PROCESS_SHARED
    int pthread_condattr_getpshared (pthread_condattr_t *attr, int *pshared);
    int pthread_condattr_setpshared (pthread_condattr_t *attr, int pshared);
    #endif

(3) ThreadBlock 属性 
    
    pthread_attr_t attr;
    int pthread_attr_init           (pthread_attr_t *attr);
    int pthread_attr_destroy        (pthread_attr_t *attr);
    int pthread_attr_getdetachstate (pthread_attr_t *attr, int *detachState);
    int pthread_attr_setdetachstate (pthread_attr_t *attr, int detachState)
    
    # ifdef _POSIX_THREAD_ATTR_STACKSI2E
    int pthread_attr_getstacksize (pthread_attr_t *attr, size_t *stackSize );   
    int pthread_attr_setstacksize (pthread_attr_t *attr, size_t stackSize);
    #endif
    
    # ifdef _posix_thread_atpr_stackaddr
    int pthreadLattr_getstackaddr (pthread_attr_t *attr, void *stackAddr);
    int pthread_attr_setstackaddr (pthread_attr_t *attr, void **stackAddr);

    Set stack size is not very portable.
    Set stack address is less portable

(4) 例 
    // ====== 1. threadAttr.c
    #include <limits.h>
    #include <pthread.h>
    #include "errors.h"

    void *thread_routine(void *arg)
    {
        printf("The thread is here\n");
        return NULL;
    }

    int main()
    {
        pthread_attr_t threadAttr;
        size_t stack_size;
        pthread_t threadId;
        
        pthread_attr_init(&threadAttr);
        pthread_attr_setdetachstate(&threadAttr, PTHREAD_CREATE_DETACHED);

        pthread_attr_getstacksize(&threadAttr, &stack_size);
        printf("Default stack size is %u; minimum is %u\n", 
            stack_size, PTHREAD_STACK_MIN);
        pthread_attr_setstacksize(&threadAttr, PTHREAD_STACK_MIN*2);

        pthread_create(&threadId, &threadAttr, thread_routine, NULL);

        pthread_exit(NULL);
        
        return 0;
    }

5.3 线程特定数据

(0) summary 
        
    1) 跨 函数调用 之 private data & shared data 
        
        ——————————————————————————————————————
            单线程 下 
        ——————————————————————————————————————      
        目标  |   private data 跨 函数调用 
        ——————————————————————————————————————      
        方法  |   静态(statically) 内存分配
        ——————————————————————————————————————
        name’s  |   function/file: static 
        scope   |   global       : extern 
        ——————————————————————————————————————          
        
        ——————————————————————————————————————————————————
            多线程 
        ——————————————————————————————————————————————————
        进程内 所有线程 共享 进程地址空间 =>
        
            1] static
            2] extern
            3] 进程 heap          
                
            vaule 是 被任意线程 最后写入的值
                the pTSD last written by any thread
        ——————————————————————————————————————————————————
        
        [1] 线程 唯一的真正 "private" storage(存储) 是 处理器 registers

            stack addresses 可 共享
                (thread) "owner" 将 stack address 暴露给 another thread 时

        [2] register 和 "private" stack 不能替代 非线程代码中 持久静态存储

        [3] 当需要 private variable 时, 必须先决定 
            
            1] 是 多线程 share 
            
                1> static
                
                2> extern 
                
                与 单线程 一样, 但必须 synchronize 对 shared data 的 访问
                
            2] 还是 各线程 private: have its own pTSD
                
                store 到 somewhere -> 能被 each thread 找到(locate)

                1>  some cases 可用
                
                    static data
                        table 
                            itemValue(如 thread’s pthread_t) 是 
                                unique(唯一) to each thread
                        
                2> many 有趣的 cases 
                    
                    无法预测 有多少个线程 调用该 function
                        |
                        |  解决
    2) 通用方案 - - - - 

        [1] 在 每个线程 中 分配 some heap & 存 pTSD 
        
            难点: 如何 find 任一 thread 的 private data
            
            解决: 
                struct PrivateData 
                {   
                    private pTSD
                    thread’s identifier (pthread_t)
                };  
                
                list<PrivateData>
            
            问题
                1] search list -> 慢

                2] 很难恢复 由 terminated threadBlocks 分配的 storage 
                        
                    threadFunc 无法知道 thread 何时终止
                    
        [2] 新接口不应依赖 implicit 持久存储
            
            应该要求 caller 分配必要的 persistent state, 告诉你 state location 
            
            优点

            1] 许多情况下
                可 避免 内部同步
                
                极少数情况下  
                    caller 希望在 线程间 share 持久状态 时, caller 可提供 同步

            2] caller 可选择 在 单个线程中 分配 多个 state buf 
            
                => 多个 独立 函数调用 序列 -> 无冲突

        [3] 问题是我们 经常需要支持 implicit persistent states
            
            making 现有 interface thread-safe, 但
                不能
                1] 增加 arg 
                2] 要求 caller 为了我们方便而 维持 a new data structure
                    
            => 引入 thread-specific data(TSD)

    3)  TSD 机制 

                    shared key
                   /           \ 关联
                  /             \
        线程 1 private pTSD ... 线程 n private pTSD 
        
(1) 创建 ...
    
    pthread_key_t key;
    int pthread_key_create(pthread_key_t *key, void (*engineeThreadDtor)(void * ) );
    int pthread_key_delete(pthread_key_t key );

    1) pthread_key_t 变量
        
        不透明
        
        create 
            只创建1次: 调 pthread_key_create
            最简方法 pthread_once

        delete 
            当 no thread has a pTSD for that key
            
            引用计数 
        
    2) dtor
        = NULL 的 含义: key has no pTSD
        
        调用 时机
            线程终止时   

    3) key 关联的 TSD's pTSD
    
        [1] 初始: 在所有线程中均为 null   
        
        [2] 线程终止 & 调 dtor 前: 被置 null 
            那 dtor 中 如何获取 key 关联的 previous TSD's pTSD ?             
                通过 dtor 的 参数 pass

(2) 使用 ...

    int pthread_setspecific   (pthread_key_t key, const void *pTSD);
    void *pthread_getspecific (pthread_key_t key);

(3) 用 dtor

(4) 例 

    // ====== 1. thraedSpecificData_Once.c
    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h> // malloc 
    #include <unistd.h> // sleep

    // (1) TSD struct 
    typedef struct TSD 
    {
        pthread_t   threadId;
        char        *str;
    } TSD;

    // (2) key 
    pthread_key_t key;
    pthread_once_t onceFlag = PTHREAD_ONCE_INIT;

    void onceRoutine(void)
    {
        printf("init key\n");
        
        // (5)
        pthread_key_create(&key, NULL);
    }

    void *thread_routine(void *arg)
    {
        TSD *pTSD;

        // (4)
        pthread_once(&onceFlag, onceRoutine);

        pTSD = (TSD*)malloc(sizeof(TSD));
        
        // (6) setspecific
        pthread_setspecific(key, pTSD);
        pTSD->threadId = pthread_self();
        pTSD->str = (char*)arg;

        // (7) getspecific
        pTSD = (TSD*)pthread_getspecific(key);
        printf("%s starting...\n", pTSD->str);

        sleep(2);

        pTSD = (TSD*)pthread_getspecific(key);
        printf("%s done...\n", pTSD->str);

        return NULL;
    }

    int main()
    {
        pthread_t thread1, thread2;
        
        // (3) Note: 字符串 字面值: 作 arg 传递, 传的是 其 ptr 
        pthread_create(&thread1, NULL, thread_routine, "thread 1");
        pthread_create(&thread2, NULL, thread_routine, "thread 2");

        pthread_exit(NULL);
    }

    // print 
    init key
    thread 1 starting...
    thread 2 starting...
    thread 1 done...
    thread 2 done...

    // ====== 2. thraedSpecificData_Destory.c
    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h> // malloc 
    #include <unistd.h> // sleep

    typedef struct PrivateTag 
    {
        pthread_t   threadId;
        char        *str;
    } ThreadPrivateData;

    pthread_key_t key;
    pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
    long keyRefCount = 0;

    void keyDtor(void *pArg)
    {
        // (1) Note: use previous TSD value: from keyDtor's para
        ThreadPrivateData *pTPD = (ThreadPrivateData*)pArg;
        printf("thread \"%s\" exiting...\n", pTPD->str);
        
        // (2) free TSD memory 
        free(pArg);

        pthread_mutex_lock(&mut);   // === lock 
        
        // (3) Note: delete key under refCount & mutex 
        keyRefCount--;
        if (keyRefCount <= 0)
        {
            pthread_key_delete(key);
            printf("key delete...\n");
        }
        
        pthread_mutex_unlock(&mut); // === unlock 
    }

    void *getPrivateDataFromKey(void)
    {
        void *pTPD;

        pTPD = pthread_getspecific(key);
        if (pTPD == NULL) 
        {
            pTPD = malloc(sizeof(ThreadPrivateData) );
            pthread_setspecific(key, pTPD);
        }
        return pTPD;
    }

    void *threadRoutine(void *arg)
    {
        ThreadPrivateData *pTPD;

        pTPD = (ThreadPrivateData*) getPrivateDataFromKey();
        pTPD->threadId = pthread_self();
        pTPD->str = (char*)arg;
        
        printf("thread \"%s\" starting...\n", pTPD->str);
        
        sleep(2);
        
        return NULL;
    }

    int main()
    {
        pthread_t thread_1, thread_2;
        ThreadPrivateData *pTPD;

        // (4)
        pthread_key_create(&key, keyDtor);

        keyRefCount = 3; // 3 个线程 用 key
        
        // main thread 
        pTPD = (ThreadPrivateData*)getPrivateDataFromKey();
        pTPD->threadId = pthread_self();
        pTPD->str = "Main thread";

        pthread_create(&thread_1, NULL, threadRoutine, "thread 1");
        pthread_create(&thread_2, NULL, threadRoutine, "thread 2");
        
        pthread_exit(NULL);
    }
    
    // print 
    thread "thread 1" starting...
    thread "thread 2" starting...
    thread "Main thread" exiting...
    thread "thread 1" exiting...
    thread "thread 2" exiting...
    key delete...
5.4 取消 
    1   延迟 取消
    2   异步 取消 
    3   Cleaning up 
    
5.5 实时调度 
    (1) posix 实时选择
    (2) 调度策略 和 优先性
    (3) 竞争范围 和 分配域
    (4) 实时调度 的 问题
    (5) 有优先意识 的 mutexes 
        1) 优先 ceiling mutexs
        2) 优先 继承 mutexes

5.6 线程 和 内核 实体 
    (1) 多 对 1 (用户级)
    (2) 1 对 1  (内核级)
    (3) 多 对 少 (2 个级别)
上一篇 下一篇

猜你喜欢

热点阅读