Chapter1: alarm 程序 3 种 版本 - Prog

2022-05-17  本文已影响0人  my_passion
<< Programming with POSIX threads >> 

作者 R. Butenhof

https://gitee.com/msntec/posix-threadBlocks-programming

编译
    安装 CMake
    git clone https://github.com/Veinin/programming-with-POSIX-threadBlocks-tutorials.git
    cd programming-with-POSIX-threadBlocks-tutorials
    ./build.sh

运行
    第1个程序

    $ ./bin/barrier_main
    00: (10)0000045001 0000045002 0000045003 0000045004 0000045005 0000045006
    ...

chapter1 概述

1.1 定义 + 术语

(1) 异步
    
    操作(event) 间 关系: 独立
    
        操作 独立 发生/前进: 不必 一个等另一个完成后才能开始
                
             除非 被 强制依赖性
    
(2) 并发 
    
    多个 序列/操作 前进(执行) 的方式: 分时 交错
        
        表面上 同时执行
        实际上 `同时只有 1 个执行` 
    
(3) 并行  
    
    并发 序列 同时前进 
    
    并发/性 序列 可/只能 在 单/多 处理器 系统上出现 

(4) 单处理器 与 多处理器

    多处理器    
        共享 指令集 和 物理内存

(5) 线程安全

        不要求 高效性
       
    大多 现有函数 -> 改为 线程安全的版本 的方法 
        
        用 Pthreads 的 mutexes / cv / thread private data 
        
        [1] 函数 串行化: 函数 进入时 lock, 退出时 unlock
            
            用于: 不要求 持续的 context 的 func
                
        [2] 临界区 串行化 + 非临界区 可并行
        
            函数 -> 分解为 -> 各 临界区
            
        [3] protect `临界 data` 而不是 `临界 code`: 重新设计

            1] 不同时使用 临界 data 时
                    
                可完全 `并行 执行 code`
                            
            2] 同时使用 临界 data 时
                            
                仍可   `并发 访问 临界 data` 

        // 例 
        putchar 函数: 写 字符 到 I/O buffer -> 线程安全的 版本
        
            1]  protect `临界 code`: 关联 mutex 与 func
            
                lock "putchar mutex" 
                write 字符
                unlock "putchar mutex"

            2]  protect `临界 data`: 关联 mutex 与 stream 
                
                                
            比较
                2 个线程 putchar 到 不同 streams
                    1]只能有1个线程 / 2] 2个线程 
                        同时执行 putchar 
                        
(6) 可重入

    "高效的 线程安全"
            
    可重入 code 应 避免 依赖于
        
        1] static data
        
        2] 线程间 同步 
        
    vs. mutex 和 线程特定数据
            
        通常需要 改变 函数 interface
                
    避免 函数内部同步 的 方法 
        
        [1] 函数 保存 状态到 "context 结构" -> 让 caller 控制 
            
            caller 负责 data 的 同步 
            
                UNIX readdir()
                    
                    顺序地 返回每个 目录入口 
                    进入 时 lock mutex 
                    返回 前 unlock 
        
        [2] caller 分配 维持函数 的 `context 结构` 
            
            Pthreads readdir_r() 
                
            1] 表面上, 只是将 func 的 责任移交给 caller 
                    
            2] 实际上, 只有 caller 知道 如何使用 the data (context)
                
                1> 若 只有1个线程 用该 contexct, 
                    
                    则 不必同步 
                
                2> 有 多个线程 共享数据, 
                
                    用于该 `context` 的 mutex 还可以用于 `其他 data`

