C++ concurrency in action: 1~4 章

2021-01-10  本文已影响0人  my_passion
    C++11 最重要的新特性: 多线程

        2 个好处

            [1] 可写 `平台无关的 多线程程序, 移植性 提高`

            [2] `并发` 提高性能

1 概述

    1   并发

        (1) 含义

            `任务切换`: 能切出去, 也能切回来

            任务切得太快, 看起来像 同时执行多个任务 一样

        (2) 2 种方式

            [1] `多进程` 

                `进程间通信: 信号 / 套接字 / 文件 / 管道`

                    `1个 进程 一套 地址空间`

            [2] `多线程`

                `同一进程 中 所有线程 共享地址空间`

                    所有线程 访问到 大部分数据—全局变量仍然是全局的,
                        指针、对象的引用、数据可 在线程间传递

    2   并发 优势

        (1) 关注点分离 ( SOC )`

            code 分 2 part

                “DVD播放”

                “用户界面”

            [1] 单线程: 两者放一起, 以便调用

            [2] 多线程: 分隔关注点, 两者不用放一起

                线程1:    处理“用户界面” 事件

                线程2:    “DVD播放”

                线程间 交互 / 关联
                    用户点击“暂停”

        (2) ·算法/数据 并行 -> 提高性能`

            [1] 数据相同 / 算法拆分

            [2] 计算相同 / 数据拆分

    3   C++11 多线程 特点

        (1) `线程感知` 的 `内存模型`

        (2) 直接支持 `原子操作`

            coder 无需了解 与平台相关的汇编指令
            
                就可编写 高效/可移植 代码

            compiler 负责 `搞定具体平台`

        (3) `C++ 线程库 性能` 
                
                `接近 直接使用底层 API 时`

    4    多线程入门

        (1) 单线程
            #include<iostream>
            int main()
            {
                std::cout<<"Hello World\n";
            }

        (2) 多线程
        
            #include <iostream>
            #include <thread>           //(1)
            void hello()                //(2)
            {
                std::cout<<"Hello Concurrent World\n";
            }
            int main()
            {
                std::thread t(hello);   //(3)
                t.join();               //(4)
            }

                1)  头文件 <thread>

                2)  `线程 初始函数`: 新线程执行入口

                        [1] `主   线程: main()`

                        [2] `其他 线程: std::thread 对象 ctor 中 指定`

                3) std:thread 对象创建 完成 -> 开始执行新线程

                4) join(): 让 caller 等待 新线程 完成

2 管理线程: thread/join/detach/RAAI/std::ref/std::bind/move ownership

