第5章: 并发与实用功能
2022-06-20 本文已影响0人
my_passion
5.0 建议
1 用 资源句柄 管理资源
2 用 unique_ptr 访问 多态类型的对象
3 用 shared_ptr 访问 共享对象
4 用 类型安全的机制 处理 并发
5 最好避免 shared data
6 从 并发执行任务, 而不是 thread 角度 思考
5.1 资源管理
1 unique_ptr 与 shared_ptr : <memory>
资源: 符合 先获取后释放 的 东西
内存/锁/套接字/线程句柄/文件句柄
|
| 不及时释放
|/
资源泄露
|
|
|/
标准库 组件 不会出现 资源泄露
用 RAII 确保 `资源 依存于 其所属对象, 而 不会超过 其对象的 lifetime`
(1) SP: unique_ptr vs. shared_ptr
unique_ptr/shared_ptr 所有权 唯一/共享
移动/copy
unique_ptr
可以把 自由存储 上的对象 传递给/出 函数
shared_ptr
1) copy 而非 移动
2) 最后1个 shared_ptr 销毁时, 所管理的 资源 才被销毁
3) 使对象的 lifetime 变得不那么容易掌控
4) 没有指明 哪个 owner 有权读写 资源
数据竞争 / 数据混淆 依然存在
(2) `何时应该用 SP / ` vector 等 资源句柄(vector / thread)`
答: 指针语义 / 对象集合
5.2 并发
1 task(任务) 和 thread(线程)
(1) task
可视为与 caller 并发执行的 function
只需 传参 / 获取结果 / 保证 2个 task 不竞争使用 共享数据
形式: func / funcObj
(2) std::thread 对象: <thread>
task 作 实参
线程间通信
[1] 同步
1] 调用-返回
传参
返回结果
2] 共享数据
3] cv
[2] 异步
2 传参
thread ctor: 可变参数模板
3 返回结果
(1) 通过参数: 不优雅
默认 值传递 copy
std::ref() 才引用传递
注意2种 handle 参数: 指针 / 容器
值传递 -> copy 的是 handle, resource 不 copy
(2) 异步
// eg: 传参 + 返回结果(通过参数)
void f(vector<double>& v, double* res); // 从 v 获取输入, 结果放 *res
class F
{
private:
vector<double>& v; // 输入 源
duoble* res; // 输出 目标
public
F(vector<double>& vv, double* p): v(vv), res(p){}
void operator()();
};
duoble res1;
double res2;
thread t1 {f, vec1, &res1};
thread t2 {F(vec2), &res2};
4 shared data
|
| 访问 必须
|
(1) `同步`
以确保 同一时刻 最多有1个 task 能访问
|
| 机制
|/
mutex / 锁: <mutex>
thread
lock()/unlock() 操作 可 获取/释放 1个 mutex
unique_lock: 对 mutex 的 RAII
若 other 线程 已获取 mutex
当前线程 `等待/阻塞`, 直到 that 线程 完成对共享数据的访问
mutex m; // 控制 共享数据 访问 的 mutex
int sh; // 共享数据
void f()
{
unique_lock<mutex> lck {m}; // 获取 mutex
sh++; // 处理 共享数据
} // 隐式 释放 mutex
| 共享对象 与 mutex 间 对应关系, 靠 程序员 去维护, 易出错
|
| 解决:
|/
1) mutex 与 shared dada 关联, 形成类, 访问 shared data 前 先 lock mutex
class Record
{
public:
mutex rm;
int sh;
};
同时 访问多个资源 来 执行1个操作
|
| 可能导致
|/
死锁
thread1 获取了 mutex1 然后 试图获取 mutex2, 而同时 thread2 已获取了 mutex2 然后 试图获取 mutex1
=> 2 个任务 都无法继续执行了
|
| 解决
| 同时获取多个锁
|/
2) 推迟加锁 defer_lock + 同时获取 全部锁 lock(lck1, lck2);
void f()
{
// ...
unique_lock<mutex> lck1 {m1, defer_lock};
unique_lock<mutex> lck2 {m2, defer_lock};
// ...
lock(lck1, lck2);
// ... 处理 共享数据 ...
} // 隐式 释放所有 mutex
(2) cv
condition_variable 机制
thread1 等待 条件/事件(线程2完成某工作后) 发生
// === 2个 thread 通过 queue 传递消息
#include <condition_variable>
class Message{
// ...
};
queue<Message> mq;
condition_variable cv;
mutex mut;
// 1)
void producer()
{
while(true)
{
Message msg;
// fill msg
unique_lock<mutex> lck {mut}; // === lock
mq.push(msg);
cv.notify_one(); // 通知
} // === unlock implicitly
}
// 2)
void consumer()
{
while(true)
{
unique_lock<mutex> lck {mut};
while (cv.wait(lck) ); // 释放 lck 并 等待, 被唤醒后 重新获取 lck
auto msg = mq.front(); // extract msg
mq.pop();
lck.unlock(); // 释放 lck: explicitly
// 处理 msg
}
}
5 任务通信
在 task (抽象层), 而不是 线程和锁(底层) 上操作
user 不用 care 锁
<future>
(1) future + promise
task 间 value/exception 传递
2个 task 运行于 独立 线程
step
[1] task1 建 promise, 对 promise 调 get_future 获取 promise 内部的 future
[2] task1 将 promise 引用传递 给 task2
[3] task2 中 对 promise 调 set_value(...)/set_exception(...):
内部 将 value/exception 设给 promise 的 future
[4] task1 对 future 调 get(), block, 直到 future 为 ready 态/抛出异常, get() 返回 value /exception
另1种形式
[1] caller 建 promise, 对 promise 调 get_future() 获得 promise 内部的 future
[2] caller 将 promise 引用传到 task1
[3] task1 中 对 promise 调 set_value()/set_exception()
[4] caller 将 future 引用传到 task2
[5] task2 调 future.get() 获得 value
(2) packaged_task
1) task2 的 return value 传给 task1
|
转化为 | wrap/封装 task2 到 packaged_task
| packaged_task 把 task 的 return value/异常 放入 packaged_task 内部的 future
|/
task1 与 packaged_task 间 value/exception 传递
task1 对 packaged_task 调 get_future()
task1 对 获得的 future 调 get()
task1 与 packaged_task 运行于 独立 线程
2) 类模板
[1] 模板参数: task 类型
ctor 的参数: task
[2] note
packaged_task 不能 copy
为 packaged_task 启动线程时, 必须 move(packaged_task_obj)
(3) async()
不必再操心 `锁 和 线程`
1) 思路
类似 调用函数 的方式 启动任务
`将 task 当作` 可以与 other task 并发执行 的 `函数` 来处理
2) 机制: 2个分离
[1] 分离 函数调用的 `调用部分` 和 `获取结果部分`
[2] 分离 这2部分 与 任务的实际执行
3) 优势
简单 又不失其强大性
能满足广泛的需求
4) 限制
对 share data 且 需要锁机制 的 task 不要用 async()
用 async() 内部到底启用 几个 thread, 由 async()实现 决定, user 不知道
template< class Function, class... Args >
std::future<typename std::result_of<
typename std::decay<Function>::type(
typename std::decay<Args>::type...)
>::type>
async( std::launch policy, Function&& f, Args&&... args );
ArgsType
FunctionType
FunctionReturnType
// ===(1)
#include <future>
#include <thread>
#include <chrono>
#include <iostream>
void f(std::promise<int>& pr)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
int res = 1;
pr.set_value(res);
}
void g(std::future<int>& fut)
{
int v = fut.get();
std::cout << v << std::endl;
}
void test()
{
std::promise<int> pr;
std::future<int> fut = pr.get_future();
std::thread t1(f, std::ref(pr));
std::thread t2(g, std::ref(fut) );
t1.join();
t2.join();
}
int main()
{
test();
}
// ===
void f(promise<X>& px) // promise 任务: 将 结果 放 promise 中
{
// ...
try
{
X res;
// ... 计算 1个 值, 保存在 res 中
px.set_value(res);
}
catch(...)
{
// 将异常传给 future 的线程
px.set_exception( current_exception() );
}
}
void g(future<X>& fx) // future 任务: 从 future 获取 结果
{
// ...
try
{
X v = fx.get(); // 如必要, 等待 值 准备好
// ... 使用 v
}
catch(...)
{
// ...
}
}
// === (2)
#include <iostream>
#include <thread>
#include <future>
int f(int x, int y) { return x+y; }
// 1] 任务 类型: 任务 / 函数 signature
using TaskType = int(int, int);
void task_thread()
{
// 2] wrap task
std::packaged_task<TaskType> pt(f);
// 3] 请求 future
std::future<int> fut = pt.get_future();
// 4] 为 packaged_task0 / 1 启动线程
std::thread t(std::move(pt), 1, 2);
t.join();
// 5] 获得 结果
std::cout << fut.get() << '\n';
}
void task_lambda()
{
std::packaged_task<TaskType> pt(
[](int a, int b) {return a + b; });
std::future<int> fut = pt.get_future();
pt(1, 2);
std::cout << fut.get() << '\n';
}
void task_bind()
{
// Note: bind 哪个参数后, TaskType 中 哪个参数去掉
std::packaged_task<int()> pt( std::bind(f, 1, 2) );
std::future<int> fut = pt.get_future();
pt();
std::cout << fut.get() << '\n';
}
int main()
{
task_thread();
task_lambda();
task_bind();
}
// === (3)
#include <iostream>
#include <future>
#include <thread>
int f(int x)
{
x++;
return x;
}
int main()
{
// std::launch::deferred 当执行到 fut.get() 时, 才开始 创建线程
std::future<int> fut = std::async(std::launch::deferred, f, 1);
std::cout << fut.get() << std::endl;
}
promise-future.jpg
5.3 小工具 组件
1 类型函数
`编译时求值` 的 函数, 它 接受 `类型实参` 或 返回 `类型结果`
(1) iterator_traits 机制: <iterator>
`容器类型`
| template <typename C>
| using Iterator_type = typename C::iterator;
|
| 迭代器类型 再用别名 封装1层
| // Note: Iterator_type<C> 前没有 typename, 编译器已知道 template <typename C> Iterator_type<C> 为类型
| template <typename C>
| using Iter = Iterator_type<C>;
|/
`容器迭代器类型`
|
| template <typename Ite>
| using Iterator_category = typename
| std::iterator_traits<Ite>::iterator_category;
|/
迭代器 category
|
| 对象: 构建 `标签值`, 指示 迭代器 的 category
| Iterator_category<Iter>{}
|/
标签分发
形参: vector / list 的 迭代器 category 为 random_access_iterator_tag / forward_iterator_tag
对 list 排序
list copy 到 vector -> vector 上 sort -> vector copy 回 list
`类型魔法` 发生在 选择 helper 函数时
// template< class RandomIt >
// void sort(RandomIt first, RandomIt last);
#include <iostream>
#include <iterator> // std::bidirectional_iterator_tag / std::random_access_iterator_tag
#include <vector>
#include <list>
#include <algorithm> // copy, sort
#include <type_traits> // std::remove_cv
template <typename C>
using Iterator_type = typename C::iterator;
template <typename Ite>
using Iterator_category = typename std::iterator_traits<Ite>::iterator_category;
// vector sort
template <typename Iter>
void sort_helper(Iter beg, Iter end, std::random_access_iterator_tag)
{
std::sort(beg, end);
}
// list sort
template <typename Iter>
void sort_helper(Iter beg, Iter end, std::bidirectional_iterator_tag)
{
//decltype(*beg) is int&: because *beg is not a identifier, but is a lvalue
using U = typename std::remove_reference< decltype(*beg) >::type;
// <=>
//auto identifier = *beg;
//using U = decltype(identifier);
std::vector< U > v{ beg, end }; // list init(copy to) vector
std::sort(v.begin(), v.end()); // sort vector
std::copy(v.begin(), v.end(), beg); // vector copy to list
}
template <typename C>
void sort(C& c)
{
using Iter = Iterator_type<C>;
auto beg = c.begin();
sort_helper(beg, c.end(), Iterator_category<Iter>{});
}
int main()
{
std::vector<int> vec{ 1, 3, 2 };
std::list<int> lst { 1, 2, 5, 4, 3 };
sort(vec);
sort(lst);
}
(2) 类型谓词 <type_traits>
标准库 类型谓词 是 类型函数
负责回答 关于类型的问题
bool b1 = is_pod<int>();
has_virtual_destructor
is_base_of
2 pair 和 tuple <utility>
equal_range 返回 迭代器 pair
pair
提供 = == <
其 元素得 支持 这些运算
希望从 接口函数 返回2个结果
(结果本身, 结果好不好的标志位) -> pair
make_pair()/make_tupe() 可创建 pair/tuple
vector<string> vec;
auto p = make_pair(vec.begin(), 2); // pair<vector<string>::iterator, int>
auto p_first = p.first;
auto p_second = p.second;
auto t = make_tuple(string("hello"), 1, 1.2);
string s = get<0>(t);