(7) 并发 控制函数
    
    并发系统 

    [1] 要提供的 必要函数, 创建 并发执行 contexts 
    
    [2] 控制 这些函数 如何 运行
    
    3 种 便捷方法(facilities)
        
        [1] `执行 context` 是 并发实体 的 state
                
            并发系统 要能对 多个 contexts 
                
                创建 / 删除 / 独立 `维持` 其 state 
                
            save context 的 state 
            dispatch(分发) 到 另一 context 
                    
            外部事件 -> 中断 -> 回到最后的执行处: 相同的 寄存器 内容 
                
        [2] schedule 
            
            决定 任意给定时刻, 哪个 context 被 执行 
            
        [3] synchronize 
            
            `并发 执行 contexts` 时, 协调 `shared resources`

    TABLE 1.1 Execution contexts, schedulers, and synchronization
    ——————————————————————————————————————————————————————————————————————————————————————————————————————————      
                            Execution context   Scheduling                      Synchronization
    ——————————————————————————————————————————————————————————————————————————————————————————————————————————
    Real traffic            automobile          traffic lights and signs    turn signals and brake lights
    ——————————————————————————————————————————————————————————————————————————————————————————————————————————
    UNIX(before threadBlocks)   process             priority (nice)             wait and pipes
    ——————————————————————————————————————————————————————————————————————————————————————————————————————————
    Pthreads                thread              policy, priority            condition variables and mutexes
    ——————————————————————————————————————————————————————————————————————————————————————————————————————————

    调度 
        run until block
            自动让出 cpu
        
        round-robin
            时间片 => 周期性 让出 
            
    同步 
        
        4种机制 
            
            mutex
            cv 
            信号量 
            事件
             
            消息传递机制 
                
                UNIX pipes 
                sockets 
                POSIX 消息队列 

1.2 异步编程 是 直观的
    
    UNIX shells
        shell 是 异步编程
    
(1) UNIX 是 异步的

    UNIX 系统中,`进程 间 异步执行`
    
    向 shell 键入命令时, 
        实际上启动了 `1个 独立的程序` —— 若您在 `后台运行` 该程序, 它会
            与 shell 异步运行
            
1.4 异步编程 例子 

    程序在循环中 提示 `输入行`, 直到 在 stdin 上 收到 错误或结束
        每一行中, 
            第1个 非空白 被解释为 等待的秒数
            其余部分(最多 64 个字符)是一条消息, 等待完成时将打印
        
(1) 基线: 同步 (sleep) 版本 

    循环 
        从 stdin 读 1行
        解析: 为 要等待的秒数 + 要打印的 msg
        等待 + 打印
        
    问题
        1次 只能有1个 alarm 请求 被激活

(2) 多进程 版本: 异步
    
    1个请求 1个子进程 去 处理
    
    父进程 waitpid 回收 terminated 子进程

    1) 思路

        [1] `为 每个 command, fork 1个 子进程`: copy 主进程 地址空间 -> execute 

        [2] 可 随时输入 command, 各 commands 独立进行

        [3] 与 同步版本 区别
            
                不直接调 sleep
            
            1] 用 fork 创建 子进程
            
            2] 子进程 异步调 sleep
            
            3] 父进程 继续

    2) `回收` 任何 `已终止的子进程`
            
        必要 
            否则, 系统将保存 这些子进程, 直到程序终止
        
        方法
        
            waitpid()
                
            与 wait() 区别  
                可指定 要清理的 pid 进程
                `可 不阻塞`
                    允许 caller 指定 WNOHANG

            return 

                1]  0 (进程 ID 0)
                
                2] -1 : 出错
                
                3] 非 0 && 非 -1 
                    `还有 需要回收的 已终止子进程`, 
                        立即回收, 返回 `非 0` 
                
(3) 多线程 版本: 异步

    1个 请求, 1 个线程

    1) 与 多进程 区别
    
        1]  `线程` 而非进程 
        2]  `堆内存` 而非 栈内存 
                
    2) 必要时, Pthreads 会 持有 线程资源, 以便 
        
        另一线程 可 
            1] 确定 当前线程 已退出 
            2] 获取最终结果