image.png
    线索
    
        1   启动线程, 等待它结束 or 放后台运行
        
        2   给 线程函数 传参
        
        3   transfer ownership of thread 
        
                from current associated std::thread object to another
                
        4   确定线程数, 识别特殊线程
    

    #1  基本 线程管理

        1.1 启动线程

            `线程 在 std::thread 对象创建 时 启动`

            1   thread 对象
    
                (1) 一旦构造, 立即开始执行 `线程函数`
                                    
                (2) 不可 copy, 可 move
                        
                        copy/赋值 deleted
                            
                            原因 
                                线程对象 `copy/赋值` 可能 `弄丢已 join 的线程`

                (3) 可能 `不 表示/关联 任何线程` => safe to destroy & not joinable
            
                        after 4场景之一

                        [1] 默认构造 
                
                                => 无 线程 ( 函数 ) 执行

                        [2] move from
                
                                => 线程 move 给 other thread 对象去管理

                        [3] detach

                                => 线程 可能仍在执行

                        [4] join
                
                                => 线程 执行完成
                                    
            2   线程函数
                    
                [1] 不与 caller 通信时, 若 抛出异常
                    
                    std::terminate 被调用
                        以终止线程函数

                [2] return value 或 异常 可 传给 caller
                    
                    2种方式
                        ——————————————————————————
                        1] std::promise
                        
                        2] 共享变量 (可能需 同步)
                        ——————————————————————————
                                        
            3   std::thread ctor 的 args: 可调用对象 

                (1) 入口点为 function

                    void f();
                    std::thread my_thread(f)
            
                (2) 入口点为 `可调用对象`

                    可调用对象 copied into thread's storage
                        
                        并 `invoked` from there

                            原始可调用对象 可被立即销毁
        
                                class F
                                {
                                public:
                                    void operator()() const { do_something(); }
                                };

                                F f;
                                std::thread my_thread(f);

                    问题: `函数对象 临时无名对象 作 std::thread ctor's arg`

                        会被 C++ 编译器 解释为 `函数声明`

                            std::thread my_thread( F() );
            
                    3 种 解决
                        ————————————————————————————————————————————————————
                        [1] 函数对象对象 外再加一层小括号
                            
                            std::thread my_thread( (F() ) ); 
                        ————————————————————————————————————————————————————
                        [2] 花括号
                            
                            std::thread my_thread{ F() };
                        ————————————————————————————————————————————————————
                        [3] lambda 表达式 启动线程
                            
                            std::thread my_thread( 
                                    []{ do_something(); } 
                                );
                        ————————————————————————————————————————————————————
                        
            4   join 还是 detach ?

                (1) `必须用 + 何时用` 
                    
                    原因
                        
                        `std::thread 对象 销毁` 前线程 `没被 joined 或 detached`

                            线程对象 
                                ——————————————————————————————————————
                                [1] joinable 
                                ——————————————————————————————————————
                                [2] dtor 调 std::terminate() 结束程序
                                ——————————————————————————————————————
                                
                (2) `detached 线程` 要在 `程序结束前 结束`
                    
                    原因
                    
                        main return 时, 正在执行的 detached 线程 
                        
                            ——————————————————————————————————————
                            [1] 被暂停
                            ——————————————————————————————————————
                            [2] 其 thread-local objects 销毁
                            ——————————————————————————————————————
                            
                (3) 用 join 还是 detach ?
                
                    ————————————————————————————————————————————
                    [1] join
                    ————————————————————————————————————————————
                    [2] 除非 你想 `更灵活`
                        
                            并用 `同步` 机制 去 `等待` 线程完成 
                                
                                此时 `detach`
                    ————————————————————————————————————————————

            5   std::terminate() in <exception>

                    被  C++ runtime 调用 
                    
                        ——————————————————————————————————————————————
                        3 种 case 
                            ——————————————————————————————————————————
                            [1] std::thread 初始函数 抛出异常
                            ——————————————————————————————————————————
                            [2] joinable 线程对象 `被销毁` 或 `被赋值`
                            ——————————————————————————————————————————
                            [3] `异常` 被抛出却未被捕获
                        ——————————————————————————————————————————————
                            
            6   lifetime  问题:   多(比单)线程 更易发生

                    `正在运行的 detached 子线程` 
                        
                        access `已被 destroyed 的 object` 
                        
                            =>  undefined behavior
                            
                        |
                        |   如 
                        |/
                        
                    caller 
                        ——————————————————————————————————————————————————
                        [1] local 变量 ptr/ref -> 传给 callee 线程 以 访问 
                        ——————————————————————————————————————————————————
                        [2] 已 return 
                        ——————————————————————————————————————————————————
                        [3] 相比 caller 线程, `callee 线程 lifetime 更长`
                        
                            => `ptr/ref 悬挂` ( dangling )
                                        |
                                        |/
                                潜在的 访问 隐患
                        ——————————————————————————————————————————————————
                        |
                        |   解决 
                        |/
                                
                    [1] 线程函数 `自包含`
                     +
                    [2] `copy data into thread`
                                
                    struct func
                    {
                        int& i;
                        
                        func(int& i_) : i(i_) {}

                        void operator() ()
                        {
                            for (unsigned j=0 ; j<1000000 ; ++j)
                            {
                                // 潜在的访问隐患: dangling reference
                                do_something(i);        
                            }
                        }
                    };

                    void oops()
                    {
                        int local_state = 0;
                        func my_func(local_state);   
                        std::thread my_thread(my_func); 
                        my_thread.detach();             
                    }  

        1.2 `等待 线程完成`

            1   怎样 `等待 线程完成`

                关联的 std::thread object 上调 join()

            2   join() 做了啥
                
                清理 所关联线程的 内存
             
                    调 1 次 join 后
                        线程对象 
                            [1] 不再关联 (已完成的)线程 
                            [2] not joinable <=> joinable() == false

            3   等待过程中 控制线程
                    ——————————————————————
                    [1] 检查线程是否完成
                    
                        1] cv
                        
                        2] futures 机制
                    ——————————————————————  
                    [2] 等待特定时间
                    ——————————————————————

        1.3 异常下的 等待

            1   `call join() 的位置 要精挑细选`

                (1) 问题

                    call join() 前 抛出异常
                        
                        join() 调用 被跳过
                        
                            `lifetime problem`
                 

                (2) 解决 1

                    让 join() 不被跳过
                        
                        use try/catch, `catch 中 也调 join()`

                            缺点
                                [1] try/catch 冗长
                                [2] scope 易出错
                
                    struct func; // defined in list2.1
                    void f()
                    {
                        int some_local_state=0;
                        func my_func(some_local_state);
                        std::thread t(my_func);
                        try
                        {
                            do_something_in_current_thread();
                        }
                        catch(...)
                        {
                            t.join(); // 1
                            throw;
                        }
                        t.join();     // 2
                    }

                (3) 解决 2

                    RAII

                        本质含义
                        
                            `让 资源管理对象 own 资源, 在 析构时 释放资源

                                => `资源的 lifetime 管理` 就和 `资源管理对象 lifetime 管理` 统一起来

                                    => 只要 `不泄漏 资源管理对象, 就能保证 不泄漏资源`

                                        至于是 `让 ctor 自己 创建资源`,
                                            
                                            `还是 把资源创建好 再交给 ctor 保管`,
                                                
                                                没有优劣之分

                                                    可用 `static Factory Method 创建资源对象`
                                                        `client 就不用管 里边怎么实现了`
                        实现 
                        
                            线程对象 
                                |
                                |   设 
                                |/
                            保护类 thread_guard

                                [1] explicit ctor: 左值引用 para = std::thread& t_
                                                                    |
                                                                    |   `线程对象` 作 arg 引用传递 构造 `线程保护对象`  
                                                                    |/
                                [2] init. list  初始化         /       绑定 
                                                                    |
                                                                    |/
                                [3] 左值引用 成员                  =  std::thread& t


                                    => `线程 的 ownership 没有转移`

                                        => `caller 可能 已对线程 调 join()` 
                                                
                                            => [4] thread_guard `dtor 中 调 join() 前 必须 先判线程是否可 joinable`

                                                    =>  线程 `caller 函数 执行 }`
                                                        
                                                            即 ret 指令 `弹栈 逆序销毁` local 对象 时 
                                                                
                                                                先 `销毁 保护对象`

                                                                    `调 保护对象 dtor 中 可能 调 join()`

                                [5] copy 和 赋值 delete
                                    
                                        因 `资源管理对象` 可能 `outlive 其管理的 `线程对象` 的 `scope`

                        //list2.3 用 RAII 等待线程完成
                        class thread_guard
                        {
                        private:
                            std::thread& t;                        // [3]
                        public:
                            explicit thread_guard(std::thread& t_) // [1]
                                :t(t_){}                           // [2]

                            ~thread_guard()
                            {
                                // [4] 先 test joinable(), 再 join()
                                if( t.joinable() )
                                {
                                    t.join();     
                                }
                            }

                            // [5] copy 和 赋值 delete  
                            thread_guard(thread_guard const&)=delete; 
                            thread_guard& operator=(thread_guard const&)=delete;
                        };

                        struct func; // defined in list2.1

                        void f()
                        {
                            int state = 0;

                            func f(state);

                            std::thread t(f);
                            
                            // `新线程对象` 作 arg 引用传递 构造 `线程保护对象`  
                            thread_guard g(t); 
                            
                            do_something_in_current_thread();
                        } 
                    
                (3) 解决 3

                    若 不必 等待线程结束

                        `detach & copy data into thread`
 
                            可避免 `异常安全` 问题 

                                `detach` 会 `打破 线程 与 std::thread 对象` 间 `关联`
                                    
                                    只要 detach 线程 先于 main 函数 退出, 
                
        1.4 后台 / background 运行线程

            1   detach 线程: std::thread instance上调 detach()`

                `detached 线程` 特点

                [1] 在后台(typically 长时间)运行

                    称 `守护线程`
                    
                    应用
                        1> 监控文件系统
                        
                        2> 实现发后即忘/fire and forget work: 
                            只管消息发送, 不管消息接收

                [2] 无法 获得 关联它的 std::thread object 

                    => 无法直接 communicate with it
                            it can `no longer be joined 
                    => 无法等待它完成`

                [3] `ownership 和 control 被传给 C++ runtime 库` 
                
                    保证了 线程关联资源 
                        
                        在线程结束时 被正确回收

            2   application : word processor
            
                `可一次编辑多个文档`

                UI级和内部 处理的一种方法:

                `1 个窗口 -> 1 个被编辑文档`

                `edit_document (编辑文档) 作 线程函数` + 其中 通过 `与用户交互 获取 用户欲执行动作`
            
                若为 打开新文档 
                    
                    `起 新线程 & 递归调 线程函数 edit_document` 
        
                else
                    执行相应 操作

                //List 2.4 Detach a thread to handle other document

                //para: filename
                void edit_document(std::string const& filename)
                {
                    //1. open file
                    open_document_and_display_gui(filename);
                    
                    //2. Loop, until edit is done/finished
                    while( !done_editing() ) 
                    {
                        // 3. get user's input(cmd)     
                        user_command cmd=get_user_input();
                        
                        //(1) if user's cmd is open new file
                        if(cmd.type==open_new_document)
                        {
                            //(2) get filename from user
                            std::string const new_name=get_filename_from_user();
                        
                            //(3) start new thread to open: 
                            //the operations of new and current thread are the same.
                            //So, entry function reuse the caller function 
                            std::thread t(edit_document,new_name); 

                            //(4) detech new thread
                            t.detach();                            
                        }
                        else // 5. otherwise, process according to user's choise
                        {
                            process_user_input(cmd);
                        }
                    }
                }

    #2  传参 给 线程函数

        2.1 thread ctor 接口

                ——————————————————————————————————————————————————————————————————————————————————————————  
                线程函数    |       thread ctor 的 paraList
                ——————————————————————————————————————————————————————————————————————————————————————————
                非成员函数  |    [1] callable obj    [2] 相应 `函数调用运算符` 的 para... 
                ——————————————————————————————————————————————————————————————————————————————————————————
                成员函数    |   [1] `成员函数指针`    [2] 相应 对象的指针  [3] 相应 `成员函数` 的 para... 
                ——————————————————————————————————————————————————————————————————————————————————————————
                
        2.2 内部机制

            1   `默认下` 
                
                std::thread Ctor 实参  
                
                    [1] copied
                        
                        副本 在新线程 context 中 `隐式转换 为 相应参数类型`
                
                    [2] 除非 用 `std::ref 等`

                        void f(std::string const& s);
                        std::thread t(f, "hello");
                
                如
                    实参 `字符串 字面量` 
                        
                        [0] 是 ptr:  char const*
                        
                        [1] copied
                        
                        [2] 副本 converted 为 std::string
                            
                        [3] 表面上 引用传递 
                            
                            实际是 ptr copy 作 arg 调 string Ctor
                            
                        问题 
                            
                            caller 函数 
                            
                                在 thread ctor 的 `ptr/ref 型 arg` 的 
                                    
                                    `副本 转换为 相应参数前 退出`
                                        
                                        `所指对象销毁` 
                                                
                                            `副本指针 悬挂`
                                            
                                                => undefined behavior
                
                        解决
                            [1] 先 强转 arg, 再 传给 thread ctor`
                            
                            [2] ptr 用 具有 `移动语义` 的 对象 管理
                            
                                std::thread t(f, std::string(buffer) ); 
                
            2   std::ref() / std::bind wrap 想 `引用传递` 的 `arg`
                                                |
                                                |   想 
                                                |/
                                            更新数据 
                [1] std::ref()
                
                    void f(Data& data);
                    Data data;
                    std::thread t(f, std::ref(data) );

                [2] std::bind   <functional>

                    std::thread ctor 与 std::bind 机制相同

                        X x;
                        std::thread t(&X::f, &x);   

        #3  转移 线程 ownership
            
            1   为什么要引出 move of std::thread

                    应对 2 种场景

                        线程 ownership 转交给  
                            ——————————————
                            [1] caller 
                            
                            [2] 其他 函数
                            ——————————————
                            
                (1) `ownership moved out of a function`

                    `形似 pass by value, 实际却是 move 语义`

                        std::thread caller()
                        {
                            void callee();
                            return std::thread(callee);
                        }

                (2) `ownership moved into a function`

                    void g(std::thread t);

                    void f()
                    {
                        void thread_func();             
                        g( std::thread(thread_func) ); 
                    }
                    
            2   move 机制 
            
                —————————————————————————————————————————
                [1] 隐含自动 move 
                        
                        左 或 右运算对象 至少一侧为 右值
                —————————————————————————————————————————
                [2] 显式 调 std::move()
                —————————————————————————————————————————
                
            2   thread_guard 与 scoped_thread

                    ————————————————————————————————————————————————————————————————————————————
                    是否转移线程 ownship  |   否                       |       是 
                    ————————————————————————————————————————————————————————————————————————————
                    成员                  |   左值引用 std::thread& t |   值  std::thread t
                    ————————————————————————————————————————————————————————————————————————————    
                    args 传递方式           |   左值引用 传递         |   值传递 + 移动语义 
                    ————————————————————————————————————————————————————————————————————————————
                    dtor 中 call join 前  |                           |
                        是否 check joinable | 是                       |       否 
                    ————————————————————————————————————————————————————————————————————————————
                    ctor 中              |                           |
                        是否 check joinable | 否: 啥也不干             |       是 
                    ————————————————————————————————————————————————————————————————————————————
                    
                    class scoped_thread
                    {
                    private:
                        std::thread t;
                    public:
                        explicit scoped_thread(std::thread t_)  // 隐式 move 
                            : t( std::move(t_) )                // 显式 move
                        {
                            if( !t.joinable() ) // Note
                                throw std::logic_error(“No thread”);
                        }
                        ~scoped_thread()
                        {
                            t.join(); 
                        }
                        
                        scoped_thread(scoped_thread const&)=delete;
                        scoped_thread& operator=(scoped_thread const&)=delete;
                    };

                    struct F; 

                    void f()
                    {
                        int local_state;
                        scoped_thread st( std::thread( F(local_state) ) ); 
                        
                        do_something_in_current_thread();
                    }

        #4  runtime 时 选择 动态数量的线程 : 用 线程 groups 来 divide work

            std::thread::hardware_concurrency()

                硬件支持的并发线程数 的 hint
                
                int hardware_threads = std::thread::hardware_concurrency();
                    
                int n_threads = std::min(hardware_threads != 0 ? hardware_threads : 2);
                    
                [1] 线程 vector 
                    
                    std::vector<std::thread> threads(n_threads);
                
                [2] 启动 n_threads 个子线程  
                    for i = 0 ~ n_threads-1
                        threads[i] = std::thread(
                                            f, block_start, block_end, std::ref(results[i] )
                                        );
                        
                [3] wait 子线程 
                
                    std::for_each(threads.begin(), threads.end(),
                        std::mem_fn(&std::thread::join) );      
                        
        #5  识别线程:目的 `把 data 或 behavior 与 线程 关联起来`

            1   线程 identifier 是 std::thread::id type

                std::thread::id master_thread;
                
                void f()
                {
                    if( std::this_thread::get_id() == master_thread )
                    {
                        do_master_thread_work();
                    }
                    
                    do_common_work();
                }
        
            2   线程ID 可作 关联容器 中 `key`

3 在线程间 共享 data

