Effective Modern C++

【Effective Modern C++(7)】并发API

2018-12-25  本文已影响15人  downdemo

35 使用std::async替代std::thread

int doAsyncWork();
std::thread t(doAsyncWork);
auto fut = std::async(doAsyncWork); // "fut" for "future"
int doAsyncWork() noexcept;
std::thread t(doAsyncWork); // 若无线程可用,仍会抛出异常
auto fut = std::async(doAsyncWork); // 由标准库的实现者负责线程管理

36 需要异步则指定std::launch::async

auto fut1 = std::async(f); // 意义同下
auto fut2 = std::async(std::launch::async | std::launch::deferred, f);
auto fut = std::async(f);
auto fut = std::async(f); // f的TLS可能和一个独立线程相关
                          // 但也可能和调用fut的get或wait的线程相关
using namespace std::literals; // C++14的duration suffixes

void f() // f睡眠1秒后返回
{
    std::this_thread::sleep_for(1s);
}
auto fut = std::async(f);
while (fut.wait_for(100ms) != std::future_status::ready)
{ // 循环至f运行完成,但这可能永远不会发生
    …
}
auto fut = std::async(f);
if (fut.wait_for(0s) == std::future_status::deferred) // 任务被推迟
{
    … // 使用fut的wait或get异步调用f
}
else // 任务未被推迟
{
    while (fut.wait_for(100ms) != std::future_status::ready)
    {
        … // 任务未被推迟也未就绪,则做并发工作直至结束
    }
    … // fut准备就绪
}
auto fut = std::async(std::launch::async, f); // 异步执行f
template<typename F, typename... Ts>
inline
std::future<typename std::result_of<F(Ts...)>::type>
reallyAsync(F&& f, Ts&&... params) // 返回异步调用f需要的future
{
    return std::async(std::launch::async,
        std::forward<F>(f),
        std::forward<Ts>(params)...);
}
auto fut = reallyAsync(f); // 异步运行f,如果std::async抛出异常则reallyAsync也抛出异常
template<typename F, typename... Ts>
inline
auto // C++14
reallyAsync(F&& f, Ts&&... params)
{
    return std::async(std::launch::async,
        std::forward<F>(f),
        std::forward<Ts>(params)...);
}

37 让std::thread对象在所有路径上不可合并(unjoinable)

constexpr auto tenMillion = 10'000'000; // C++14允许单引号分隔数字增强可读性
bool doWork(std::function<bool(int)> filter, int maxVal = tenMillion)
{
    std::vector<int> goodVals; // 筛选出的值
    std::thread t(
        [&filter, maxVal, &goodVals] // 遍历goodVals
        {
            for (auto i = 0; i <= maxVal; ++i)
            { if (filter(i)) goodVals.push_back(i); }
        });
    auto nh = t.native_handle(); // 使用底层handle设置优先级
    …
    if (conditionsAreSatisfied())
    {
        t.join(); // 让t结束执行
        performComputation(goodVals);
        return true; // 计算已执行
    }
    return false; // 计算未执行
}
class ThreadRAII {
public:
    enum class DtorAction { join, detach };
    ThreadRAII(std::thread&& t, DtorAction a) // std::thread对象是不可拷贝的
    : action(a), t(std::move(t)) {}
    ~ThreadRAII()
    {
        if (t.joinable())
        {
            if (action == DtorAction::join) t.join();
            else t.detach();
        }
    }
    // 析构函数会抑制合成移动操作,因此需要显式声明
    ThreadRAII(ThreadRAII&&) = default;
    ThreadRAII& operator=(ThreadRAII&&) = default;
    std::thread& get() { return t; }
private:
    DtorAction action;
    std::thread t;
};
bool doWork(std::function<bool(int)> filter, int maxVal = tenMillion)
{
    std::vector<int> goodVals;
    ThreadRAII t( // 使用RAII对象
        std::thread(
            [&filter, maxVal, &goodVals]
            {
                for (auto i = 0; i <= maxVal; ++i)
                { if (filter(i)) goodVals.push_back(i); }
            }),
        ThreadRAII::DtorAction::join // RAII action
    );
    auto nh = t.get().native_handle();
    …
    if (conditionsAreSatisfied())
    {
        t.get().join();
        performComputation(goodVals);
        return true;
    }
    return false;
}

38 线程handle的析构函数的不同行为