(4) 总结 

    1) 地址空间

        进程版本 
            
            各 进程 有 `独立的地址空间`, 从 主程序 copy 而来

                => 进程要处理的 data 可放 stack/局部变量
            
                    父子进程 用 2套独立数据 => 相互不影响
                
        线程版本
            各 线程 share 所属进程的 地址空间
                
                => 每个 新线程 要处理的 data 可用 malloc + 传 pData

    2)
        进程版本
            
            主程序要通过调 waitpid 或 wait 等,
            
                来告诉 内核 释放 子进程 资源
                
                recycle 所有已完成的子进程

        线程版本
        
            除非 需要线程的 返回值, 否则不需要等待线程
            
                每个线程会 自行 detach, 以便 线程 terminate 时, 线程持有的 resource 立即返回

    更复杂的 线程版本 
        
        两个线程
        
            线程1: 读 用户输入
            线程2: 等待下一个到期 警报
            
    // ====== 1. alarm.c
    #include "errors.h" // 包含 <unistd.h> and <stdio.h> 错误报告宏

    int main(void) 
    {
        int seconds;
        char line[128];   // 存 从 stdin 中 读取的 行
        char message[64]; // 存 解析出的 msg

        while(1) 
        {
            // (1) fgets 从 stdin 读取1行 放到 char 数组 line, error 或 eof 时 返回 NULL
            fgets(line, sizeof(line), stdin);

            if (strlen(line) == 0)
                continue;

            // (2) sscanf 解析 fgets 读取的 行: 分离 由 空格(blank) 分隔的 
            //      1] 要 wait 的 秒数  2] 要 print 的 msg (最多 64 个字符, 不含 '\n' )
            sscanf(line, "%d %64[^\n]", &seconds, message);
            
            // (3)
            sleep(seconds); 
            printf("(%d) %s\n", seconds, message);
        }
    }
    
    // ====== 2. alarm_fork
    #include <sys/types.h>
    #include <wait.h>
    #include "errors.h"

    int main(void) 
    {
        pid_t pid;
        int seconds;
        char line[128];
        char message[64];

        while(1) 
        {
            fgets(line, sizeof(line), stdin);

            if (strlen(line) <= 1)
                continue;

            if (sscanf(line, "%d %64[^\n]", &seconds, message) < 2) 
            {
                fprintf(stderr, "Bad command\n");
            } 
            else 
            {
                // (1) fork
                pid = fork();
                
                // (2) -1: 出错
                if (pid == -1)
                    errno_abort("Fork");
                
                // (3) 0: 子进程
                if ( pid == (pid_t)0 ) 
                {
                    sleep(seconds);
                    printf("\n");
                    
                    // 取 本/子 进程 ID 
                    pid = getpid();
                    printf("Child process pid is (%d)\n", pid);
                    printf("(%d) %s\n", seconds, message);
                    
                    // Note
                    exit(0);
                } 
                else // (4) >0 (子进程 ID): 父进程
                {
                    // 1) 取 本/父 进程 pid
                    pid = getpid();
                    printf("Parent process pid is (%d)\n", pid);
                    
                    // 2) 回收 `已终止的子进程`
                    do 
                    {
                        pid = waitpid((pid_t)-1, NULL, WEXITED);
                        
                        if (pid == (pid_t) -1)
                            errno_abort("Wait for child");
                            
                    } while( pid != (pid_t)0 );
                }
            }
        }
    }
        
    // ====== 3. alarm_thread.c
    #include "errors.h"
    #include <pthread.h>

    // 控制包 / control packet 
    typedef struct AlarmTag 
    {
        int  seconds;
        char message[64];
    } Alarm;

    void *alarm_thread(void *arg) 
    {
        // 子线程 `自行 分离`
        // (3) pthread_detach: 允许 Pthread 线程终止后 立即回收 线程的资源
        // (4) pthread_self: 返回 调用线程的标识符
        pthread_detach( pthread_self() ); 
    
        Alarm *alarm = (Alarm*)arg;
        
        // (5)
        sleep(alarm->seconds);
        
        printf("(%d) %s\n", alarm->seconds, alarm->message);
        
        // (1-2) 子线程 内 free 主线程 malloc 的 内存
        free(alarm);
        
        return NULL;
    }

    int main(void) 
    {
        int seconds;
        char line[128];
        Alarm *alarm;
        
        // (0) 线程对象
        pthread_t thread;

        while(1) 
        {
            fgets(line, sizeof(line), stdin);

            if (strlen(line) <= 1)
                continue;

            // (1) malloc + free 在 异常 / 子线程 中都要有
            alarm = (Alarm*)malloc( sizeof(Alarm) );

            sscanf(line, "%d %64[^\n]", &alarm->seconds, alarm->message);

            pthread_create(&thread, NULL, alarm_thread, alarm);

        }
    }
                
