chapter3: 同步 - Programming with
2022-05-17 本文已影响0人
my_passion
chapter3 同步
3.1 不变量 (Invariants), 临界区 (critical ..), 谓词 (predicates)
1 Invariants
(1) 不变量 是 program 中 必须始终为真 的 假设/陈述
如 !queue.empty()
(2) broken invariant
只要在 other thread 遇到 invariant 前,
broken invariant 恢复, 则 no error
|
|/
同步
2 临界区
`影响 shared status` 的 代码区
`临界区 (code 角度)` 与 `Invariants (data 角度)`
临界区 易识别
两者几乎总可以 相互转换
delete queue elem
delete code : 临界区
queue status: Invariants
3 谓词
(1) 是 逻辑表达式: bool 值
描述 invariants 状态
(2) 可表示 为 语句
"队列为空" "资源可用" 等
3.2 mutex
(0) 线程间 共享数据
1) 线程间 同步
通用方式
mutex
确保对 `相同(相关) data` 的 `所有内存访问` 都是 "互斥的 / mutually exclusive"
2) mutex
[1] mutex 结合 cv
可很容易 `构建 任何 同步模型`
[2] 易使用: 与 信号量(semaphore) 等 other 同步模型 相比
[3] 令牌 (tokens)
3) 何时需要 同步 ?
1] 修改数据 时
2] `线程 1` 读 `线程 2 写 的数据`, 而 该 data 以什么顺序写 很重要时
原因: 硬件系统 不能保证 2个处理器 以相同顺序 看到对共享内存访问
这看似不合理
但是 内存系统 以这种方式工作, 比 以 `保证可预测的内存访问顺序` 工作
快很多
通用解决方案
mutex
3个线程 共享1个 mutex 的 时序图
3个区域
没拥有 mutex
等待去拥有 mutex
拥有 mutex
最初, mutex unlocked
线程 1 locks mutex
因为没 contention (竞争), lock 立即成功 —— 线程 l 拥有 mutex: line 移到 time 线 下方
(1) mutex create(declare) / init / destory
pthread_mutex_t mutex = PTHREAD_MUTEX_IN1TIALIZER;
int pthread_mutex_init(pthread_mutex_t* mutex,
pthraed_mutexattr_t* attr);
int pthread_mutex_destroy(pthread_mutex_t* mutex);
最佳实践: 如果可能,
`将 mutex 与 其保护的 数据 放在一起/明确关联` 是个好主意
1) mutex 声明
大多情况
文件范围
[1] 外部(extern) 存储 其他文件 也想使用
[2] 静态(static) 存储 + `默认属性` 用 pthread_mutex_initializer 宏
仅 声明 mutex 的文件 使用
2) mutex 初始化
[1] 静态初始化 mutex
`默认属性` 静态初始化
pthread_mutex_initializer 宏
Note
`非默认属性` 初始化 mutex: 必须使用 动态初始化
[2] 动态初始化
1> 动态初始化 `动态 (heap) mutex`
malloc + pthread_mutex_init
2> 动态初始化 `静态声明 的 mutex`, 但必须确保
mutex 使用前初始化, 且 只初始化一次
2 种方法
1] 创建 任何线程前 初始化 mutex
2] 调 pthread_once
3) mutex destroy
[1] 没有线程 `阻塞在 mutex` 上, 且
[2] 没有线程 will try to lock mutex
时, 才可以 destroy mutex
如:
1] `刚 unlocked mutex` 的 线程中
2] `程序逻辑 保证` 随后 没有线程 try to lock mutex
(2) mutex 3 函数
int pthread_mutex_lock (pthread_mutex_t* mutex);
int pthread_mutex_trylock (pthread_mutex_t* mutex)
int pthread_mutex_unlock (pthread_mutex_t* mutex);
1) blocking mutex locks
calling 线程 已 locked mutex 时, callee 线程 就不能再 lock 该 mutex
否则, 可能
1] 错误返回, 或
2] 死锁
线程永远等待自己 unlock 该 mutex
Note
递归互斥锁,
允许 线程 relock 它已拥有的 mutex
2) Nonblocking (非阻塞) mutex locks
pthread_mutex_trylock()
3类返回值
0, success
EBUSY, mutex 已被 other thread locked, 本 thread non-block
else, other error code
Note: 仅当 pthread_mutex_trylock 返回 success 状态时, 才可 unlock mutex, 否则
调 pthread_mutex_unlock 可能会
1] 返回 error , 或
2] unlock 互斥锁, 而 other thread 依赖于 lock mutex
=> 难以调试的 程序中断
// ====== alarm_oneThreadMultiRequest.c
[1] wrokerThread 线程
1] 1个 wrokerThread 从 alarmList 中 依次处理 `多个 alarm request`
wrokerThread 线程 不会终止 —— 只是在 main 返回时 "蒸发"
2] main 线程 必须 以某种方式 通知 workerThread, 在 `发现 alarmList 为空` 时终止
如: 置 alarm_done = 1 + pthread_exit()
3] 若 alarmList 中没有 item, workerThread 要在 mutex unlock 时,
自行 block(sleep 1s) 短时间, 使 main 有机会 添加新警报
[2] 主线程
alarmList 中 按 alarm 到期时间 升序排序
搜索队列, 直到找到 `到期时间 大于或等于 新警报到期时间 的 第1个条目`
前插
若没找到
`后插` 到 队列 尾
Note: 比 指向 `list 相邻结点` 的 `双指针 移动` 更高效 的方法
1] 初始
1> cur 指针 的 2级指针 cur2LevPtr 被赋值为 head 的 地址
2> next 指针 指向 head 结点
cur2LevPtr = &alarmList;
next = *cur2LevPtr;
2] 每次移动
1> 先 取 next 指针 的 next 域 的地址: &next->link
赋给 cur 指针 的 指针 cur2LevPtr ( 比 指针后移高效 )
2> 再 后移 next 指针
之前 *cur2LevPtr <=> 取出 next 前驱的 next 域
*cur2LevPtr = alarm; <=> 该 next 域 链到 newItem
list empty
尾插: init insert / while 没执行
list not empty
not found (前插位置)
尾插: next moved insert / while 执行
found
前插: next not moved & moved insert / while 执行
前插
next not moved next moved
- - cur2LevPtr = &alarmList; / cur2LevPtr = &next->link; - -
| next = *cur2LevPtr; / next = next->link; |
| |
| newItem->link = next; |
- - -> *cur2LevPtr = newItem; <- - - - - - - - - - - - - -
*cur2LevPtr == next 前驱 的 next 域
*cur2LevPtr == headPtr
// (0) init
cur2LevPtr = &alarmList; // alarmList 即 headPtr
next = *cur2LevPtr;
// 该 next != NULL 2层含义:
// 初始 时, <=> head != NULL, 即 list not empty
// else, not reach list tail
while (next != NULL) // while: 跳出前的循环 与 while 外之后的 if 条件刚好相反
{
if (next->time >= newItem->time)
{
newItem->link = next;
*cur2LevPtr = newItem; // (2)
break; |\
} | 要么 前插 到 比 newItem's time 大 的 item 前
|
// (1) every move |
cur2LevPtr = &next->link; // 后续必然
next = next->link; // |
} |
|
// next == NULL 2层含义: |
// while 没执行: head == NULL, | 即 list empty
// else, reach list tail |
if (next == NULL) | 要么 后插
{ |/
*cur2LevPtr = newItem; // (3) 后插到 队尾
newItem->link = NULL;
}
[3] 该版本
优: 所占 `资源 较少`
缺点: `响应性较差`
无论 alarmList 是否 空, alarm_thread 都会 sleep sometime,
期间 不 care 是否已有 item 被 main thread add 到 alarmList, 直到 sleep 结束
1] alarmList 空, alarm_thread sleep 1 秒, 以允许 main 接受 another alarm cmd -> add a item (alarm request)
2] alarmList 不空, alarm_thread 从 alarmList 取出 head item, sleep 直到 alarm time reach
解决
最好方法
用 cv 发出 `shared data 状态变化` 的 信号 (3.3.4 节)
(3) mutexes 用于 原子性
1) 原子方式 进行 单个更改
需要 大量的 `硬件和架构 知识`, 及 对 `执行指令 的 控制`
2) "原子"
表面含义
不可分割
真正含义
`其他线程 不会意外发现 invariant broken` 状态, 即使 线程同时在 不同 CPU 上运行
当 硬件 不支持 使 操作 不可分割 和 不可中断时, 有2种基本方法 可以做到这一点
[1]
检测到 正在查看 broken invariant
然后 try again (重试), 或 reconstruct(重建) 原始状态
[2] 同步
原子 很好, 但大多情况下 同步 也可以
当你需要 原子地 更新 数组元素 和 索引变量 时,
只需在 mutex locked 时 执行操作
关键: cooperating 线程间 使用 the same mutex
(4) mutex 尺度化(sizing)
1) protect 2个 shared var共享变量 时, 2种基本策略
1个变量 1个 小 mutex
2个变量 1个 大 mutex
2) 折中
开始 大 mutex -> 经验和性能 => 激烈的争抢(contention) 发生在哪里 -> 小 mutex
(5) 多 mutex
应用
code 跨 软件架构 边界 时, 1个 mutex 可能不够
如, 多个线程 访问 队列, 可能要 队列头 & 队列中 data 各 1个 mutex 来保护
为 线程编程 构建 `树形结构` 时, 可能 `树中 每个节点 都需要1个 mutex`
问题
死锁
2个线程 都持有1个mutex, 且需要 另一线程来继续 时
2 个 mutex 用于 完全独立的数据: 可行
若 数据独立, 需 lock 2 个 mutex 的情况 极少
经典死锁
线程1 lock mutex_a, 然后 lock mutex_b
线程2 lock mutex_b, 然后 lock mutex_a
First thread Second thread
pthread_mutex_lock(&mutex_a ); pthread_mutex_lock(&mutex_b)
pthread_mutex_lock(&mutex_b); pthread_mutex_lock(&mutex_a);
TABLE 3.1 Mutex deadlock
多 CPU: 2 个线程 同时完成 第1步
单 CPU: 1个线程 完成第1步 -> 切换出去/时间片被抢占 -> 另一线程 完成第1步
第2步, 每个线程都需要 已被另一线程 lock 的 mutex
这种类型的死锁 解决: 层次锁
(6) 死锁 解决
1) 层次锁: 固定的 locking 层次
所有需要 mutex_a 和 mutex_b 的 code 必须 始终 先lock mutex_a, 再 lock mutex_b
[1] mutex 有明显的层次顺序
1个 mutex 控制一个 队列 head, 另1个 控制队列 elem
lock 队列元素时, 可能必须 lock 队列头
[2] 没有明显的逻辑层次结构 时
可 创建 任意层次结构
创建 通用的 "lock a set of mutexes" 函数
某种顺序
thread identifier / name / 整数
在 某种程度 上, 顺序不重要, 只要 顺序始终相同
2) Try and back off
lock 某 mutexes 集合 中 第1个 mutex 后
用 pthread_mutex_trylock 去 try lock 集合中的 其他 mutex
若 try 失败, 则 释放 集合中 所有 mutexed, 并 重新开始
2 种方法比较
back off 效率低: 大量时间 尝试和退出
back off 更灵活: 无需 严格的 locking 层次
2种方法可 结合使用
总结
[1] 定义/声明/使用 mutexes 的 地方 记录它
总结经验 和 深入理解
[2] 线程以 相反顺序 unlocks three mutexes
这有助于 `避免 其他线程中 不必要的退避`
case1
线程 1: lock mutex1 -> lock mutex2 -> lock mutex3
-> unlock mutex3 -> unlock mutex2 -> unlock mutex1
线程 2: lock mutex1 -> lock mutex2 -> lock mutex3
|\
|
wait 直到 线程1 unlock mutex1 => mutex2 和 mutex3 也被 线程1 unlock
case2
线程 1: lock mutex1 -> lock mutex2 -> lock mutex3
unlock mutex1 -> unlock mutex2 -> ... -> unlock mutex3
线程 2: lock mutex1 -> lock mutex2 -> lock mutex3
|\
|
backoff
Note:
可 按最合理的顺序 随意 unlock mutexes
unlock mutexes 不会导致死锁
3) 链式 锁(Lock chaining)
"重叠 层次结构"
[1] 2 个 locks 的 scope overlap
mutex1 locked 后, code 进入区域 需 mutex2
`成功 lock mutex2 后, mutex1 不再需要, 可 释放`
应用: `遍历 树或链表`
每个 节点 都有 唯一 mutex
遍历代码 先 lock 队头 -> 找到 所需节点 -> lock it -> 释放 队头 mutex
[2] 链式锁 是 `特殊形式` 的 `层次锁`
1] 可在 平衡或修剪树 (改动 数据结构) 时, 用 `hierarchical locking / 层次锁`
2] 搜索(不改动 数据结构) 特定节点时, 用 "链式 锁"
[3] 链式锁 `何时用`
多线程 在 `层次结构 的 不同部分 几乎总是处于 激活状态` 时
// ====== alarm_oneThreadMultiRequest.c
#include <pthread.h>
#include <time.h>
#include "errors.h"
// ===1 data struct & global var: shared data's head + mutex
typedef struct alarm_tag
{
struct alarm_tag *link; // next: link to form a list
int seconds; // waiting time from cmd accepted
time_t time; // 绝对 到期时间(expiration time): 用于 sort
char message[64];
} alarm_t; // (1) elem of alarmList
// (3)
pthread_mutex_t alarm_mutex = PTHREAD_MUTEX_INITIALIZER;
// (2) alarmListHead: alarm_thread 线程 和 main 线程 shared data
alarm_t *alarmList = NULL;
void *alarm_thread(void *arg)
{
alarm_t *alarm;
int sleep_time;
time_t now;
int status;
while(1)
{
pthread_mutex_lock(&alarm_mutex); // === lock mutex
// (1) record ararmList head
alarm = alarmList;
// (2) 若 alarmList 空, sleep, 使 main thread 有机会 add item to alarmList
if (alarm == NULL)
sleep_time = 1;
else
{
// (3) remove head: head 后移到 next elem
alarmList = alarmList->link;
// (4) calc curAlarm's sleep time
now = time(NULL); // 绝对时间
if (alarm->time <= now) // alarm expire
sleep_time = 0;
else
sleep_time = alarm->time - now;
#ifdef DEBUG
printf("[waiting: %d(%d)\"%s\"]\n", alarm->time, sleep_time, alarm->message);
#endif
}
pthread_mutex_unlock(&alarm_mutex); // === unlock mutex
/* (5) sleep(blocked) or yield()
让出 CPU => 本线程 进入 Ready 态, Note: sched_yield() => detach
sched_yield 会 让出CPU 给 1个 raedy 线程,
但如果没有 Ready(如 main) 线程, sched_yield 将立即返回
这意味着 main 线程 若 有输入等待, 将被允许处理用户命令
else, sched_yield 会 立即返回
*/
if (sleep_time > 0)
sleep(sleep_time);
else
sched_yield();
// (6) 若 head 旧值 不空, 即 从 alarmList 处理了 警报 => 打印1条消息, 指示警报已过期
if (alarm != NULL)
{
printf("(%d) %s\n", alarm->seconds, alarm->message);
free(alarm); // (7) 释放 alarm memory
}
}
}
// ===3 main 线程
int main()
{
int status;
char line[128];
alarm_t *alarm, **cur2LevPtr, *next;
pthread_t thread;
// (1) create worker thread
pthread_create(&thread, NULL, alarm_thread, NULL);
while (1)
{
// (2) fgets
fgets(line, sizeof(line), stdin);
if (strlen(line) <= 1)
continue;
// (3) malloc curRequest 的 data memory
alarm = (alarm_t*)malloc(sizeof(alarm_t) );
// (4) 解析 request 到 data 的 seconds + msg 字段
sscanf(line, "%d %64[^\n]", &alarm->seconds, alarm->message);
pthread_mutex_lock(&alarm_mutex); // === (5) lock
// 1) calc curAlarm 绝对到期时间
alarm->time = time(NULL) + alarm->seconds;
// 2) found 前插位置, 则 前插; else, 后插, 到 shared alarmList
cur2LevPtr = &alarmList;
next = *cur2LevPtr;
while (next != NULL)
{
if (next->time >= alarm->time)
{
alarm->link = next;
*cur2LevPtr = alarm;
break;
}
cur2LevPtr = &next->link;
next = next->link;
}
if (next == NULL)
{
*cur2LevPtr = alarm;
alarm->link = NULL;
}
#ifdef DEBUG // debug 宏
printf("[list: ")
for (next = alarmList; next != NULL; next = next->link)
printf("%d(%d)[\"%s\"]", next->time, next->time - time(NULL), next->message);
printf("]\n");
#endif
pthread_mutex_unlock(&alarm_mutex); // === (6) unlock
}
3.3 cv
(0) cv 作用
线程间 通信 机制
signal(通知/发出信号)
由 mutex 保护 的 invariants 的 shares status 变化
!que.empty()
多线程 对某 `shared status` mutexly access, 则
other thread 更改 status 前, 某 thread 可能只能干等
work thread 发现 que.empty(), 则 必须 wait, 直到 队列 非空
Note1: unlock 和 wait 原子
等待线程 变成 blocked 态 前, other 线程 无法 lock mutex
否则
等待线程 已 check 队列为空, 已 `unlock mutex`
-> 通知线程 加 item, 但 `等待线程 不知道 队列不空`
-> 等待线程 `block 自己`
更糟糕的是, 等待线程 可能 `还没 record 它打算等待`
-> ` other thread 找不到 等待线程的 identifer`
-> 等待线程 可能会 `永远等待`
`等待线程` 先 `block 自身` (通过某种方式)
通知线程 才能 `find 等待线程 的 identifer`, 并 `唤醒 等待线程`
Note3: cv wait 返回时 mutex locked
这是 cv 存在的原因
cv ---关联--- mutex --- 关联 --- shared data
Note4: cv 用于 signal, 而不是 mutex
需要 mutex 来 同步
1] 对 share status 的 access
2] waiting 的 `谓词`
unlock 与 wait 原子
为什么不将 mutex 创建为 cv 的一部分?
1] mutex 与 cv 分开使用, mutex 与 cv 一起使用
的 频率相同
2] 1个mutex 可关联 多个 cv
waiting 的 谓词不同
必须只用1个 mutex 来 `同步 对 队列头 的 所有访问`
cv 与 谓词: 1 对 1 最安全, 1 对 多 / 多 对 1 要小心
规则:
1] 在 多个谓词 间 共享 1个 cv
必须 始终 broadcast, never signal
2] signal 比 broadcast 更有效
cv 和 谓词 都是 程序中的 shared data: 被 多个线程 使用, 可能 同时使用
因为您将 cv 和 谓词 视为 1起 locked, 所以很容易记住, 它们总是 用 the same mutex 控制
不 lock mutex 下, signal 或 broadcast 合法, 通常也 合理
但 lock mutex 更安全
(1) 创建 + 销毁 cv
pthread_cond_t cond = PTHREAD_COND_INITlALIZER;
int pthread_cond_±nit (pthread_cond_t* cond, pthread_condattr_t* condattr);
int pthread_cond_destroy (pthread_cond_t* cond) ;
cv
类型 pthread_cond_t
不 应该 copy cv
结果未定义
可传递 cv 的 ptr
用于 synchronization
声明/定义/初始化
与 mutex 相近
静态初始化
声明 默认属性的 static cv 时, 应该用 pthread_cond_initializer -> 不需要销毁
cv 与其 谓词 关联 —— 为了获得最佳结果, 请这样对待它们!
建议尝试
封装 一组 不变量 / 谓词 / mutex / n 个 cv 为 struct 的成员, 并仔细记录关联
动态初始化 cv
(1) malloc + pthread_cond_init
(2) 用 非默认属性 初始化 cv, 则 必须用 动态初始化
(3) 可 动态初始化 静态声明 的 cv 条件变量
但必须确保 每个 cv 在 使用前初始化, 且 只初始化一次
销毁 cv
动态初始化 cv, 应该在不再需要时, 调 pthread_cond_destroy
何时可 安全 销毁 cv
无线程
blocked on cv
try to wait on
signal/broadcast cv
确定这一点的 最佳方法
[1] 1个线程内 `成功 broadcast` to unblock all waiters
[2] 且, 程序逻辑保证 `随后 不会有线程 try to use the cv`
如
当 线程 从列表中 删除 含 cv 的结构, 然后 broadcast 以唤醒任何 waiter 时,
在释放 cv 占用的存储空间前, 销毁 cv 是安全的( 也是 好主意)
被唤醒的线程 应该在它们恢复时, 检查 等待的谓词
因此, 必须确保在 检查 完成前, 没有释放谓词所需资源 —— 这可能需要 `同步`
(2) 等待 在 cv 上
int pthread_cond_wait (pthread_cond_t *cond,
pthread_mutex_t *mutex);
int pthread_cond_timedwait (pthread_cond_t *condr
pthread_mutex_t *mutex,
struct timespec *expiration);
1) cv 与 mutex
任意时刻
1] 1 对 多, Pthread 不允许
2] 多 对 1 可以
线程 1 wait on cv1 指定 mutex1
线程 2 wait on cv2 指定 mutex1
1) 线程 wait on cv 时, 必须 始终让 (关联的) mutex locked
2) 并发(同时) wait on cv 的 所有线程, 必须指定 the same 关联 mutex
2) cv wait 操作
step1: 先 unlock mutex, 再 block 线程 (原子)
step2: 被唤醒 -> relock mutex (原子)
step3: 再 return
3) 在 lock mutex 后, wait on cv 前, test 谓词 很重要
1) why ?
若 不 test 谓词
// 因为 wait on cv 内部, mutex unlocked + thread blocked on cv 是 原子的
=> lock mutex 与 thread blocked on cv 间
other 线程 无法 ( 获得 mutex 以 ) 改变 shared data (包括 谓词)
2) 若 线程 signals / broadcasts cv,
case1: 而 此时, 没有线程在 waiting, 则 nothing happens
case2: 而 之后, other 线程 立即调 pthread_cond_wait, 则
other 线程 将 忽略 刚发的 信号, 一直 waiting
4) 总是 test 谓词, 然后 再次 test !
线程唤醒 时, 再次 test 谓词 同样重要
|\
|
|
应该始终 `在 循环中 等待 cv`, 以防止
程序错误
多处理器竞争 (multiprocessor races)
虚假唤醒 ( spurious wakeups )
应该总是
while(testing predicate)
wait on cv
本书 所有使用 cv 的示例中 也显示了 `正确的 谓词循环`
5) cond.c
1) wait_thread 通知线程
[1] sleep 小段时间, 让 `main 线程 先到达 condition wait`, 再 waking it
[2] set shared 谓词 ( data.value )
[3] signals cv
2) 命令行参数
控制 通知线程 休眠时间: atoi
3) main 线程
cv 上 设 超时等待 2 s
若 通知线程 休眠时间
> 2s => cv 等待 将超时, 返回 ET1MED0UT
= 2s => main 线程 和 wait_thread 线程 竞争 (race), 每次运行 结果可能不同
< 2s => cv 等待 不超时
6) code 中 `不假定 (waiting thread)唤醒时谓词为真`, 是个好主要, 主要原因如下:
[1] 被拦截的(Intercepted) 唤醒
1] 概念
other 线程
step1: 先 获得 mutex
step2: wait 前 check 谓词 -> 若 true/work available -> 不必 wait / accept the work
step3: unlock mutex: no more work
=> 确保 `最新(latest ) 被唤醒的线程` got the work 很昂贵, 且通常会适得其反
2] Remember `线程是异步` 的
3] 从 cv wait 中 唤醒 涉及 lock mutex
[2] 松(Loose) 谓词
1] 概念
用 `actual state 的 近似值` 容易且方便
如 "there may be work" 而不是 "there is work"
2] 好处
1] 易 signal 或 broadcast: 比 "严谓词"
2] code 将 更健壮
cv 被 `意外(accidentally) signal 或 broadcast` 时,
3] 何时用
若 总是在 wait on cv 前和后 test 严谓词, 则可在
`有意义时, 据 松谓词 自由地 signal`
4] 问题
用 `松谓词 或 意外 唤醒` 可能会成为 `性能问题`, 但许多情况下, 这不会有所作为
[3] 虚假唤醒 / Spurious wakeups
1] 概念
wait on cv 时, 没有 线程专门 broadcast / signaled cv, wait 却待可能(偶然) return
2] 原因
多处理器
使 `条件唤醒(condition wakeup) 完全可预测(completely predictable )`,
可能会 `大大减慢 所有 cv 操作`
3] 导致 虚假唤醒 的
`竞争条件(race conditions)` 应该被认为是 `极少的(rare)`
7) 超时等待
pthread_cond_timedwait
[1] 超时 timeout
1] 应该设为 绝对时间
2] 间隔时间 更难
你必须在 每次线程唤醒时 重新计算它, 然后再等待 —— 这需要确定 它已经等待了多久
[2] 超时 保持有效, 即使发生 被拦截唤醒/虚假唤醒
[3] 超时 条件等待 return ETIMEDOUT
应 先 test 谓词 -> 若 true -> ETIMEDOUT 可能并不是错误
原因: 线程 cv wait 返回前, 要 relock mutex
本来 可能没超时/超时
-> 等待 to relock mutex 要花时间
=> 定时等待 似乎(appear to) 比您请求的时间 长很多
(3) 唤醒 cv waiter
int pthread_cond_signal (pthread_cond_t *cond);
int pthread_cond_broadcast (pthread_cond_t *cond);
1) signal 与 broadcast 区别
[1] 将 signal 视为 broadcast 的 优化 更准确
尽管将 broadcast 视为 signal 的推广 很容易
[2] 实际上, 唯一的 `区别 是 效率`
broadcast 将唤醒 额外线程, 它们必须 test 其 predicate 并 恢复(resume) waiting
[3] 用 broadcast 代替 signal 永远不会错
因为 waiters 必须考虑 intercepted 和 spurious wakes
[4] 一般, 不能用 signal 代替 broadcast
如有 疑问, 请 broadcast
2) signal 何时用
[1] 当 只有1个线程需要唤醒 以处理 changed state, 且 任何等待线程都可以这样做时, 用 signal
[2] 多个谓词 用 1个 cv 时, 不能用 signal
原因: 无法判断 是否会 唤醒 等待该谓词 或 另一谓词的 线程
[3] 谓词 不正确时, 不要 resignal
这可能不会像您期望的那样 传递 signal; spurious / intercepted wakeup 可能会导致 一系列毫无意义的 resignal
3) 应用
[1] 加 单个条目(item) 到 队列, 且 cv 上 阻塞 的 线程都只等待 该 item 出现
可能应该用 signal
将唤醒1个线程 check 队列, 而 不打扰 others sleep, 避免了 不必要的 上下文切换
[2] 加 多个条目(item) 到 队列
可能需要 broadcast
[3] signal 和 broadcast 使用示例 见 7.1.2 节 "读/写锁" package
4) wait on cv 时, 必须 lock mutex
但 如果更方便, signal/broadcast cv 时, 可以 mutex unlocked
好处: 许多系统上, 这可能 更 `高效`
Note: 等待线程 唤醒 时, 必须先 (re)lock mutex
[1] case1: signal cv + intercepted wakeups 不是问题(无所谓 哪个 waiter 被 唤醒)
notify 线程 signal cv: mutex not locked
|
| context switch
|/
等待线程 awakens
接着 awakened thread relock mutex, 并从 cv wait 返回
[2] case2: broadcast cv + intercepted wakeups 是问题 (高优先级线程 不应该 被拦截唤醒)
broadcast cv: mutex not locked
任何线程(不仅是 被唤醒的线程) 都可以在 线程被唤醒前 lock mutex
这种 race 是 intercepted wakeups 的原因之一
如, 1个 `低优先级线程` 可能会在 notify 线程 `即将唤醒 1个 高优先级线程` 时, lock mutex
=> 延迟 高优先级线程的调度
|
| 解决
|/
[3] notify 线程 broadcast cv: 保持 mutex locked // signal cv 也易分析
|
| context switch
|/
等待线程 awakens: 此时 notify 线程 holds mutex
接着 awakened thread 必须 `立即 block on mutex`
|
context switch |
|/
signaling 线程 signal cv 处
2个角度
1] 经历 2次 context switches, 又 回到起点( 但 唤醒了 waiter )
2] mutex 上 等待队列 中, 高优先级 waiter 被放在 低优先级 waiter 前 => 先被调度
|
|
|/
按 waiter 优先级 排队
(4) alarm/闹钟 程序 最终版本
Note
一种优化: "等待变形"
`将 线程 直接从 cv 等待队列 move 到 mutex 等待队列`
mutex locked 时, 没有上下文切换
对 许多应用程序, 有 显著的 `性能优势`
1) 几个版本比较
[1/2] 进程/线程 版本: 每个警报 用 单独的上下文
[3] mutex 版本 : 单个线程 `处理警报列表`
alarm_mutex.c
没有对 每个警报用单独的上下文 => 降低资源利用率
问题: 处理线程 对 新警报命令 `反应不灵敏 (not responsive)`
必须 等待1个警报到期, 然后 才能检测到 另1个 具有更早到期时间 进入列表
如, 依次输入命令 "10 message1" -> "5 message 2"
|
| 解决
|/
2) cv 版本 : 用 定时条件等待, 而不是 休眠 来 wait 警报到期时间
alarm_cond.c
main 线程 在 列表的头部 insert 1个新 item 时,
signal cv -> 立即唤醒 alarm_thread
3) alarm_cond.c
[1] data struct: 增加 2 个变量
1] cv: alarm_cond
2] curProcessingAlarmTime: 记录 alarm 的 到期时间
是1种优化
main(insert) thread `不必唤醒` worker thread, `除非` worker thread
1] idle: 等待 将要处理的新 item: curProcessingAlarmTime == 0), 或
2] main thread 刚插入 的 newItem 的 (expiration)time
比 curProcessingAlarmTime 更早
[2] alarm_insert 函数
2处被调用
main 线程 : insert newItem
alarm_thread : reinsert 被 newItem 抢占的 curProcessingItem
[3] alarm_thread 函数: worker thread's routine
1] 第1次 wait on cv(不带 定时), "sleep",
直到 main thread 收到 user 输入 requestCmd
main thread 插入 newItem 到 itemList 后,
立即 wake up working thread
2] curProcessingAlarmTime 置 0, 告诉 main thread,
worker thread 空闲(idle/not busy)
3] 方便起见, `谓词 testing 被拆分`
一半在 while expr / 超时等待
一半 在 while 之后 的 if(!expired) 中
被 抢占处理的 curAlarm reinsert 到 list => 不丢
main 线程 插入 timer 更早的 alarm 时, 会更新 curProcessingAlarmTime
[4] main 基本不变
// alarm_cond.c
#include <pthread.h>
#include <time.h>
#include "errors.h"
typedef struct alarm_tag
{
struct alarm_tag *link;
int seconds;
time_t time; // expirationTime: sort key on alarmList
char message[64];
} alarm_t;
pthread_mutex_t alarm_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t alarm_cond = PTHREAD_COND_INITIALIZER; // (1)
alarm_t *alarmList = NULL;
time_t curProcessingAlarmTime = 0; // (2) shared data protected by alarm_mutex
// Note: called by main thread and worker thread
// Insert alarm entry on list, ordered by time
void alarm_insert(alarm_t *newitem)
{
int status;
alarm_t **cur2levPtr, *next;
cur2levPtr = &alarmList;
next = *cur2levPtr;
while (next != NULL)
{
if (next->time >= newitem->time)
{
newitem->link = next;
*cur2levPtr = newitem;
break;
}
cur2levPtr = &next->link;
next = next->link;
}
if (next == NULL)
{
*cur2levPtr = newitem;
newitem->link = NULL;
}
#ifdef DEBUG
printf("[list: ") for (next = alarmList; next != NULL; next = next->link)
printf("%d(%d)[\"%s\"]", next->time, next->time - time(NULL), next->message);
printf("]\n");
#endif
// === Diff
// newAlarm比 curProcessingAlarm 的 ExpirationTime 早
// update curProcessingAlarmTime & signal
if (curProcessingAlarmTime == 0 || newitem->time < curProcessingAlarmTime)
{
curProcessingAlarmTime = newitem->time;
pthread_cond_signal(&alarm_cond);
}
}
void *alarm_thread(void *arg)
{
alarm_t *alarm;
struct timespec cond_time;
time_t now;
int status, expired;
pthread_mutex_lock(&alarm_mutex); // === lock: 后续为啥没 unlock, 不必, worker thread 因 main thread 而 蒸发
while(1)
{
// (1) curProcessingAlarmTime 置 0: 告诉 main/insert 线程, worker thread 空间 (idle / not busy)
// [1] alarmList empty: wait on cv, until alarmList notEmpty
// [2] else, not wait, direct start processing
curProcessingAlarmTime = 0;
while (alarmList == NULL) // while( test predicate )
pthread_cond_wait(&alarm_cond, &alarm_mutex); // wait on cv (firstly, without timedout)
alarm = alarmList;
alarmList = alarm->link;
now = time(NULL);
// (2)
expired = 0;
if(alarm->time > now)
{
#ifdef DEBUG
printf("[waiting: %d(%ld)\"%s\"]\n", alarm->time, alarm->time - now, alarm->message);
#endif
cond_time.tv_sec = alarm->time;
cond_time.tv_nsec = 0;
// (3) update current processing alarm's time
curProcessingAlarmTime = alarm->time;
/* (4) while‘s expr is half the predicate,
main/insert 线程 newItem 的 time 早, 则
`提前唤醒 / 拦截唤醒` curProcessingAlarm -> reinsert
newItem 在 next loop 才会被处理
Note: while 退出 2 种 case
1] curAlarm 定时到期
2] newAlarm time 比 curProcAlarm 早,
main thread 唤醒 worker thread 时, while predicate 不满足 -> jump out
*/
while (curProcessingAlarmTime == alarm->time)
{
status = pthread_cond_timedwait(&alarm_cond, &alarm_mutex, &cond_time);
if (status == ETIMEDOUT) // wait until expiration time
{
expired = 1;
break;
}
else if (status != 0)
err_abort(status, "Cond timedwait");
}
// (5) the other half the predicate
if (!expired)
alarm_insert(alarm); // reinset into list
}
else
{
expired = 1;
}
// (6) 处理完 1个 item 后, print hint + free it's memory
if (expired)
{
printf("(%d) %s\n", alarm->seconds, alarm->message);
free(alarm);
}
}
}
int main(int argc, char *argv[])
{
char line[128];
alarm_t *alarm;
pthread_t thread;
pthread_create(&thread, NULL, alarm_thread, NULL);
while (1)
{
fgets(line, sizeof(line), stdin);
if (strlen(line) <= 1)
continue;
alarm = (alarm_t *)malloc(sizeof(alarm_t) );
sscanf(line, "%d %64[^\n]", &alarm->seconds, alarm->message) < 2);
pthread_mutex_lock(&alarm_mutex); // === lock
alarm->time = time(NULL) + alarm->seconds; // calc current request alam's expiration time
alarm_insert(alarm); // ineret this item to alarmList
pthread_mutex_unlock(&alarm_mutex); // === unlock
}
}
3.4 线程间 内存可见性
线程 如何看待 计算机 memory
线程世界 中 "同步" 的 真正含义
(1) Pthreads 内存可见性 规则(rules)
[1] pthread_create / unlock mutex / terminate / signal or broadcast cv
连接的 主动与被动线程
主动线程 在 create / unlock / terminate / notify 时, 可见 的 memory, 被动线程 一定可见
... 后, write 的 memory, ... 未必可见
————————————————————————————————————————————————————————————
主动线程 被动线程
————————————————————————————————————————————————————————————
rule1 pthread_create created (时/后)
————————————————————————————————————————————————————————————
rule2 unlock mutex lock mutex
directly unlock
wait on cv
————————————————————————————————————————————————————————————
rule3 terminate (by) 调 pthread_join
cancellation
returning from start func
calling pthread_exit
————————————————————————————————————————————————————————————
rule4 signal or broadcast cv awakened
————————————————————————————————————————————————————————————
[2] 图 3.5/3.6 是 正确/错误 code
FIGURE 3.5: Correct memory visibility
variableA 与 variableB 的 memory 可见性 都是 rule2 中 一定者
————————————————————————————————————————————————————————————————————————————
ThreadBlock A | ThreadBlock B
————————————————————————————————————————————————————————————————————————————
pthread_mutex_lock (&mutexl); |
variableA = 1; | pthread_mutex__lock (&mutexl);
variableB = 2; |
pthread__mutex_unlock (&mutexl); |
| localA = variableA;
| localB = variableB;
| pthread_mutex_unlock (&mutexl);
————————————————————————————————————————————————————————————————————————————
FIGURE 3.6: Incorrect memory visibility:
variableB 的 memory 可见性 是 rule2 中 未必者: ThreadBlock B 看到的 variableB 的值 未必是 2
————————————————————————————————————————————————————————————————————————————
ThreadBlock A | ThreadBlock B
————————————————————————————————————————————————————————————————————————————
pthread mutex lock (&mutexl); |
variableA = 1; | pthread_mutex__lock (&mutexl);
pthread mutex unlock (&mutexl); |
variableB = 2; |
| localA = variableA;
| localB = variableB;
| pthread_mutex_unlock (&mutexl);
————————————————————————————————————————————————————————————————————————————
[3] 那, 程序员 应如何 写 code ?
1)
1个线程的 registers 不能被 另一线程 修改
线程 stack and heap memory 是 private,
除非将 该 memory ptr 传给 other thread
2) 2个线程 需访问 same data 时, 都必须用 Pthreads 内存可见性 规则之一
大多情况下, 这意味着 用 mutex, 目的:
1] 防止 `多次写入`
2] 1个线程 只 read, 也必须 用 mutex 来确保
它看到的是 mutex locked 时 写入的数据 的 最新值(recent value)
3) 规则 中, 一定者 case 下, 无需 mutex 确保 可见性
thread1: 设 global variable(全局变量) -> creates thread2
-> thread2: see global variable 的 recent value
(2) Pthreads API 之下, 深入 硬件内存架构 细节
`多处理器 内存架构` 似浅实深 的 summary
[0] 单线程、完全同步 的 程序中, 随时读取或写入 任何内存 都是 "安全的"
[1] 异步 (including 多处理器) code, 内存可见性 变复杂
如, `异步信号` 可能出现在程序执行的 任何时候
|
|
`信号处理程序`
[2] 经验丰富的程序员 知道,
[1] 应 非常小心地 编写 全局数据 (global data), 且
[2] 可 track 他们所做的事情
[3] 多线程 导致 异步
[4] 尽管 讨论的是 多线程编程, 但本节中列出的问题 都不是 特定于线程的
而是 `内存架构设计` 的产物,
适用于 two "things"(2件事) `独立访问 同一内存` 的 any case, 这两件事可能是
[1] 不同处理器上 运行 的 `线程`
[2] 不同处理器上 运行, 并使用 shared memory 的 `进程`
[3] 单处理器 上运行, 独立 的 `I/O 控制器 reads 或 writes` the same memory
[5] 1个 内存地址 1次只能保存1个值: 不要让 线程 "race/竞争" to 先到那里
测量 绝对外部时基
"处理器 A" 写入值 1 几微秒 后, "处理器 B" 写入值 2
这并不意味着 memory 的 最终状态 将是 2
原因: 机器的 cache 和 memory bus 工作机制
1] 处理器 (可能有) cache memory: 只是 fast, local memory
用于 keep 从 main memory `最新 (recently) read` 的 data 的 `快速可访问 copies`
2] 回写式 cache 系统
数据 最初仅写入 cache, 稍后某时刻 被 copy/flush 到 main memory
3] 不保证 读/写顺序 的机器
只要处理器 觉得方便, 就可以写入 每个 cache
1> 2个处理器 writes 不同值 to 同一内存地址
每个处理器 的值
先进入 自己的 cache
最终, 都将被写入主内存, 但基本上是随机的, 与 将值写入 相应处理器 cache 的顺序 没直接关系
2> 1个线程(处理器) 内 2次 writes, 不需要 以相同的顺序 出现在内存中
|
|/
Note: 不是指 整个程序是单线程的, 可能 有另1线程 一起 并发
内存控制器 可能会发现 以 "相反" 顺序 write 更快或更方便, 如图 3.7
Time ThreadBlock 1 ThreadBlock 2
t write "1" to address 1 (cache)
t+I write "2" to address 2 (cache) read "0" from address 1
t+2 cache system flushes address 2(此时, thread2 可能还没从 addr1 raed 完 (2个 处理器, 并行) => thread1 先 flushes addr2)
t+3 read "2" from address 2
t+4 cache system flushes address 1
Figure3.7 Memory ordering without synchronization
3> 问题不仅限于 2个线程 write memory
线程1 在 处理器1上 writes to memory address
线程2 在 处理器1上 reads from 该 memory address
=> "内存一致性" ( "读/写顺序" )
处理器 间 同步 复杂/开销大
1> 减慢 内存系统 的速度
2> 对 大多 code, 没啥收益
许多现代计算机(通常 也属于 最快的计算机)
`不保证 不同处理器间 的 内存访问 顺序`
除非 程序使用 `memory barriers` 的 特殊指令
memory barriers 前 内存访问, 比 ... 后 的 内存访问, 先完成
`内存访问`
1] 排队到 内存控制器
2] 可以以 最有效的任何顺序 进行处理
// eg
memory address --- 不在 处理器 的 cache
|\
|
read from: wait cache fill(填满) -> read 完成
"dirty cache"
|\
| => 需要先 flushed cache -> write 完成
|
write to
[6] "memory barrier" 是移动的墙, 而不是 "cache flush" 命令
mutex lock
begins by locking the mutex,
completes by 发1个 memory barrier
=> other thread 先看到 mutex was locked
mutex lock 返回后 发出的 memory accesses 后完成
mutex unlock
begins by 发1个 memory barrier
completes by unlocking the mutex
=> mutex unlock 前, 即 locked 时, 发出的 memory accesses 先完成
mutex 后 unlocked: other thread 才能看到 unlock
该 内存屏障模型 是 Pthreads 内存规则 背后的逻辑
每个规则, 都有1个 "源" 事件 和 "目标" 事件
如 线程 调 pthread_mutex_unlock, 线程 从 pthread_mutex_lock 返回
[7] 即使没有 读/写顺序 和 内存屏障
1] 单个内存地址 的 write 未必是 原子的
处理器读写 8 位 units
内存传输 32/64 位 memory units (内存 单元) => `32/64 位 整数倍` 是1个 `新内存单元` 的 开始)
2] 若 变量 跨越 `内存单元(如 4Byte, 整数倍) 间 边界` (若 机器支持 未对齐的内存访问),
则 计算机可能必须 在两个总线事务中发送数据
1个 未对齐的 32 位值
可以通过 写 2 个 相邻的 32位 内存单元 来发送
3] 回到本节开头的建议
若 想编写 可移植的 Pthreads 代码,
要始终用 `Pthreads 内存可见性规则` 来保证 `正确 的 内存可见性`
而不是 依赖于 关于硬件或编译器行为的假设
4] 多处理器内存体系结构 更深入的处理 参阅 UNIX Systems for Modern Architectures
[8] 图 3.10 用 mutex 确保所需的 读/写顺序
线程1 unlock 时, 可见的 值, 与 线程2 lock 时, 可见的值
至少 一样新(recent)
——————————————————————————————————————————————————————————————
Time ThreadBlock 1 ThreadBlock 2
t lock mutex
(memory barrier)
t+1 write "1" to address 1 (cache)
t+2 write "2" to address 2 (cache)
t+3 (memory barrier)
unlock mutex
t+4 lock mutex
(memory barrier)
t+5 read "1" from address 1
t+6 read "2" from address 2
t+7 (memory barrier)
unlock mutex
——————————————————————————————————————————————————————————————
FIGURE 3.10 Memory ordering with synchronization
less important eg
// ====== mutex_static.c
#include <pthread.h>
#include "errors.h"
typedef struct my_struct_tag
{
pthread_mutex_t mutex;
int value;
} my_struct_t;
my_struct_t data = {PTHREAD_MUTEX_INITIALIZER, 0};
int main()
{
return 0;
}
// ====== mutex_dynamic.c
#include <pthread.h>
#include "errors.h"
typedef struct my_struct_tag
{
pthread_mutex_t mutex;
int value;
} my_struct_t; // (1) 含 mutex 的 struct: 明确关联 mutex 与 其保护的 数据
int main()
{
my_struct_t *data;
int status;
data = malloc(sizeof(my_struct_t)); // (2) malloc 动态 mutex
pthread_mutex_init(&data->mutex, NULL); // (3) 动态初始化 动态 mutex
pthread_mutex_destroy(&data->mutex); // (4) 不再需要 pthread_mutex_init 动态初始化的 mutex 时, 调 pthread_mutex_destroy 销毁 mutex
free(data); // (5) free
return status;
}
// ====== 3. tryLockWithBackoff.c
#include <pthread.h>
#include "errors.h"
#define ITERATIONS 10
pthread_mutex_t mutex[3] = {
PTHREAD_MUTEX_INITIALIZER,
PTHREAD_MUTEX_INITIALIZER,
PTHREAD_MUTEX_INITIALIZER};
int backoff = 1;
int yield_flag = 0;
// locks all mutexes(mutex1, mutex2, ...) in order
void *lock_forward(void *arg)
{
int i, iterate, backoffs;
int status;
for (iterate = 0; iterate < ITERATIONS; iterate++)
{
backoffs = 0;
for (i = 0; i < 3; i++)
{
if (i == 0)
{
pthread_mutex_lock(&mutex[i]);
printf("forward lock got %d\n", i);
}
else
{
if (backoff)
status = pthread_mutex_trylock(&mutex[i]);
else
status = pthread_mutex_lock(&mutex[i]);
if (status == EBUSY)
{
backoffs++;
printf("forward locker backing of at %d\n", i);
for (; i >= 0; i--)
pthread_mutex_unlock(&mutex[i]);
}
else
{
if (status != 0)
err_abort(status, "Lock mutex");
printf("forward locker got %d\n", i);
}
}
if (yield_flag)
{
if (yield_flag > 0)
sched_yield();
else
sleep(1);
}
}
// Report that we got 'em, and unlock to try again.
printf("lock forward got all locks, %d backoffs\n", backoffs);
pthread_mutex_unlock(&mutex[0]);
pthread_mutex_unlock(&mutex[1]);
pthread_mutex_unlock(&mutex[2]);
sched_yield();
}
return NULL;
}
// locks all mutexes(mutex1, mutex2, ...) backward: ..., mutex2, mutex1
void *lock_backward(void *arg)
{
int i, iterate, backoffs;
int status;
// Note: lock mutex1, mutex2, mutex3 backward
for (iterate = 0; iterate < ITERATIONS; iterate++)
{
backoffs = 0;
for (i = 2; i >= 0; i--)
{
if (i == 2)
{
pthread_mutex_lock(&mutex[i]);
printf("backward lock got %d\n", i);
}
else
{
if (backoff)
status = pthread_mutex_trylock(&mutex[i]);
else
status = pthread_mutex_lock(&mutex[i]);
if (status == EBUSY)
{
backoffs++;
printf("backward locker backing of at %d\n", i);
for (; i < 3; i++)
{
status = pthread_mutex_unlock(&mutex[i]);
if (status != 0)
err_abort(status, "Backoff");
}
}
else
{
if (status != 0)
err_abort(status, "Lock mutex");
printf("backward locker got %d\n", i);
}
}
if (yield_flag)
{
if (yield_flag > 0)
sched_yield();
else
sleep(1);
}
}
printf("lock backward got all locks, %d backoffs\n", backoffs);
pthread_mutex_unlock(&mutex[2]);
pthread_mutex_unlock(&mutex[1]);
pthread_mutex_unlock(&mutex[0]);
sched_yield();
}
return NULL;
}
int main(int argc, char *argv[])
{
pthread_t forward, backward;
int status;
if (argc > 1)
backoff = atoi(argv[1]);
if (argc > 2)
yield_flag = atoi(argv[2]);
pthread_create(&forward, NULL, lock_forward, NULL);
pthread_create(&backward, NULL, lock_backward, NULL);
pthread_exit(NULL);
}