// this container might block in its dtor, because one or more contained futures
// could refer to a shared state for a non-deferred task launched via std::async
std::vector<std::future<void>> futs;

class Widget { // Widget类型对象可能会在析构函数中阻塞
public:
    ...
private:
    std::shared_future<double> fut;
};
int calcValue();
std::packaged_task<int()> pt(calcValue);
auto fut = pt.get_future(); // 获取pt的future
std::thread t(std::move(pt)); // 在t上运行pt
{
    std::packaged_task<int()>
    pt(calcValue);
    auto fut = pt.get_future();
    std::thread t(std::move(pt));
    …
}

39 对一次性事件通信使用void future

std::condition_variable cv; // 事件的条件变量
std::mutex m; // 使用cv时加上的互斥量
… // 检测事件
cv.notify_one(); // 通知响应任务
… // 准备响应
{
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk); // 等待通知,但这里会出错的
    … // 响应事件(m被锁定)
} // 析构lk以解锁m
… // 继续等待响应(m已解锁)
cv.wait(lk,
    []{ return whether the event has occurred; });
std::atomic<bool> flag(false); // 共享的bool标志位
… // 检测事件
flag = true; // 通知响应任务
… // prepare to react
while (!flag); // wait for event
… // react to event
// 检测任务
std::condition_variable cv;
std::mutex m;
bool flag(false);
… // 检测事件
{
    std::lock_guard<std::mutex> g(m);
    flag = true; // 通知响应任务(part 1)
}
cv.notify_one(); // 通知响应任务(part 2)

// 响应任务
… // 准备响应
{
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, [] { return flag; }); // 使用lambda防止虚假唤醒
    … // 对事件进行响应(m被锁定)
}
…
std::promise<void> p;
… // 检测事件
p.set_value(); // 通知响应任务
… // 准备响应
p.get_future().wait();
… // 对事件进行响应
std::promise<void> p;
void react(); // 响应任务的函数
void detect() // 检测任务的函数
{
    std::thread t([] // 创建线程
    {
        p.get_future().wait(); // 暂停t直到future被设置
        react();
    });
    … // 这里t处于暂停状态
    p.set_value(); // 取消暂停t(于是将调用react)
    … // 其他工作
    t.join(); // make t unjoinable
}
void detect()
{
    ThreadRAII tr(
        std::thread([]
        {
            p.get_future().wait();
            react();
        }),
        ThreadRAII::DtorAction::join // risky! (see below)
    );
    … // tr中的线程在此处暂停
    p.set_value(); // tr中的线程在此取消暂停
    …
}
std::promise<void> p;
void detect()
{
    auto sf = p.get_future().share(); // sf为std::shared_future<void>
    std::vector<std::thread> vt; // 响应线程的容器
    for (int i = 0; i < threadsToRun; ++i)
    {
        vt.emplace_back([sf]{ sf.wait(); react(); });
    }
    … // 如果此处抛异常则detect将失去响应
    p.set_value(); // 让所有线程取消暂停
    …
    for (auto& t : vt)
    {
        t.join(); // make all threads unjoinable
    }
}

40 对并发使用std::atomic,对特殊内存使用volatile

std::atomic<int> ai(0);
ai = 10;
std::cout << ai;
++ai;
--ai;
volatile int vi(0);
vi = 10;
std::cout << vi;
++vi;
--vi;
std::atomic<int> ac(0);
volatile int vc(0);
// Thread 1
++ac; ++vc;
// Thread 2
++ac; ++vc;
std::atomic<bool> valAvailable(false);
auto imptValue = computeImportantValue(); // 计算值
valAvailable = true; // 通知其他任务值已可用
volatile bool valAvailable(false);
auto imptValue = computeImportantValue();
valAvailable = true; // 其他线程可能认为此赋值在imptValue之前
int x = 42;
auto y = x;
y = x;
x = 10;
x = 20;
auto y = x;
y = x;
x = 10;
x = 20;
auto y = x;
x = 20;
auto y = x;
y = x;
x = 10;
x = 20;
volatile int x;
std::atomic<int> x;
auto y = x; // 错误
y = x; // 错误
std::atomic<int> y(x.load());
y.store(x.load());
register = x.load(); // 将x读入寄存器
std::atomic<int> y(register); // 用寄存器值初始化y
y.store(register); // 将寄存器值存入y
volatile std::atomic<int> vai; // 对val的操作是原子的且不可优化
上一篇 下一篇

猜你喜欢

热点阅读