1.5 线程 收益 
    
    多线程编程模型 优点

        (1) 在 CPU 上 并行性
        
        (2) 更有效地利用程序的自然 并发性
        
            程序可 `等待 慢速 I/O 操作完成 时, 执行 计算`
                
        (3) 模块化编程模型
                
                清晰表达 code 中 独立 "事件" 间 关系
    
1.6 线程 代价 

(1) 计算开销
(2) 编程 更严格
(3) 难调试 
    
1.7 用 线程 还是 不用 ?

(1) 不该用线程的 case 
    
    问题 "本质上 非并发"
    
        线程版本 
            减慢程序 + 复杂化
            
(2) 该用线程 的 case

    [1] 大量计算 可 并行/分解 为 多个线程, 且 想要运行到 多 CPU 上 

    [2] 大量 I/O 
        
        多线程 可同时等待 不同的 I/O 请求
    
        分布式 server 适合用 多线程 
            
            1] 要 响应多个 client 
            
            2] 要为 较慢的网络连接 上 `不请自来的 I/O` 做好准备
            
            
1.8 POSIX 线程 概念 

(1) 架构 概述

    线程系统 3个基本方面
        context 
        schedule 
        synchronize
    
    1) create 执行上下文(线程) 
            
        pthread_create
    
    2) Pthreads 指定 `调度 参数` 的时机 
    
        [1] 创建线程 
        
        [2] 线程运行 时

    3) 线程 `终止` 时机 
        
        [1] 调 pthread_exit 时
        
        [2] 从 线程启动函数 返回 时

    4) Pthreads 同步模型
    
        [1] 用 mutex 保护
            
            mutex 允许线程
                用 shared data 时, lock 它
                    以免其他线程 干扰
            
        [2] 用 cv 通信
        
            cv 允许线程
                wait 共享数据 `到达 某种 期望的状态` ( "队列不为空" / "资源可用" )
        
        [3] 信号量 / 管道 / 消息队列

(2) 类型 和 接口 
    
    TABLE 1.2 POSIX threadBlocks types
    ————————————————————————————————————————————————————————
    Type                Section Description
    
    pthread_t           2 i     thread identifier
    
    pthread_mutex_t     3.2     mutex
    pthread_cond_t      3.3     cv
    
    pthread_key_t       5.4     线程特定数据 的 “access key” 
    
    pthread_attr_t      5.2.3   thread attributes object
    pthread mutexattr_t 5.2.1   mutex attributes object
    pthread_condattr_t  5.2.2   cv attributes object
    
    pthread_once_t      5.1     "one time initialization" control context
    ————————————————————————————————————————————————————————
    
(3) 报告错误
    
    Pthreads 函数 出错时 不置 errno -> 用 新方法 报告错误
    
    // errors.h
    #ifndef ERRORS_H
    #define ERRORS_H

    #include <unistd.h>
    #include <errno.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    #define err_abort(code, text) \
        do {\
            fprintf(stderr, "%s at \"%s\":%d: %s\n",\
                text, __FILE__, __LINE__, strerror(code));\
            abort();\
        } while(0)

    #define errno_abort(text) \
        do {\
            fprintf(stderr, "%s at \"%s\":%d: %s\n",\
                text, __FILE__, __LINE__, strerror(errno));\
            abort();\
        } while(0)

    #endif // ERRORS_H
    
    // thread_error.c
    #include <pthread.h>
    #include <stdio.h>
    #include <string.h>
    #include <errno.h>

    int main()
    {
        pthread_t thread;
        pthread_join(thread, NULL);
    }
上一篇下一篇

猜你喜欢

热点阅读