Chapter1: alarm 程序 3 种 版本 - Prog
2022-05-17 本文已影响0人
my_passion
<< Programming with POSIX threads >>
作者 R. Butenhof
https://gitee.com/msntec/posix-threadBlocks-programming
编译
安装 CMake
git clone https://github.com/Veinin/programming-with-POSIX-threadBlocks-tutorials.git
cd programming-with-POSIX-threadBlocks-tutorials
./build.sh
运行
第1个程序
$ ./bin/barrier_main
00: (10)0000045001 0000045002 0000045003 0000045004 0000045005 0000045006
...
chapter1 概述
1.1 定义 + 术语
(1) 异步
操作(event) 间 关系: 独立
操作 独立 发生/前进: 不必 一个等另一个完成后才能开始
除非 被 强制依赖性
(2) 并发
多个 序列/操作 前进(执行) 的方式: 分时 交错
表面上 同时执行
实际上 `同时只有 1 个执行`
(3) 并行
并发 序列 同时前进
并发/性 序列 可/只能 在 单/多 处理器 系统上出现
(4) 单处理器 与 多处理器
多处理器
共享 指令集 和 物理内存
(5) 线程安全
不要求 高效性
大多 现有函数 -> 改为 线程安全的版本 的方法
用 Pthreads 的 mutexes / cv / thread private data
[1] 函数 串行化: 函数 进入时 lock, 退出时 unlock
用于: 不要求 持续的 context 的 func
[2] 临界区 串行化 + 非临界区 可并行
函数 -> 分解为 -> 各 临界区
[3] protect `临界 data` 而不是 `临界 code`: 重新设计
1] 不同时使用 临界 data 时
可完全 `并行 执行 code`
2] 同时使用 临界 data 时
仍可 `并发 访问 临界 data`
// 例
putchar 函数: 写 字符 到 I/O buffer -> 线程安全的 版本
1] protect `临界 code`: 关联 mutex 与 func
lock "putchar mutex"
write 字符
unlock "putchar mutex"
2] protect `临界 data`: 关联 mutex 与 stream
比较
2 个线程 putchar 到 不同 streams
1]只能有1个线程 / 2] 2个线程
同时执行 putchar
(6) 可重入
"高效的 线程安全"
可重入 code 应 避免 依赖于
1] static data
2] 线程间 同步
vs. mutex 和 线程特定数据
通常需要 改变 函数 interface
避免 函数内部同步 的 方法
[1] 函数 保存 状态到 "context 结构" -> 让 caller 控制
caller 负责 data 的 同步
UNIX readdir()
顺序地 返回每个 目录入口
进入 时 lock mutex
返回 前 unlock
[2] caller 分配 维持函数 的 `context 结构`
Pthreads readdir_r()
1] 表面上, 只是将 func 的 责任移交给 caller
2] 实际上, 只有 caller 知道 如何使用 the data (context)
1> 若 只有1个线程 用该 contexct,
则 不必同步
2> 有 多个线程 共享数据,
用于该 `context` 的 mutex 还可以用于 `其他 data`
(7) 并发 控制函数
并发系统
[1] 要提供的 必要函数, 创建 并发执行 contexts
[2] 控制 这些函数 如何 运行
3 种 便捷方法(facilities)
[1] `执行 context` 是 并发实体 的 state
并发系统 要能对 多个 contexts
创建 / 删除 / 独立 `维持` 其 state
save context 的 state
dispatch(分发) 到 另一 context
外部事件 -> 中断 -> 回到最后的执行处: 相同的 寄存器 内容
[2] schedule
决定 任意给定时刻, 哪个 context 被 执行
[3] synchronize
`并发 执行 contexts` 时, 协调 `shared resources`
TABLE 1.1 Execution contexts, schedulers, and synchronization
——————————————————————————————————————————————————————————————————————————————————————————————————————————
Execution context Scheduling Synchronization
——————————————————————————————————————————————————————————————————————————————————————————————————————————
Real traffic automobile traffic lights and signs turn signals and brake lights
——————————————————————————————————————————————————————————————————————————————————————————————————————————
UNIX(before threadBlocks) process priority (nice) wait and pipes
——————————————————————————————————————————————————————————————————————————————————————————————————————————
Pthreads thread policy, priority condition variables and mutexes
——————————————————————————————————————————————————————————————————————————————————————————————————————————
调度
run until block
自动让出 cpu
round-robin
时间片 => 周期性 让出
同步
4种机制
mutex
cv
信号量
事件
消息传递机制
UNIX pipes
sockets
POSIX 消息队列
1.2 异步编程 是 直观的
UNIX shells
shell 是 异步编程
(1) UNIX 是 异步的
UNIX 系统中,`进程 间 异步执行`
向 shell 键入命令时,
实际上启动了 `1个 独立的程序` —— 若您在 `后台运行` 该程序, 它会
与 shell 异步运行
1.4 异步编程 例子
程序在循环中 提示 `输入行`, 直到 在 stdin 上 收到 错误或结束
每一行中,
第1个 非空白 被解释为 等待的秒数
其余部分(最多 64 个字符)是一条消息, 等待完成时将打印
(1) 基线: 同步 (sleep) 版本
循环
从 stdin 读 1行
解析: 为 要等待的秒数 + 要打印的 msg
等待 + 打印
问题
1次 只能有1个 alarm 请求 被激活
(2) 多进程 版本: 异步
1个请求 1个子进程 去 处理
父进程 waitpid 回收 terminated 子进程
1) 思路
[1] `为 每个 command, fork 1个 子进程`: copy 主进程 地址空间 -> execute
[2] 可 随时输入 command, 各 commands 独立进行
[3] 与 同步版本 区别
不直接调 sleep
1] 用 fork 创建 子进程
2] 子进程 异步调 sleep
3] 父进程 继续
2) `回收` 任何 `已终止的子进程`
必要
否则, 系统将保存 这些子进程, 直到程序终止
方法
waitpid()
与 wait() 区别
可指定 要清理的 pid 进程
`可 不阻塞`
允许 caller 指定 WNOHANG
return
1] 0 (进程 ID 0)
2] -1 : 出错
3] 非 0 && 非 -1
`还有 需要回收的 已终止子进程`,
立即回收, 返回 `非 0`
(3) 多线程 版本: 异步
1个 请求, 1 个线程
1) 与 多进程 区别
1] `线程` 而非进程
2] `堆内存` 而非 栈内存
2) 必要时, Pthreads 会 持有 线程资源, 以便
另一线程 可
1] 确定 当前线程 已退出
2] 获取最终结果
(4) 总结
1) 地址空间
进程版本
各 进程 有 `独立的地址空间`, 从 主程序 copy 而来
=> 进程要处理的 data 可放 stack/局部变量
父子进程 用 2套独立数据 => 相互不影响
线程版本
各 线程 share 所属进程的 地址空间
=> 每个 新线程 要处理的 data 可用 malloc + 传 pData
2)
进程版本
主程序要通过调 waitpid 或 wait 等,
来告诉 内核 释放 子进程 资源
recycle 所有已完成的子进程
线程版本
除非 需要线程的 返回值, 否则不需要等待线程
每个线程会 自行 detach, 以便 线程 terminate 时, 线程持有的 resource 立即返回
更复杂的 线程版本
两个线程
线程1: 读 用户输入
线程2: 等待下一个到期 警报
// ====== 1. alarm.c
#include "errors.h" // 包含 <unistd.h> and <stdio.h> 错误报告宏
int main(void)
{
int seconds;
char line[128]; // 存 从 stdin 中 读取的 行
char message[64]; // 存 解析出的 msg
while(1)
{
// (1) fgets 从 stdin 读取1行 放到 char 数组 line, error 或 eof 时 返回 NULL
fgets(line, sizeof(line), stdin);
if (strlen(line) == 0)
continue;
// (2) sscanf 解析 fgets 读取的 行: 分离 由 空格(blank) 分隔的
// 1] 要 wait 的 秒数 2] 要 print 的 msg (最多 64 个字符, 不含 '\n' )
sscanf(line, "%d %64[^\n]", &seconds, message);
// (3)
sleep(seconds);
printf("(%d) %s\n", seconds, message);
}
}
// ====== 2. alarm_fork
#include <sys/types.h>
#include <wait.h>
#include "errors.h"
int main(void)
{
pid_t pid;
int seconds;
char line[128];
char message[64];
while(1)
{
fgets(line, sizeof(line), stdin);
if (strlen(line) <= 1)
continue;
if (sscanf(line, "%d %64[^\n]", &seconds, message) < 2)
{
fprintf(stderr, "Bad command\n");
}
else
{
// (1) fork
pid = fork();
// (2) -1: 出错
if (pid == -1)
errno_abort("Fork");
// (3) 0: 子进程
if ( pid == (pid_t)0 )
{
sleep(seconds);
printf("\n");
// 取 本/子 进程 ID
pid = getpid();
printf("Child process pid is (%d)\n", pid);
printf("(%d) %s\n", seconds, message);
// Note
exit(0);
}
else // (4) >0 (子进程 ID): 父进程
{
// 1) 取 本/父 进程 pid
pid = getpid();
printf("Parent process pid is (%d)\n", pid);
// 2) 回收 `已终止的子进程`
do
{
pid = waitpid((pid_t)-1, NULL, WEXITED);
if (pid == (pid_t) -1)
errno_abort("Wait for child");
} while( pid != (pid_t)0 );
}
}
}
}
// ====== 3. alarm_thread.c
#include "errors.h"
#include <pthread.h>
// 控制包 / control packet
typedef struct AlarmTag
{
int seconds;
char message[64];
} Alarm;
void *alarm_thread(void *arg)
{
// 子线程 `自行 分离`
// (3) pthread_detach: 允许 Pthread 线程终止后 立即回收 线程的资源
// (4) pthread_self: 返回 调用线程的标识符
pthread_detach( pthread_self() );
Alarm *alarm = (Alarm*)arg;
// (5)
sleep(alarm->seconds);
printf("(%d) %s\n", alarm->seconds, alarm->message);
// (1-2) 子线程 内 free 主线程 malloc 的 内存
free(alarm);
return NULL;
}
int main(void)
{
int seconds;
char line[128];
Alarm *alarm;
// (0) 线程对象
pthread_t thread;
while(1)
{
fgets(line, sizeof(line), stdin);
if (strlen(line) <= 1)
continue;
// (1) malloc + free 在 异常 / 子线程 中都要有
alarm = (Alarm*)malloc( sizeof(Alarm) );
sscanf(line, "%d %64[^\n]", &alarm->seconds, alarm->message);
pthread_create(&thread, NULL, alarm_thread, alarm);
}
}
1.5 线程 收益
多线程编程模型 优点
(1) 在 CPU 上 并行性
(2) 更有效地利用程序的自然 并发性
程序可 `等待 慢速 I/O 操作完成 时, 执行 计算`
(3) 模块化编程模型
清晰表达 code 中 独立 "事件" 间 关系
1.6 线程 代价
(1) 计算开销
(2) 编程 更严格
(3) 难调试
1.7 用 线程 还是 不用 ?
(1) 不该用线程的 case
问题 "本质上 非并发"
线程版本
减慢程序 + 复杂化
(2) 该用线程 的 case
[1] 大量计算 可 并行/分解 为 多个线程, 且 想要运行到 多 CPU 上
[2] 大量 I/O
多线程 可同时等待 不同的 I/O 请求
分布式 server 适合用 多线程
1] 要 响应多个 client
2] 要为 较慢的网络连接 上 `不请自来的 I/O` 做好准备
1.8 POSIX 线程 概念
(1) 架构 概述
线程系统 3个基本方面
context
schedule
synchronize
1) create 执行上下文(线程)
pthread_create
2) Pthreads 指定 `调度 参数` 的时机
[1] 创建线程
[2] 线程运行 时
3) 线程 `终止` 时机
[1] 调 pthread_exit 时
[2] 从 线程启动函数 返回 时
4) Pthreads 同步模型
[1] 用 mutex 保护
mutex 允许线程
用 shared data 时, lock 它
以免其他线程 干扰
[2] 用 cv 通信
cv 允许线程
wait 共享数据 `到达 某种 期望的状态` ( "队列不为空" / "资源可用" )
[3] 信号量 / 管道 / 消息队列
(2) 类型 和 接口
TABLE 1.2 POSIX threadBlocks types
————————————————————————————————————————————————————————
Type Section Description
pthread_t 2 i thread identifier
pthread_mutex_t 3.2 mutex
pthread_cond_t 3.3 cv
pthread_key_t 5.4 线程特定数据 的 “access key”
pthread_attr_t 5.2.3 thread attributes object
pthread mutexattr_t 5.2.1 mutex attributes object
pthread_condattr_t 5.2.2 cv attributes object
pthread_once_t 5.1 "one time initialization" control context
————————————————————————————————————————————————————————
(3) 报告错误
Pthreads 函数 出错时 不置 errno -> 用 新方法 报告错误
// errors.h
#ifndef ERRORS_H
#define ERRORS_H
#include <unistd.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define err_abort(code, text) \
do {\
fprintf(stderr, "%s at \"%s\":%d: %s\n",\
text, __FILE__, __LINE__, strerror(code));\
abort();\
} while(0)
#define errno_abort(text) \
do {\
fprintf(stderr, "%s at \"%s\":%d: %s\n",\
text, __FILE__, __LINE__, strerror(errno));\
abort();\
} while(0)
#endif // ERRORS_H
// thread_error.c
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
int main()
{
pthread_t thread;
pthread_join(thread, NULL);
}