image.png image.png
    2个目标 
        
        [1] 避免潜在问题 
        
        [2] 最大化收益

    `并发 好处`

        线程间 `共享数据` 容易、直接 

    1.1 线程间 共享 数据

        双向链表 delete node == update next + update prev
        
            只 update next 时, `invariant 被 打破了`
            
                若此时另一线程 正 access this node & 没做保护处理
                
                    => 该 `race condition` 会 引起 bug

        1   问题: race condition 竞争条件 

            `结果取决于 多个线程` 上 `操作` 执行的 `相对顺序` 
                
                多个线程 竞争地去执行 各自的 `操作`
                                                |
                                                |/
                                              抢票
            
            data race 
                
                对象 并发修改 对象 
                    
                    => undefined behavior

        2   解决
        
            (1) `用 mutex wrap 数据结构`
                
                    以保证 `只有 真正执行修改的那个线程 可看到 invariants 被打破的状态`

            (2) lock-free 编程

                    修改 数据结构的设计 及其 invariant
                    
                        `分离 变化` 成 `n 个 不可分割的 更小变化`
                        
                            每个小变化 保护 其 invariant

            (3) STM / software transactional(事务) memory

                    数据库更新
                    
                        update 数据结构 == process a transaction/事务

                            `store` series of `数据修改和读` to a `事务log` 
                                
                                commit it in a step

                                    `commit 失败, 事务重启`

    2.1 mutex 机制
            
        1   `mut`ually `ex`clusive 互斥
            
            保护 共享数据
            
                access 前    lock    其关联的 `mutex` 
                
                access 后    unlock

        2   mutex 并不是 silver bullet 银弹

            更重要的是 
            
                [1] `设计代码结构` 来 `保护数据`       2.2节 
                
                [2] `接口中内在地避免 race conditions`  2.3节

        3   mutex 问题
            
                [1] 死锁 
                
                [2] 保护范围 要多大 

    2.2 mutexes 使用 

        1   mutex 创建/lock/unlock

            创建   

            上/解 锁
            
                成员函数 lock()/unlock()

        2   对 mutex 不推荐 手工 调 lock()
                
                原因: mutex 是一种 资源 
                
                    与 对 thread 不推荐手工调 join() 一样 
                        ———————————————————————————————————————————————
                            资源  |   资源管理类 / RAII 类 
                        ———————————————————————————————————————————————
                        thread 对象 | thread_guard 或 scoped_thread    
                        ———————————————————————————————————————————————
                        mutex  对象 | std::lock_guard
                        ———————————————————————————————————————————————
                        
                            //list3.1 Protect list with mutex
                            #include <list>
                            #include <mutex>
                            #include <algorithm>

                            std::list<int> lst;     //1 单个 全局变量
                            std::mutex mut;         //2 用 相应 全局 std::mutex 保护

                            void add_to_list(int val)
                            {
                                std::lock_guard<std::mutex> guard(mut); 
                                lst.push_back(val);
                            }
                            bool list_contains(int val) 
                            {
                                std::lock_guard<std::mutex> guard(mut); 
                                return std::find(lst.begin(),lst.end(),val) != lst.end();
                            }

            |
            |
            |   重构
            |
            |       [1] 函数              作 public  成员 
            |
            |       [2] 被保护数据 + mutex  作 private 成员
            |/          
    2.3 Structuring code 以 保护 共享数据

        1   问题
            
            `迷途( stray ) ptr/ref` 和 `后门`
        
                若 成员函数 将 被保护数据 的 `ref/ptr 传出` lock 的 scope
                                                        |
                                                        |   如 
                                                        |/      
                            ——————————————————————————————————————————————————————————————————————————————————
                            [1] `隐晦地 传出` 
                                    
                                    成员函数 para 为 `( 用户策略 ) 函数对象` 
                                                                    |
                                                                    |   函数调用运算符 operator() 的 para 为 
                                                                    |/
                                                            ref / ptr 参数 
                            ——————————————————————————————————————————————————————————————————————————————————      
                            [2] 显而易见 的 传出 
                            
                                    return
                            ——————————————————————————————————————————————————————————————————————————————————
                            
                    => 留了 `后门 ( backdoor )`

                        => 任何能 访问 ptr/ref 的 code 可
                            
                                `绕过(bypass) mutex 的 保护` access `被保护数据`
                                        |
                                        |/
                                    不用 lock mutex
                                    
                (1) code 结构 
                    
                    ——————————————————————————————————————————————————————————
                    [1] ProtectedData + mutex  作 其 管理类(DataWrapper) 成员 
                    ——————————————————————————————————————————————————————————
                    [2] 成员函数 para 为 `函数对象`
                            
                            接受 `用户(恶意 malicious)策略函数`
                                            |
                                            |   [3]
                                            |/
                                ——————————————————————————————————————————————
                                1] `引用参数` => 可 `取 被保护数据 的 ptr`
                                
                                2] 绕过 mutex 保护 access 被保护数据
                    ——————————————————————————————————————————————————————————
                    
                (2) 根因
                    
                        没有把 access 数据 的 所有代码段 标记为 `互斥` 

                            `漏掉了 保护 传出的 被保护数据 的 ref`

                (3) 更深的 陷阱 ( pitfall )
                    
                    接口函数间 调用顺序( 接口设计 ) 导致 race conditions — 2.3节
            
            class Data  // [] ProtectedData        
            {
                int a;
                std::string b;
            public:
                void mf();
            };
            
            class DataWrapper      
            {
            private:
                Data data;      // [1] ProtectedData + mutex  作 其 管理类(DataWrapper) 成员 
                std::mutex m;           
            public:     
                // [2] 成员函数 para 为 `函数对象`
                //      接受 `用户(恶意 malicious)策略函数`
                template<typename F>
                void processData(F f)   
                {
                    std::lock_guard<std::mutex> l(m); 
                    f(data); 
                }
                // 略去 ctor
            };

            // === client 
            Data* gpd; 

            // [3] 用户(恶意 malicious)策略函数: 1] `引用参数` => 可 `取 被保护数据 的 ptr` 
            void maliciousFunc(Data& protected_data)  
            {
                gpd = &protected_data; 
                
                //  2] 绕过 mutex 保护 access 被保护数据
            }

            DataWrapper x;

            void f()
            {
                x.processData(maliciousFunc); 
                gpd->mf();
            }

    2.4 在接口本身中 发现 ( Spotting ) race conditions

        1    双向 list 
                
                删除 1个 node
                    
                    线程安全
                        
                        阻止 前中后共 3个节点 的 并发 accesses

            问题 

                单独保护 每个节点
                    仍有 race conditions

            解决 
                
                `单个 mutex` 保护 `整个 数据结构(list)`

        2   线程安全 的 栈

                // List 3.3 std::stack 的 接口
                template<typename T, typename Container = std::deque<T> >
                class stack
                {
                public:
                    explicit stack(const Container&);
                    explicit stack(Container&& = Container() );
                    template <class Alloc> explicit stack(const Alloc&);
                    template <class Alloc> stack(const Container&, const Alloc&);
                    template <class Alloc> stack(Container&&, const Alloc&);
                    template <class Alloc> stack(stack&&, const Alloc&);

                    size_t size() const;
                    
                    // 
                    bool empty() const;
                    
                    T& top();
                    T const& top() const;

                    void pop();
                    
                    void push(T const&);
                    void push(T&&);
                    
                    void swap(stack&&);
                };

            (1) 问题 
            
                    调用顺序 为 empty() -> top() -> pop() 时, `not 线程安全`

                [1] 线程1 empty() top() 间 线程2 pop()

                        empty() 判非空 -> pop() 使栈 空 -> top()

                            原因: 接口设计导致
                        
                                stack<int> s;
                                if(!s.empty() ) 
                                {
                                    int const value = s.top(); 
                                    s.pop(); 
                                    do_something(value);
                                }

                [2] 线程1 top() pop() 间 线程2 top()

                        `2个线程 本打算 分别 处理 顶值和次顶值, 实际 都 处理 顶值` 

                            次顶值 没处理 就被移除了

            (2) 解决 
            
                            ( 有 隐患 )
                                |\
                                |
                [1] mutex + 联合调 top 和 pop

                        若 stack 对象 copy ctor 可能 抛出异常 

                            Herb Sutter 给出 solution

                [2] pop() = std::stack 的 empty() + top() + pop()
                
                    ————————————————————————————————————————————————
                    1]  `构造 栈元素类型 新值` + pass by reference
                    +
                    2]  赋值 给 构造的新值 = `引用 实参/形参`
                    ————————————————————————————————————————————————
                    
                    缺点 
                        
                        1]  构造 代价太高 for some type
                        
                        2]  popped value type 必须 `可 赋值`
                            
                                许多 用户定义类型 不支持 assignment
        
                [3] 用 copy ctor 或 move ctor 不抛出异常 的类型

                        C++11 的 `右值引用`
                                        
                            使 更多类型 move ctor 不会抛出异常, 虽然 copy ctor 会

                    缺点
                    
                        不通用
                            
                            用户自定义类型 
                                1]  copy ctor 可能 抛出异常
                                
                                2]  没有 move ctor
        
                [4] `return ptr` 指向 popped item

                        `ptr 可安全 copy, 不会 抛出异常`

                    缺点
                        
                        要内存管理, 简单类型时 开销大
                            
                            用 std::shared_ptr

                [5] 解决 [2] + [3] 或 [4]
                        
                    接口简化
                        
                        例 [2] + [4]
                            5个操作 变 3个 好处
                                    empty   empty
                                    top
                                    pop     pop: 2 个 重载版本 [2] [4]
                                    push    push
                                    swap 
                            
                                mutex 保护 std::stack 上 `完整 3 操作` empty() -> top() -> pop()
                    
                                    ——————————————————————————————————————————————————————————————————————————————————————————————————
                                                              | [4]                                     |   [2]
                                    ——————————————————————————————————————————————————————————————————————————————————————————————————          
                                    函数原型                  | std::shared_ptr<T> pop()                |   void pop(T& value)
                                    ——————————————————————————————————————————————————————————————————————————————————————————————————      
                                    para                      | 空                                       |   模板参数 T `引用类型`   
                                    ——————————————————————————————————————————————————————————————————————————————————————————————————                          
                                    实现关键                  | 似 placement new + args 转发           |   1] pop 的 caller 负责 构造
                                        取出 top 元素         | pop() 负责 构造                         |   2] 赋值 给 构造的新值 = `引用 实参/形参`  Note: 只是 引用参数, 不是 取出/top 栈中 元素的引用
                                            怎么办 ?         | std::shared_ptr<T> const res(           |   value = stk.top(); // T 可 赋值                            
                                    用于`构造 栈元素类型 新值`|        std::make_shared<T>( stk.top() ) ); |   
                                    ——————————————————————————————————————————————————————————————————————————————————————————————————  
                                    记住 解决 [4] 易 => 解决 [2] 等 
                                    ——————————————————————————————————————————————————————————————————————————————————————————————————  
                    
                                template< class T, class... Args >
                                    shared_ptr<T> make_shared( Args&&... args )             //  <memory>
            
                                        T 非 数组 
                                         |\
                                         |  Note
                                         |
                                    构造 T 型 object
                                        
                                        wrap 进 std::shared_ptr

                                            就像 
                                                ::new (pv) T(std::forward<Args>(args)...)   // placement new + args 转发 
                                                        |
                                                        |   internal 
                                                        |/
                                                    internal void*
                                                    
                                                    
                    // 解决 [4]: 记住 解决 [4] => 解决 [2] 等 
                    template<typename T>
                    std::shared_ptr<T> threadsafe_stack::pop() 
                    {
                        std::lock_guard<std::mutex> lock(m);
                        
                        if( stk.empty() ) 
                            throw empty_stack(); 
                        
                        std::shared_ptr<T> const res( 
                            std::make_shared<T>( stk.top() ) ); 
                            
                        stk.pop(); 
                        
                        return res;
                    }
                            
                            
                        // 用 sloution [2] + [4] 实现 接口 无 race conditions
                        #include <exception>
                        #include <memory>
                        #include <mutex>
                        #include <stack>

                        struct empty_stack: std::exception
                        {
                            const char* what() const throw();
                        };

                        template<typename T>
                        class threadsafe_stack
                        {
                        private:
                            std::stack<T> stk;   // [1] internal std::stack
                            mutable std::mutex m;   
                        public:
                            threadsafe_stack(){}
                            
                            threadsafe_stack(const threadsafe_stack& rhs)
                            {
                                // [2] copy ctor: under mutex
                                std::lock_guard<std::mutex> lock(rhs.m);
                                
                                stk = rhs.stk; 
                            }
                            
                            // [3] Assignment deleted & have no swap()
                            threadsafe_stack& operator=(const threadsafe_stack&) = delete;

                            bool 
                            empty() const
                            {
                                std::lock_guard<std::mutex> lock(m);
                                return stk.empty();
                            }

                            // 解决 [4]: 记住 解决 [4] => 解决 [2] 等 
                            std::shared_ptr<T> 
                            pop() 
                            {
                                std::lock_guard<std::mutex> lock(m);
                                
                                if( stk.empty() ) 
                                    throw empty_stack(); 
                                
                                std::shared_ptr<T> const res( 
                                    std::make_shared<T>( stk.top() ) ); 
                                    
                                stk.pop(); 
                                
                                return res;
                            }

                            // 解决 [2] : two overloads all empty()->top()->pop()
                            void 
                            pop(T& value)
                            {
                                std::lock_guard<std::mutex> lock(m);
                                
                                if(stk.empty() ) 
                                    throw empty_stack();
                                    
                                value = stk.top(); // T 可 赋值 // dif
                                 
                                stk.pop();
                            }

                            void push(T val)
                            {
                                std::lock_guard<std::mutex> lock(m);
                                stk.push(val);
                            }
                        };

        3   lock 粒度/范围
                ————————————————————————————————————————————————————————————————————
                [1] 大 + 1个 mutex        |   并发增益小
                ————————————————————————————————————————————————————————————————————
                [2] 小 + 1个 mutex        |   没有完全保护期望的操作 => race condition
                ————————————————————————————————————————————————————————————————————
                [3] 更小 + 多个 mutex   |   死锁: 两线程相互等待, 结果谁也不往前走
                ————————————————————————————————————————————————————————————————————
                
    2.5 死锁 解决

        1   死锁: 2个线程, 2个mutex
        
            2线程 都要 lock 2个 mutex 才能执行
                2个线程 各锁 1个mutex 时, 都在 等对方锁住的 mutex 被释放
                    2个线程都在死等
                    
                    
            场景: 无锁 case 下的 死锁

                `2 个 thread func` 在 对方 `线程对象` 上 调 join()
                        |
                        |                                   std::thread t1;
                        |/                                  std::thread t2(f2, std::ref(t1) );
                    para 为 `线程对象 ref`                   t1 = std::thread(f1, std::ref(t2) );
                                                            void f1(std::thread& t_) { t_.join(); }
                        死锁: 都等 对方结束             void f2(std::thread& t_) { /* 休眠 5秒  */ t_.join(); }
                    
    2.6 避免死锁 的方法

        思想 
            线程1 可能等 线程2 时, 线程2 就不要 等线程 1 了

            1   避免 嵌套锁

            2   当 持有1个锁 时, 避免调 `用户代码`
                                            |
                                            |/
                                    可能 获得 另一个 锁 
                                        
                                        => 可能会 嵌套锁

            3   每个线程 以 相同顺序 获得多个锁
                
                总在 mutex1 前 锁住 mutex2
                    
                [1]   适用于: 不同 mutexes for 不同 purposes
                        |
                        |    [2] 问题:  不能 解决 的 死锁
                        |/
                    2个 mutex 保护 同 class 2个 instance
                            swap    
                        |
                        |   解决 
                        |/
                    std::lock() + std::lock_guard + std::adopt_lock 作 第 2 参数 
                        |                                   |
                        |                                   |/
                    1次 lock 多个 mutex                    告诉 std::lock_guard Ctor 
                                                                
                                                            [1] 其 第1参数 mutex 已上锁
                                                            
                                                            [2] 构造时只需 接收 mutex 上 mutex 的 ownership

                        |   [3] 问题: 不能 解决 的 死锁
                        | 
                        |       多个锁 被 `分开 获得`
                        |/
                        
                                //List 3.6 swap 操作中用 std::lock() + std::lock_guard + std::adopt_lock
                                class A;

                                void swap(A& lhs, A& rhs);

                                class X
                                {
                                private:
                                    A a;
                                    std::mutex m;
                                public:
                                    X(A const& sd):a(sd){}
                                    friend void swap(X& lhs, X& rhs)
                                    {
                                        if(&lhs==&rhs) 
                                            return; 
                                            
                                        std::lock(lhs.m,rhs.m); //一次锁住2个mutex
                                        std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock); 
                                        std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock); 
                                        
                                        swap(lhs.a, rhs.a);
                                    }
                                };
                                
                
            4   锁层次

                [1] application 分 多层

                [2] 某层 已拥有 某 mutex 上的锁
                        
                        就 `不允许 上一层 lock 另一 mutex`

                    // List 3.7 用层次锁避免死锁
                    
                    1]  调 high_level_func()
                    
                            lock high_level_mutex 
                            -> 调 low_level_func(): 
                            lock low_level_mutex, 返回值作为high_level_stuff() 参数 
                            => 遵守层次锁规则

                    2]  lock other_mutex(低层次值)
                    
                        -> 调 high_level_func():
                        lock 高层次值的 mutex 
                        => 不 遵守层次锁规则, 
                        hierarchical_mutex 报告错误, runtime 时失败
                        
                    3]  用户自定义 mutex 类 hierarchical_mutex 
                            
                            实现 符合 mutex 概念的 
                                
                                3个成员函数: lock()      unlock()    try_lock()  
                            
                                    => 可用于 std::lock_guard

                            // 1. hierarchy value of 10000
                            hierarchical_mutex high_level_mutex(10000);

                            //2. 
                            hierarchical_mutex low_level_mutex(5000);

                            int do_low_level_stuff();

                            int low_level_func()
                            {
                                std::lock_guard<hierarchical_mutex> lk(low_level_mutex); 
                                return do_low_level_stuff();
                            }
                            void high_level_stuff(int some_param);
                            void high_level_func()
                            {
                                std::lock_guard<hierarchical_mutex> lk(high_level_mutex); 
                                high_level_stuff( low_level_func() ); 
                            }

                            // ---------------------
                            void thread_a()  // 遵守 层次锁 规则
                            {
                                high_level_func();
                            }

                            // ---------------------
                            hierarchical_mutex other_mutex(100); 
                            void do_other_stuff();
                            void other_stuff()
                            {
                                high_level_func(); 
                                do_other_stuff();
                            }

                            void thread_b() // 不遵守 层次锁 规则
                            {
                                std::lock_guard<hierarchical_mutex> lk(other_mutex); 
                                other_stuff();
                            }

                            
                            //List 3.8 hierarchical mutex

                            class hierarchical_mutex
                            {
                                // 4个成员变量
                                //1. 内部标准 mutex, 被 委托 真正 调 lock() unlock() try_lock()
                                std::mutex internal_mutex; 
                                    
                                //2. 该 mutex 当前层次值
                                // 1)ctor 中 初始化 为指定值
                                // 2)比 本线程 层次值 小时, 检查通过
                                // 3)更新时, 最后 赋值 给 本线程层次值
                                unsigned long const hierarchy_value;

                                //3.该mutex 先前层次值
                                // 1)ctor 中 初始化 为0, 先不用
                                // 2)更新时, 先被 赋值 为 本线程层次值
                                // 3)unlock()中 用于 恢复 本线程层次值
                                unsigned long previous_hierarchy_value; 
                                
                                /* 4. 本线程 层次值
                                1) 1个线程 可能 lock 多个 层次mutex, 
                                保存1个 本线程层次值, 
                                以保证 `lock 的 层次 mutex 链`中,
                                `各 层次 mutex 的 层次值 依次减小`; 否则, 抛出 逻辑错误 异常
                                => 用 thread_local 
                                2) 又因为 属于class 而是 instance 
                                => 用 static
                                */
                                static thread_local unsigned long this_thread_hierarchy_value; 
                                
                                // 2 个 私有函数: 检查 & 更新
                                void check_for_hierarchy_violation()
                                {
                                    if(hierarchy_value >= this_thread_hierarchy_value)
                                    {
                                        throw std::logic_error(“mutex hierarchy violated”);
                                    }
                                }

                                void update_hierarchy_value()
                                {
                                    // 1)记录 次新的 本线程层次值, 以 
                                    // `沿着 已 lock 的 各 层次 mutex`, 
                                    // `逐层恢复出 原来的 本线程层次值`
                                    previous_hierarchy_value = this_thread_hierarchy_value; 

                                    // 2)更新 最新的 本线程层次值, 
                                    // 以 lock 下一层上的 层次mutex       
                                    this_thread_hierarchy_value = hierarchy_value; 
                                }

                            public: // 4 个 接口函数
                                explicit hierarchical_mutex(unsigned long value):
                                    hierarchy_value(value), previous_hierarchy_value(0){}
                                    
                                // 检查 层次锁->委托 标准 mutex lock-> 更新 线程层次值
                                void lock() 
                                {
                                    check_for_hierarchy_violation();
                                    internal_mutex.lock(); 
                                    update_hierarchy_value(); 
                                }

                                // 恢复 本线程 次新层次值 -> unlock
                                void unlock() 
                                {        
                                    this_thread_hierarchy_value = previous_hierarchy_value; 
                                    internal_mutex.unlock();
                                }

                                // 与 lock() 区别: mutex 的lock已被另一线程拥有时,返回false
                                bool try_lock() 
                                {
                                    check_for_hierarchy_violation();

                                    if( !internal_mutex.try_lock() ) 
                                        return false;

                                    update_hierarchy_value();
                                    return true;
                                }
                            };

                            //本线程 层次值 初始化为 最大值
                            thread_local unsigned long
                                hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);

    2.7 std::unique_lock 实现更灵活的 locking

        1   思想

                松弛 invariants

                    并不总是拥有 mutex

            代价
                内部 要存/更新 1个 flag 来标识是否拥有 mutex
                    
        2   std::defer_lock 作 第2参数, 告诉 std::unique_lock Ctor, 构造时 `mutex unlocked`

        3   std::unique_lock 对象

            2种方法 加锁

                1) 其 上调用 lock()

                2) 传给 std::lock() 
                        
                        因为 std::unique_lock 提供 3 个成员函数
                        
                            lock() try_lock() unlock() 

        4   std::lock_guard & std::unique_lock

            同 都可 
        
                RAII 锁管理

            异
                1) 成员函数
                
                    前者  只有 ctor & dtor: ctor 中 加锁 + dtor 中 解锁

                    后者  还有 3 个成员函数 lock() try_lock() unlock()  => 更灵活

                2) 是否可 `管理 mutex 的 lifetime` 
                        
                        否/是

                            =>前者 最好将 mutex 设为 global 变量
            
            swap 的  std::unique_lock 版本 
                
                std::unique_lock + std::defer_lock 作 第2参数 + std::lock(uniLkm1, uniLkm2); 
                                        |
                                        |/
                                    告诉 std::unique_lock Ctor, 构造时 `mutex unlocked`
                
                class A;
                void swap(A& lhs,A& rhs);

                class X
                {
                private:
                    A a;
                    std::mutex m;
                public: 
                    X(A const& a_) : a(a_){}
                    
                    friend void swap(X& lhs, X& rhs)
                    {
                        if(&lhs == &rhs) 
                            return;

                        std::unique_lock<std::mutex> uniLkm1(lhs.m, std::defer_lock); 
                        std::unique_lock<std::mutex> uniLkm2(rhs.m, std::defer_lock); 
                        
                        std::lock(uniLkm1, uniLkm2); 
                        
                        swap(lhs.a,rhs.a);
                    }
                };

    2.8 转移 mutex ownership between scope

        1   mutex 的 ownership 可在 std::unique_lock 间 move

            std::unique_lock: movable 但 not copyable

        2   std::unique_lock 3 种灵活应用

            (1) 函数 lock mutex, transfer 该 mutex 的 ownership 给其 caller 的 lock
            
                std::unique_lock<std::mutex> 
                get_lock()
                {
                    extern std::mutex mut; // extern mutex
                    std::unique_lock<std::mutex> lk(mut);
                    prepare_data();

                     // 目的是右值 => implicitly move
                    return lk;
                }
                void processData()
                {
                    //  源是右值 => implicitly move
                    std::unique_lock<std::mutex> lk(get_lock()); 
                    do_something();
                }

            (2) lock 不直接返回, 而是作 gateway class 的 mem, 
                    
                    所有 access to data 都通过 gateway 类

                        gateway 
                        
                            [1] destory 时, releases lock
                            [2] movable
                    
            (3) std::unique_lock 可在 `销毁前 释放 其 管理的 mutex` 
                                        |
                                        |
                                        |/
                                    unlock() 中
                => 提高性能

    2.9 合适粒度下 上锁

        `真正 访问 共享数据 时 再 lock` mutex 
            
            左侧 和 右侧 在不同时刻相等

                (可能有场景需要得到这种信息)
                    两次 read 之间, 两侧值可能已经被改了
        
            void get_and_processData()
            {
                // 加锁 -> 获取1块处理数据 -> proces 前 解锁  
                // -> process -> process 后 再加锁 -> process 结果 写到result
                std::unique_lock<std::mutex> my_lock(the_mutex);
                some_class data_to_process = get_next_T();
                my_lock.unlock(); 
                result_type result=process(data_to_process);
                my_lock.lock();
                write_result(data_to_process,result);
            }

            //List3.10 Lock one mutex at a time 
            // in a comparison operator
            class Y
            {
            private:
                int a;
                mutable std::mutex m;
                
                int get_detail() const
                {
                    std::lock_guard<std::mutex> lock_a(m); 
                    return a;
                }
            public:
                Y(int sd):a(sd){}
                
                friend bool operator==(Y const& lhs, Y const& rhs)
                {
                    if(&lhs==&rhs)
                        return true;
                    int const lhs_value=lhs.get_detail();       
                    int const rhs_value=rhs.get_detail(); 
                    return lhs_value==rhs_value; 
                }
            };

    3.1 `初始化期间` 保护 共享数据

        1   延迟 初始化 
        
                共享数据 只在 `初始化 并发 access 阶段` 才用 mutex 保护 
                    |
                    |   如: 打开 数据库连接 
                    |/
                  构造代价                  
                    
                (1) 单检测: `资源 (指针)` 是否 已初始化
                    
                    1)  思想
                    
                        资源 使用前检测
                            若 没初始化  
                                先 初始化, 再 解锁, 再 使用
                                    |
                                    |/
                                    spr.reset(new R);
                    
                    2)  code 结构
                        —————————————————————————————————————————————————
                        [1] 加锁 => 所有线程 串行
                        [2] 检测
                        [3] 初始化
                        [4] 解锁
                        [5] 使用 资源指针
                        —————————————————————————————————————————————————   
                            加/解 锁 
                                std::unique_lock + 其 成员函数 unlock()
                        —————————————————————————————————————————————————
                        
                    3)  问题
                    
                        第1次检测 前 加锁 => 检测处 `串行` => `并发性差`
                                
                        |
                        |   (半)解决
                        |/
                 
                (2) 双检测锁 ( Double-Checked Locking ) 
                    
                    1)  思想 
                        
                        `第2次检查 前 才加锁`   
                    
                    2) code 结构 
                        
                        —————————————————————————————————————————————————
                        [1] 第1次检测
                        [2] 加锁
                        [3] 第2次检测 
                        —————————————————————————————————————————————————   
                            加/解锁
                                std::lock_guard + scope 结束 Dtor 解锁 
                        —————————————————————————————————————————————————   
                    3)  问题
                    
                            1条 new 语句 分解为 3 条底层语句
                                ——————————-————
                                [1] 分配 内存 
                                [2] 构造 
                                [3] 指针赋值 
                                ————————————————
                                    compiler 可能重排执行顺序为 [1][3][2]

                                        线程 1 资源 `构造未完成 -> 资源 ptr 就 已赋值`

                                            线程 2 检测 资源 ptr 非空 
                                            
                                                `访问 初始化未完成` 的 资源
                                                    
                                                    data race 型 race condition
                                                        
                                                        undefined behavior

                            
                    
                        // 单检测      
                        std::shared_ptr<R> spr; 
                        std::mutex mut;
                        void f()
                        {
                            std::unique_lock<std::mutex> lk(mut); 
                            if(!spr)
                            {   
                                spr.reset(new R); 
                            }
                            lk.unlock(); 
                            spr->mf();
                        }

                        // 双检测 
                        void f() 
                        {
                            if(!spr)
                            {  
                                std::lock_guard<std::mutex> lk(mut);  
                                if(!spr) 
                                {
                                    spr.reset(new R); 
                                }
                            } 
                            spr->mf(); 
                        }   
            
            解决
                
                std::call_once(std::once_flag, 资源初始化函数, func_args)

                    [1] 多线程 `并发进入` std::call_once 函数 + `同步` 
                        
                            但 `只有1个线程 ( active 线程 ) 真正执行` 第 2 arg: (资源初始化) `函数`
                                
                                其他线程 ( passive/被动 线程 ) 进入 std::call_once 后 
                                    
                                    `等待` active 线程 完成初始化 + 返回 后, 再返回
                                        |
                                        |/
                                        同步 
                                                    
                    [2] Note    
                        
                        当前线程 从 std::call_once 返回时, 
                        
                            资源初始化 已完成, 但可能由 另一线程 完成

                    [3] std::call_once 参数           
                        
                        template< class Callable, class... Args >
                        void call_once( std::once_flag& flag, Callable&& f, Args&&... args ) // <mutex>             
                        
                        ——————————————————————————————————————————————————————
                        1] 第1参数     std::once_flag 
                        ——————————————————————————————————————————————————————
                        2] 其余参数 与 std::thread Ctor
                            ——————————————————————————————————————————————————  
                            1> 形式 同     `右值引用 args` 
                            ——————————————————————————————————————————————————  
                            2> 目的 不同    `避免 copy, 而不是 实现 move 语义`
                            
                                                因为 不必转移到 另一线程 去执行
                        ——————————————————————————————————————————————————————      

                    [4] std::once_flag
                        
                            功能相当于 std::mutex
                            
                                同 `不能 copy`

                    1) 作 namespace-scope

                        std::shared_ptr<R> pr;        // 资源指针
                        std::once_flag rFlag;         //std::once_flag

                        void initResource() { pr.reset(new R); }
                        void f()
                        {
                            std::call_once(rFlag, initResource); 
                            pr->mf();
                        }

                    2) 作 类的 (private)成员
                                
                        class X
                        {
                        private:
                            connection_info connection_details;
                            connection_handle connection; // resource handle
                            std::once_flag connection_init_flag;

                            void 
                            open_connection()
                            {
                                connection=connection_manager.open(connection_details);
                            }

                        public:
                            X(connection_info const& connection_details_):
                                connection_details(connection_details_) {}

                            void 
                            send_data(data_packet const& data)
                            {
                                std::call_once(connection_init_flag, &X::open_connection, this); 
                                connection.send_data(data);
                            }

                            data_packet 
                            receive_data()
                            {
                                std::call_once(connection_init_flag,&X::open_connection, this);
                                return connection.receive_data();
                            }
                        };

        2   `static local 变量` 初始化 时 race condition

            (1) static 局部变量 初始化

                [1] 可能 真正在多个线程上 进行(C++11 前编译器) => problematic race condition

                [2] 只能 真正在1个线程上 进行 (C++11 compiler 解决)

                    => race condition 只是说 `哪个线程 去 初始化`

            (2) 应用 

                `单例` 
                
                    需要 `单个 全局 instance` 时
                        
                        实现 `像 std::call_once 一样` 的 功能

                        // 单例
                        class A;

                        A& get_my_class_instance()
                        {
                            static A a; 
                            return a;
                        }

    3.2 很少更新 的 数据结构 的 保护

            boost 库 boost::shared_mutex

第4章 同步并发操作 cv & future & async & packaged_task & promise

image.png image.png image.png image.png image.png
    多线程 2个要点
        
        [1] 保护 `共享数据`
                
                第 3 章 
            
        [2] 线程 `同步`

            线程 需 `等待` 另一线程 完成某任务后, 再去完成 自己的任务

                实现
                
                    cv & futures
                    |
                    |   cv: 类 std::condition_variable
                    |/
                    条件变量/condition variables 

#1 循环等待 事件/条件: cv

    ##1.1   最简单的 方法 
    
        (1) code 结构 // 等待线程

                    |—— ——> `标志变量 flag` = true /false 表示条件 是/否满足
            加锁  |    - - - - -
                    |            |
            check 条件 是否满足  |
                                 |  => 周期性 ( 即 循环 )
                否                |          +
                                 |          等待
                    解锁 - - - - -            |\
                                            |
                    sleep 固定时间 // Note: sleep 期间, other 线程 可获得锁 并 `modify 条件 使满足`
        
                是
                    函数退出 => 自动解锁 
                    
        (2) 问题
        
            sleep 多久 难确定

                太短: 浪费太多处理时间

                太长: 事件早已发生, 等待线程还在 sleep
                            
                            1]  丢帧              快节奏游戏 中 
                            
                            2]  过多消耗 时间片        实时应用   中 

        (3) 解决
        
            cv
    
                bool flag; 
                std::mutex m;

                void wait_for_flag() 
                {
                    std::unique_lock<std::mutex> lk(m); 
                    
                    while(!flag)
                    {
                        lk.unlock(); 
                        std::this_thread::sleep_for(std::chrono::milliseconds(100) ); 

                        lk.lock();  
                    }
                }
            
    ##1.2 cv class

            1   `同步` 基础
            
                可用于 同时 `block/阻塞 多个线程`
                
                    until 通知线程 
                    
                        1]  修改 a `shared variable` 
                                        |
                                        |   用于表示 
                                        |/
                                        条件 
                                                
                        2]  notifies cv

                    ——————————————————————————————
                    通知线程 modify & notify cv
                                |
                        shared variable
                                |
                    等待线程 check  & wait on cv 
                    ——————————————————————————————

            2   2 个问题   
                
                (1) 问题1 
                
                    精确调度事件
                    
                        => 可能 `notify called on destroyed cv`
                    
                    解决1
                        
                        notify under lock 可能必要
        
                (2) 问题2 
                    
                    notify 发送时, 等待线程 没有 waiting on cv
                    
                        => `错过 notification`

                    原因 
                    
                        ————————————————
                        shared variable 
                        ————————————————
                            1] `atomic`     
                        ————————————————        
                            2] 未加锁 
                        ————————————————
                            Note 
                            
                                atomic copy delete 
                                    => 
                                        不能用 cv 上 predicated wait 版本
                            
                                            即 不能为 
                                                
                                                cv.wait(lk, [] { return proceed; } ); //  std::atomic<bool> proceed(false);
                                            
                    解决 
                        
                        1) 用 cv 上 non-predicated wait 版本 
                                        
                                while ( !proceed ){ cv.wait(lk); }

                        2) shared variable 即使是 atomic 
                            
                            修改 也要 加锁 
            
            3   cv 类 
                
                [1] copy deleted

                [2] notify_one & notify_all

                [3] wait 2个重载
                    
                    1] non-predicated 版本        (如) while ( !proceed ){ cv.wait(lk); }  
                        
                        非循环
                        
                            `释放` lock 
                                
                                `阻塞` current thread & 加到 cv 上的 `等待线程 队列`
                                    
                                    notified 或 spuriously wake up 
            
                                        `解阻塞` 
                                            
                                            `relock` + `wait exits`
                                
                    2] predicated wait  版本      (如) cv.wait(lk, [] { return proceed; } );
                        
                        循环 + 条件
                                |   
                                |/
                                满足 = p() == true 时, 返回 
                        
                            ————————————————————————————————————————————
                            与 non-predicated 版本  
                                ————————————————————————————————————————    
                                区别 
                                    
                                    仅多了1步
                                    
                                    relock 后, (循环) 检测 条件是否满足 
                                ————————————————————————————————————————
                                联系 
                                    
                                    视为 循环调 non-predicated 版本
                            ————————————————————————————————————————————        
                                    
                                template<typename Pred>
                                void std::condition_variable::wait(unique_lock<mutex>& uniLk, Pred pred)
                                {
                                    while ( !pred() )
                                        wait(uniLk);    // 视为 non-predicated 版本
                                }

                                //等价于
                                while ( !pred() ) 
                                {
                                    wait(uniLk);
                                } 
        
            4   等待线程

                (1) notify_one
        
                    通知线程 + 多个等待线程 
                            
                        没必要 持有 the same mutex

                            否则
                                
                                `悲观锁` 

                                    hurry up and wait
                    
                                原因
                                    
                                    awakened 后 可能无法 获得 mutex ( 被 通知线程 占着 )
                            |
                            |   解决 
                            |/
                        pthreads 实现  
                            
                            1]  `识别` 悲观锁场景 
                        
                            2]  `notify 内` 把 等待线程   
                                    
                                    从 `cv 上的 队列` 转移到 `mutex 上的 队列` 
                                        
                                        而 `不 唤醒它`

                (2) notify_all

                    `惊群现象`
                        
                        事件发生
                            
                            `唤醒所有` 所有等待线程
                            
                                但 `只能有1个线程 获得事件处理权`
                                
                                    `其他线程 重新 陷入等待`          
                
            5   spurious wakeup / 假唤醒 
                        
                    notify_one/notify_all 之外的因素导致的 
                        
                        `wait 被唤醒, 但 条件不满足`
                            
                            |
                            |   解决  
                            |/  
                         predicate wait 版本 
             
                [1] 只影响 non-predicated wait


                [2] 不影响 predicated wait
                    
            6   应用

                //------ App
                threadsafe_queue<T> que; 

                void producer()
                {
                    while( more_data_to_prepare() )
                    {
                        T const data = prepare_data(); 
                        que.push(data);  // [1] 真正 push 前 加锁: que.push() 中 internal std::queue 真正 push 前  
                    }
                }

                void consumer()
                {
                    while(true)
                    {
                        T data;
                        que.wait_and_pop(data);
                        process(data); // [2] process 前 解锁: wait_and_pop() scope 结束 + 自动解锁
                        if( is_last_chunk(data) )
                            break;
                    }
                }
                    
                // 记住这 1个 足够 => others 
                template<typename T>
                std::shared_ptr<T> 
                threadsafe_queue::wait_and_pop()
                {
                    std::unique_lock<std::mutex> lk(mut);
                    
                    // 捕获 this 目的: 可在 lambda 中用 当前类的 成员 
                    cv.wait(lk, 
                                [this]{ return !que.empty();} );
                                
                    std::shared_ptr<T> res( 
                        std::make_shared<T>( que.front() ) );
                        
                    que.pop();
                    
                    return res;
                }
        
                (1) List 4.1 等待 data 被处理: 用 std::condition_variable

                    std::mutex mut;
                    std::queue<T> que;               
                    std::condition_variable cv; 

                    void producer() 
                    {
                        while(more_data_to_prepare())
                        {
                            T const data=prepare_data(); 
                            
                            std::lock_guard<std::mutex> lk(mut); 
                            
                            que.push(data);              
                            cv.notify_one();             
                        }
                    }

                    void consumer()  
                    {   
                        while(true) 
                        {       
                            std::unique_lock<std::mutex> lk(mut); 
                            cv.wait(
                                lk, []{ return !que.empty(); } ); 
                                
                            T data = que.front(); 
                            que.pop();  
                                 
                            lk.unlock();  // unlock mutex                  
                            process(data);
                            
                            if( is_last_chunk(data) )
                                break;
                        }
                    }
                        |
                        |   (2) 重构: 同步被限制在 queue 本身, 可 极大减少 同步问题 和 竞争条件 
                        |
                        |   提取  thread-safe queue: using cv
                        |   
                        |   借鉴 chapter3 thread-safe stack's thought:联合 front() and pop()
                        |/  
                
                    //------ App
                    threadsafe_queue<T> que; 

                    void producer()
                    {
                        while( more_data_to_prepare() )
                        {
                            T const data = prepare_data(); 
                            que.push(data);  // [1] 真正 push 前 加锁: que.push() 中 internal std::queue 真正 push 前  
                        }
                    }

                    void consumer()
                    {
                        while(true)
                        {
                            T data;
                            que.wait_and_pop(data);
                            process(data); // [2] process 前 解锁: wait_and_pop() scope 结束 + 自动解锁
                            if( is_last_chunk(data) )
                                break;
                        }
                    }
                    
                    // ------ threadsafe_queue
                    #include <queue>
                    #include <memory>
                    #include <mutex>
                    #include <condition_variable>

                    template<typename T>
                    class threadsafe_queue
                    {
                    private:
                        mutable std::mutex mut;   
                        std::queue<T> que; 
                        std::condition_variable cv;
                    public:
                        threadsafe_queue(){}
                        threadsafe_queue(threadsafe_queue const& rhs) 
                        {
                            std::lock_guard<std::mutex> lk(rhs.mut);
                            que = rhs.que;
                        }
                        
                        void 
                        push(T new_value)
                        {
                            std::lock_guard<std::mutex> lk(mut);
                            
                            que.push(new_value);
                            
                            cv.notify_one();
                        }

                        // 记住这 1个 足够 => others 
                        std::shared_ptr<T> 
                        wait_and_pop()
                        {
                            
                            std::unique_lock<std::mutex> lk(mut);
                            
                            // 捕获 this 
                            cv.wait(lk, 
                                        [this]{ return !que.empty();} );
                                        
                            std::shared_ptr<T> res( 
                                std::make_shared<T>( que.front() ) );
                                
                            que.pop();
                            
                            return res;
                        }
                        
                        void 
                        wait_and_pop(T& value)
                        {
                            std::unique_lock<std::mutex> lk(mut);
                            
                            cv.wait(lk, 
                                        [this]{return !que.empty();});
                                        
                            value = que.front(); 
                            
                            que.pop();
                        }

                        std::shared_ptr<T> 
                        try_pop()
                        {
                            std::lock_guard<std::mutex> lk(mut);
                            
                            if(que.empty())  // dif
                                return std::shared_ptr<T>(); // empty std::shared_ptr
                                
                            std::shared_ptr<T> res(
                                    std::make_shared<T>(que.front() ) );
                                    
                            que.pop();
                            
                            return res;
                        }
                        
                        bool 
                        try_pop(T& value)
                        {
                            std::lock_guard<std::mutex> lk(mut);
                            
                            if( que.empty() )
                                return false;
                                
                            value = que.front(); // dif 
                            
                            que.pop();
                            
                            return true;
                        }

                        bool empty() const //7. const function
                        {
                            std::lock_guard<std::mutex> lk(mut);
                            return que.empty();
                        }
                    };

                        Since locking a mutex is a mutating operation, 
                        the `mutex object must be` marked `mutable`

#2 等待 `1次性事件`: future

    线程 以某种方式
        
        获得 future -> 表示事件
        
    场景: 等待航班

    ##2.0 std::future

        `等待 被 异步设定 的 value`

        template< class T > class future;       (1)
        template< class T > class future<T&>;   (2)
        template<>          class future<void>; (3)

        1   异步 机制
                          准备好
            异步操作     - - - -> result  + 修改 shared state
                |                 /\                 |
                | 提供             /  hold             |  link
                |/              /                    |
                std::future 对象 —— —— —— —— —— —— ——  
                |       |\
                |  给   |    query / wait for / extract result: 阻塞, 直到 result 准备好 
                |/      |
                 creator
                    std::async / std::packaged_task / std::promise

            future shared_future async packaged_task promise

            除 async 为 为 function template 外, 其余为 class template

    ##2.1 std::async

        `异步 runs a function` ( 可能 in a new thread)
            
            返回 std::future
                
                 future 上调 get(), 当前线程 阻塞, 直到 future ready, 然后 return result
        
        传参
            与 std::thread 传参方式相同
        
        
    ##2.2 std::packaged_task
    
        std::packaged_task<> obj 绑定 future to callable object

        work.get_future() 返回 future 对象
        
    ##2.3 std::promise
        
        1   针对2种场景

        [1] work 不能被表达为 函数调用
        
        [2] 结果 来自多个地方

        2   机制

            // 3大成员函数
            
            [1] get_future()
            
                返回 promised 结果 关联的 future

            [2] set_value() : 主动线程
            
                    会 make state/future ready

            [3] fut.get()   : 被动线程


        #include <iostream>
        #include <numeric> // std::accumulate
        #include <vector>
        #include <thread>
        #include <future>

        void accumulate(std::vector<int>::iterator first,
            std::vector<int>::iterator last,
            std::promise<int> prom)
        {
            int sum = std::accumulate(first, last, 0);
            prom.set_value(sum);
        }

        int main()
        {
            // 演示用 promise<int> 在线程间传递结果
            std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };

            //1. 构建 promise obj
            std::promise<int> prom;

            //2. get_future()
            std::future<int> futu = prom.get_future();

            //3. 启动新线程, 线程间 move promise
            std::thread work_thread(accumulate, 
                 numbers.begin(), 
                 numbers.end(),
                 std::move(prom) );
            
            //4. future 上 get()
            std::cout << "result = " << futu.get() << '\n';
            
            work_thread.join(); 
        }

    ##2.4 async & packaged_task & promise 机制 & 应用

    1   async
    
    
        //List4.6 用 std::future 得到一个 异步任务的返回值

        #include <future>
        #include <iostream>

        int find_the_answer_to_ltuae(); // long time 
        void do_other_stuff();

        int main()
        {
            // std::async 可以增加 参数, 作为 异步任务 函数的 args 
            std::future<int> the_answer=std::async(find_the_answer_to_ltuae); 
            do_other_stuff();
            std::cout<<"The answer is "<< the_answer.get() <<std::endl;
        }

        //List 4.7 std::async 中 传参数给 异步函数

        #include <string>
        #include <future>

        //-------class1
        struct X
        {
            void foo(int,std::string const&);
            std::string bar(std::string const&);
        };
        X x;

        //1. 通过 指针 p/&x 间接 提供对象:
        //   调 p->foo(42, "hello"), p 是 &x
        auto f1=std::async(&X::foo, &x, 42, "hello"); 

        //2. 直接 提供 (左值)对象 x: 
        //   调 tmpx.bar("goodbye"), tmpx is a copy of x
        auto f2=std::async(&X::bar, x, "goodbye");    

        //-------class2
        struct Y
        {
            double operator()(double);
        };
        Y y;

        //3. 调 tmpy(3.141), tmpy is move-constructed from Y()
        //   <=> tmpy is constructed from std::move( Y() )
        //(1 )参数 Y(): 调 Y 的 ctor, 产生 右值(无名对象) tmp
        //(2) 右值 Y() 被 wrap 成 std::move( Y() ), 
        //   作为 实参 调 Y 的 移动ctor, 产生 tmpy
        //(3) 通过 std::async 调 operator(), tmpy(3.141)
        auto f3=std::async( Y(), 3.141);    
             
        //4. std::ref (左值对象 y) 提供对象 & 调 y 的 operator() 
        //   => 调 y(2.718)
        auto f4=std::async(std::ref(y), 2.718);

        //5. 调 baz(x)
        // 异步 非 成员函数 & ref(类的对象)
        X baz(X&); 
        std::async(baz, std::ref(x) ); 

        //-------class3
        class move_only
        {
        public:
            move_only();
            move_only(move_only&&)
            move_only(move_only const&) = delete;

            move_only& operator=(move_only&&);
            move_only& operator=(move_only const&) = delete;

            void operator()();
        };

        //6. 调 tmp() , tmp is constructed from std::move( move_only() )
        // 与 3 同理
        auto f5=std::async( move_only() ); 


        (3)可用第1参数 `std::launch` 指定, `是让 std::async 起一个新线程; 还是 在 future 上等待时, 才让 任务运行`

        //1) 函数调用被 延迟, 直到 在 future 调用 wait()/get()
        std::launch::deferred

        //2) 函数调用 必须运行在 自己的线程上, 即 起一个新线程
        std::launch::async

        // 就像调用
        std::thread(std::forward<F>(f), std::forward<Args>(args)...)

        //3) 由实现决定 <=> 默认选择
        std::launch::deferred | std::launch::async

        // 新线程中 运行
        auto f6=std::async(std::launch::async, Y(), 1.2); 

        //2.1 future 上 wait() 或 get() 时 运行
        auto f7=std::async(std::launch::deferred, baz, std::ref(x) ); 

        //3. 实现选择
        //(1)默认
        auto f9=std::async(baz, std::ref(x) );     
        //(2)显式写出
        auto f8=std::async(                             
                        std::launch::deferred | std::launch::async,
                        baz, std::ref(x) );


        //2.2 唤起 延迟函数 / deferred function
        // 此时才调 work f7 的 函数 baz(x)
        f7.wait();                                       

    2   用 future 关联 任务 std::packaged_task

        (1) std::packaged_task<> 机制

        实参及实际返回类型 只要能正确地 隐式转换为 
        函数标签中的 参数类型 和 返回类型 即可


        1) 创建 std::packaged_task<> obj ( work )

        std::packaged_task<> ties a future to a function 或 callable object

        2) work.get_future() 返回 future 对象, 存 work函数 返回值/抛出的异常

        3) `唤起` std::packaged_task<> 对象时, 

            1> 以 相应的 `args 作实参 调该对象的 operator()`, 该 operator() 中再 `转调 work function 或 callable object 的 operator()`

            2> makes the future ready

        4) future对象 .get() 获得 返回值

        (2) std::packaged_task< > 特化版 部分接口

    //List 4.8 std::packaged_task< > 特化的  类(部分)定义
        template<>
        class packaged_task< std::string(std::vector<char>*, int) > 
        {
        public:
            template<typename Callable>
            explicit packaged_task(Callable&& f);

            std::future<std::string> get_future();
            void operator()(std::vector<char>*,int);
        };
        => `std::packaged_task 对象 是 callable 对象:` 可以
        1)被 `wrap` 进 std::function  对象
        2)作为 `线程函数` 传给 std::thread ctor
        3)传给需要 callable object 的函数
        4)直接唤起

    3   std::packaged_task 3种用法
        // std::packaged_task 3种用法
        #include <iostream>
        #include <cmath>
        #include <thread>
        #include <future>
        #include <functional>
         
        int f(int x, int y) { return std::pow(x,y); }
         
        void work_lambda()
        {
            std::packaged_task<int(int,int)> work([](int a, int b) {
                return std::pow(a, b); 
            });
            std::future<int> result = work.get_future();
         
            work(2, 9);
         
            std::cout << "work_lambda:\t" << result.get() << '\n';
        }
         
        void work_bind()
        {
            std::packaged_task<int()> work(std::bind(f, 2, 11));
            std::future<int> result = work.get_future();
         
            work();
         
            std::cout << "work_bind:\t" << result.get() << '\n';
        }
         
        void work_thread()
        {
            std::packaged_task<int(int,int)> work(f);
            std::future<int> fut = work.get_future();
            std::thread work_td(std::move(work), 2, 10);
            work_td.join();
            std::cout << "work_thread:\t" << fut.get() << '\n';
        }
         
        int main()
        {
            work_lambda();
            work_bind();
            work_thread();
        }

    4   2个应用

        4.1 若 任务 可 划分` 为多个 self-contained `子任务`

            -> 每个std::packaged_task<> instance `wrap` 1个子任务

            -> 该 instance 被 `传进 任务调度器 或 线程池` 

            => 好处: `调度器 只需要处理 各std::packaged_task<> instance`, 而不是 各函数

        4.2 `线程间 传递works`

                `GUI 框架: 用 指定的线程 更新 GUI`

                方法1: 指定一个线程 更新GUI, 某线程想 更新GUI, 给它发消息即可

                方法2: 

                共享/全局 std::packaged_task 任务队列: 放 gui work
                std::deque<std::packaged_task<void()> > works; 


                全局 mutex:
                1   何时 lock mutex?
                access 共享数据(结构) 时, lock

                2   何时 释放/unlock mutex
                access 完, 让 std::lock_guard<std::mutex> 对象 作用域 结束, 自动释放

                => 代码中 锁的范围 就很好确定

                `std::packaged_task 任务`:
                //1. 创建 obj : wrap Func
                std::packaged_task<void()> work(f);

                //2. push work to deque : 用 std::move()
                works.push_back(std::move(work));

                //默认 ctor : 对象 用于接收 提取的 work
                std::packaged_task<void()>work;

                //3. 提取 work: 用 std::move()
                work=std::move( works.front() ); 

                //4. invoke work 函数: with 函数参数(本例无参)
                //通过 调用 work 的 operator(),  转调 work 函数的 operator()
                work()

                gui 线程:
                循环 : 直到 收到 gui关闭消息 时 跳出
                    轮询(polling for) 要处理的 GUI messages, 比如 用户 clicks
                    根据队列是否为空 来决定, 
                        是从 work队列 front  返回 1个 gui work; 还是跳到下一循环
                    pop work -> 释放锁
                    执行 work
                    
                push 任务的线程:
                用接收的 函数 创建1个 work
                获得 fut = work.get_future()
                push work到 任务队列
                返回 fut

                调度线程:
                以 work 函数 启动 push任务的线程

                放 gui 消息 到 GUI 消息队列
                启动 gui 线程

                从 push线程 返回的 future: 调 get 获取 work 函数 返回值; 
                    或  丢掉 future

                发gui 结束消息 给 gui 线程

                // 合成完整的list4.9 用 std::packaged_task 在 GUI 线程上 运行代码
                #include <deque>
                #include <mutex>
                #include <future>
                #include <thread>
                #include <utility>

                std::mutex m;
                std::deque<std::packaged_task<void()> > works; 

                bool gui_shutdown_message_received();
                void get_and_process_gui_message();

                //-----1 GUI 线程
                void gui_thread()      
                {
                    while( !gui_shutdown_message_received() )
                    {
                        get_and_process_gui_message();     
                        
                        // local work: 接收 从 任务队列提取出的 任务
                        std::packaged_task<void()> work; 

                        {
                            std::lock_guard<std::mutex> lk(m);
                            if( works.empty() )          
                                continue;
                                
                            // 从work队列中 提取 front 任务         
                            work=std::move( works.front() ); 
                            works.pop_front();       
                        }// 释放 锁 => 用 块作用域 {}

                        work();    //  运行任务
                    }
                }
                std::thread gui_bg_thread(gui_thread);

                //-----2 post_work_for_gui_ 的线程
                template<typename Func>
                std::future<void> post_work_for_gui_thread(Func f)
                {
                    std::packaged_task<void()> work(f);    
                    std::future<void> res=work.get_future();

                    std::lock_guard<std::mutex> lk(m);

                    works.push_back( std::move(work) );  
                    return res;                    
                }

                // -----3 调度线程
                //(1)在 future 上等待 : 可 知道 任务是否已完成
                //(2)丢掉 future


    ###2.6.3 promises

    1   针对2种场景

        [1] work 不能被表达为 函数调用
        
        [2] 结果 来自多个地方

    2   机制

        // 3大成员函数
        
        1   get_future()
        
            返回 promised 结果 关联的 future

        2   set_value() : 主动线程
        
                会 make state/future ready

        3   fut.get()   : 被动线程


    3   应用: `网络连接 数 n > 1`

        `网络侧 起多线程, 单个线程上 处理 单连接:` 

        问题:连接数过多时, 消耗 OS 资源过多, 上下文切换过多(线程数超过硬件支持的并发数)

        解决: `网络侧 起多线程, 单个线程上 处理 多连接 & 用 promise / future structure`


        网络侧 循环处理 来自终端的 多连接 中 每个连接, 连接分2种

        (1)上传数据的连接
            1)取出 接收data
            2)用 接收data的ID 获得 连接关联的 promise
            3)promise 结果 : set_value为 data 的 payload/有效载荷

        (2)下载数据的连接
            1)数据发送队列 中 取出 发送data(与上传线程中 接收data 类型不同)
            2)发送 data
            3)data关联的 promise 结果 : set_value 为 true, 以告诉  对方线程 发送成功

        //List 4.10 用 promises 处理 来自1个线程的 多个 connections
        #include <future>
        void process_connections(connection_set& connections)
        {
            //1 循环, 直到 处理完所有连接, 即 done() 返回 true
            while( !done(connections) ) 
            {
                for(connection_iterator connection=connections.begin(),end=connections.end();
                    connection!=end;
                    ++connection)
                {
                    if( connection->has_incoming_data() )    
                    {
                        data_packet data=connection->incoming(); 
                        std::promise<payload_type>& p=
                            connection->get_promise(data.id); 
                        p.set_value(data.payload);            
                    }
                    
                    if( connection->has_outgoing_data() )   
                    {
                        outgoing_packet data=                
                            connection->top_of_outgoing_queue();

                        connection->send(data.payload);      
                        data.promise.set_value(true);  
                    }
                }
            }
        }

    ###2.6.4  用 future 保存 exception

        1   std::async 抛出异常 时, future 内部 存储 异常 & future become ready
        -> fut.get() 会再次抛出异常`

            double square_root(double x)
            {
                if(x<0)
                {
                    throw std::out_of_range(“x<0”);
                }
                return sqrt(x);
            }
            
            std::future<double> f=std::async(square_root,-1);

            double y=f.get();

        2   用 promise 的 set_exception() 来 make future ready`

            extern std::promise<double> some_promise;

            try
            {
                some_promise.set_value(calculate_value());
            }
            catch(...)
            {
                some_promise.set_exception(std::current_exception()); // std::current_exception(): 恢复异常
            }

    ###2.6.5 从多线程中 等待 

        1   多线程 access 1个 std::future/std::shared_future 对象时, `data race` 

            `对future, 并发 access 没有意义; 对shared_future, 用1个 锁来 保护 并发 access`


            第1次调用get()之后, future 就没有值可以获取
            -> 只有1个线程可以恢复future中的值
            -> 并发 access 没有意义
            
            std::promise<int> p;
            std::future<int> f( p.get_future() );
            assert(f.valid());  // future f is valid

            std::shared_future<int> sf( std::move(f) );
            assert(!f.valid()); // f 不再 valid
            assert(sf.valid()); // sf 是 valid

            // shared_future
            std::promise<std::string> prom;

            //Implicit transfer of ownership 
            //  for rvalue of type std::future<std::string>
            std::shared_future<std::string> sf( prom.get_future() ); 

            std::future::`share(): creates a new std::shared_future and transfers ownership to it directly`

        2   总结

            `future 可以 关联 或 不关联 数据`

            `一旦 事件发生, future 就不能被重置`

            //(1)类模板: <> 内 参数为 关联的数据类型 

            //(2)不 关联数据
            std:future<void> /std::shared_future<void> 

    ##4.2 `消息传递` + `同步`

        1   CSP 通信序列处理

            若 `无 shared state, 则 每个线程 可以 完全独立, 是一个 状态机: 

            收到 1个消息 
            -> 更新 自身状态 & 发消息 给其他线程 & 处理 是基于初始状态 
            => 有限状态机模型`

        2   代码分成 3个独立线程

            (1) 设备处理 线程
            (2) ATM 逻辑 线程
            (3) 与银行 communicate

            仅 `通过 `传递消息`, 而不是 `共享数据`, `3个线程就可以 communicate—消息传递风格`

            如: 当用户 插入卡 / 按下按键 时,
            设备处理线程 给 逻辑线程 发一个消息; 
            而 逻辑线程 给 设备处理线程 发消息 提示取多少钱

            // List 4.15  simple implementation of ATM logic class
            struct card_inserted
            {
                std::string account;
            };

            class atm
            {
                messaging::receiver incoming;
                messaging::sender bank;
                messaging::sender interface_hardware;
                
                // function pointer : void (*state)(); 
                // => member function pointer: void (atm::*state)(); 
                void (atm::*state)(); 
                
                std::string account;
                std::string pin;
                
                void waiting_for_card() //1
                {
                    //2 send “waiting insert card” message to interface
                    interface_hardware.send( display_enter_card() ); 
                    
                    incoming.wait()     //3
                        .handle<card_inserted>(
                            [&](card_inserted const& msg) //4
                            {
                                account = msg.account;
                                pin="";
                                
                                // send "input PIN" message to interface
                                interface_hardware.send( display_enter_pin() ); 
                                
                                // member function pointer state reassigned:  
                                state = &atm::getting_pin; 
                            }
                        );
                }
                
                void getting_pin(); 
                
            public:
                void run()    //5
                {
                    state = &atm::waiting_for_card; //6
                    
                    try
                    {
                        for( ; ; )
                        {
                            //7. call function ( signature: void (), return void, no Args ) 
                            //   by member function pointer atm::*state 
                            //   firstly call waiting_for_card() circularly, then call getting_pin()
                            (this->*state)(); 
                        }
                    }
                    catch(messaging::close_queue const&) { }
                }
            };
上一篇下一篇

猜你喜欢

热点阅读