第5章: 并发与实用功能

2022-06-20  本文已影响0人  my_passion

5.0 建议

    1 用 资源句柄 管理资源
    2 用 unique_ptr 访问 多态类型的对象
    3 用 shared_ptr 访问 共享对象
    4 用 类型安全的机制 处理 并发
    5 最好避免 shared data
    6 从 并发执行任务, 而不是 thread 角度 思考

5.1 资源管理

1 unique_ptr 与 shared_ptr : <memory>

    资源: 符合 先获取后释放 的 东西
        内存/锁/套接字/线程句柄/文件句柄
        |
        |   不及时释放
        |/
     资源泄露
        |
        |
        |/
    标准库 组件 不会出现 资源泄露
        用 RAII 确保 `资源 依存于 其所属对象, 而 不会超过 其对象的 lifetime`
    
     
    (1) SP: unique_ptr vs. shared_ptr
        unique_ptr/shared_ptr 所有权 唯一/共享
            移动/copy 
            
        unique_ptr  
            可以把 自由存储 上的对象 传递给/出 函数
            
        shared_ptr
            1) copy 而非 移动
            2) 最后1个 shared_ptr 销毁时, 所管理的 资源 才被销毁
            3) 使对象的 lifetime 变得不那么容易掌控
            4) 没有指明 哪个 owner 有权读写 资源
                数据竞争 / 数据混淆 依然存在
                
(2) `何时应该用 SP / ` vector 等 资源句柄(vector / thread)`
        
答: 指针语义 / 对象集合

5.2 并发

1 task(任务) 和 thread(线程)

    (1) task
    
        可视为与 caller 并发执行的 function
            只需 传参 / 获取结果 / 保证 2个 task 不竞争使用 共享数据 
        
        形式: func / funcObj
        
    (2) std::thread 对象: <thread>
        task 作 实参
        
        线程间通信
        
        [1] 同步
        
            1] 调用-返回 
                传参
                返回结果
                
            2] 共享数据 
            
            3] cv
            
        [2] 异步

2 传参

thread ctor: 可变参数模板

3 返回结果

    (1) 通过参数: 不优雅
            
        默认 值传递 copy
        std::ref() 才引用传递
                
        注意2种 handle 参数: 指针 / 容器
            值传递 -> copy 的是 handle, resource 不 copy  

    (2) 异步
    
    // eg: 传参 + 返回结果(通过参数)  
    void f(vector<double>& v, double* res); // 从 v 获取输入, 结果放 *res

    class F
    {
    private:
        vector<double>& v; // 输入 源
        duoble* res;       // 输出 目标
    public
        F(vector<double>& vv, double* p): v(vv), res(p){}
        void operator()(); 
    };
    
    duoble res1;
    double res2;
    thread t1 {f, vec1, &res1};
    thread t2 {F(vec2), &res2};

4 shared data

    |
    | 访问 必须 
    |    
(1) `同步`
        以确保 同一时刻 最多有1个 task 能访问
    |
    | 机制
    |/
    
 mutex / 锁: <mutex>
    
    thread 
        lock()/unlock() 操作 可 获取/释放 1个 mutex
    
    unique_lock: 对 mutex 的 RAII 
        
        若 other 线程 已获取 mutex
            当前线程 `等待/阻塞`, 直到 that 线程 完成对共享数据的访问

        
    mutex m; // 控制 共享数据 访问 的 mutex
    int sh;  // 共享数据
    
    void f()
    {
        unique_lock<mutex> lck {m}; // 获取 mutex
        sh++;                       // 处理 共享数据
    } // 隐式 释放 mutex

    | 共享对象 与 mutex 间 对应关系, 靠 程序员 去维护, 易出错
    |
    | 解决: 
    |/
            
    1)  mutex 与 shared dada 关联, 形成类, 访问 shared data 前 先 lock mutex  
        
        class Record
        {
        public:
            mutex rm;
            int sh;
        };
    
        同时 访问多个资源 来 执行1个操作
            |
            | 可能导致
            |/
          死锁
                thread1 获取了 mutex1 然后 试图获取 mutex2, 而同时 thread2 已获取了 mutex2 然后 试图获取 mutex1
                => 2 个任务 都无法继续执行了
            |
            |   解决
            |       同时获取多个锁
            |/
    
    2) 推迟加锁 defer_lock + 同时获取 全部锁 lock(lck1, lck2);
    
        void f()
        {
            // ...
            unique_lock<mutex> lck1 {m1, defer_lock};
            unique_lock<mutex> lck2 {m2, defer_lock};
            // ...
            lock(lck1, lck2);
            // ... 处理 共享数据 ...
        }                               // 隐式 释放所有 mutex    

(2) cv

    condition_variable 机制
            
    thread1 等待 条件/事件(线程2完成某工作后)  发生

    // === 2个 thread 通过 queue 传递消息
    #include <condition_variable>
    class Message{
        // ...
    };
    
    queue<Message> mq;
    condition_variable cv;
    mutex mut;
    
    // 1)
    void producer()
    {
        while(true)
        {
            Message msg;
            // fill msg
            
            unique_lock<mutex> lck {mut}; // === lock 
            mq.push(msg);
            cv.notify_one();              // 通知
        }                                 // === unlock implicitly 
    }
    
    // 2)
    void consumer()
    {
        while(true)
        {
            unique_lock<mutex> lck {mut};
            while (cv.wait(lck) ); // 释放 lck 并 等待, 被唤醒后 重新获取 lck
                                    
            auto msg = mq.front(); // extract msg
            mq.pop();
            
            lck.unlock();          // 释放 lck: explicitly
            // 处理 msg
        }
    }

5 任务通信

    在 task (抽象层), 而不是 线程和锁(底层) 上操作
    
    user 不用 care 锁 
    
    <future>
    
    (1) future + promise
    
        task 间 value/exception 传递
            2个 task 运行于 独立 线程
            
            step
                [1] task1 建 promise, 对 promise 调 get_future 获取 promise 内部的 future
                
                [2] task1 将 promise 引用传递 给 task2
                
                [3] task2 中 对 promise 调 set_value(...)/set_exception(...): 
                        内部 将 value/exception 设给 promise 的 future
                
                [4] task1 对 future 调 get(), block, 直到 future 为 ready 态/抛出异常, get() 返回 value /exception
        
        另1种形式 
            [1] caller 建 promise, 对 promise 调 get_future() 获得 promise 内部的 future 
            
            [2] caller 将 promise 引用传到 task1
            
            [3] task1 中 对 promise   调 set_value()/set_exception()
            
            [4] caller 将 future 引用传到 task2 
            
            [5] task2 调 future.get() 获得 value 
            
    (2) packaged_task
        
        1) task2 的 return value 传给 task1   
                |
        转化为 |  wrap/封装 task2 到 packaged_task
                |  packaged_task 把 task 的 return value/异常 放入 packaged_task 内部的 future
                |/
        task1 与 packaged_task 间 value/exception 传递 
            task1 对 packaged_task 调 get_future()
            task1 对 获得的 future 调 get()
        
        task1 与 packaged_task 运行于 独立 线程 
        
        2) 类模板
                
            [1] 模板参数: task 类型
                    
                ctor 的参数: task
                    
            [2] note
                packaged_task 不能 copy 
                为 packaged_task 启动线程时, 必须 move(packaged_task_obj)
                                            
    (3) async()
    
        不必再操心 `锁 和 线程`
        
        1) 思路
        
            类似 调用函数 的方式 启动任务
        
            `将 task 当作` 可以与 other task 并发执行 的 `函数` 来处理
        
        2) 机制: 2个分离
            
            [1] 分离 函数调用的 `调用部分` 和 `获取结果部分`
            
            [2] 分离 这2部分 与 任务的实际执行
    
        3) 优势
        
            简单 又不失其强大性
                能满足广泛的需求
        
        4) 限制
        
            对 share data 且 需要锁机制 的 task 不要用 async()
            
            用 async() 内部到底启用 几个 thread, 由 async()实现 决定, user 不知道
        
        template< class Function, class... Args >
        std::future<typename std::result_of<
            typename std::decay<Function>::type(
                typename std::decay<Args>::type...)
                    >::type>
            async( std::launch policy, Function&& f, Args&&... args );
            
            ArgsType 
            FunctionType
            FunctionReturnType

    // ===(1)
    #include <future>
    #include <thread>
    #include <chrono>
    #include <iostream>

    void f(std::promise<int>& pr)
    {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        int res = 1;
        pr.set_value(res);
    }

    void g(std::future<int>& fut)
    {

        int v = fut.get();
        std::cout << v << std::endl;
    }

    void test()
    {
        std::promise<int> pr;
        std::future<int> fut = pr.get_future();

        std::thread t1(f, std::ref(pr));
        std::thread t2(g, std::ref(fut) );
        t1.join();
        t2.join();
    }
    int main()
    {
        test();
    }

    // ===
    void f(promise<X>& px) // promise 任务: 将 结果 放 promise 中
    {
        // ...
        try
        {
            X res;
            // ... 计算 1个 值, 保存在 res 中
            px.set_value(res);
        }
        catch(...)
        {
            // 将异常传给 future 的线程
            px.set_exception( current_exception() );
        }
    }
    
    void g(future<X>& fx) // future 任务: 从 future 获取 结果
    {
        // ...
        try
        {
            X v = fx.get(); // 如必要, 等待 值 准备好
            // ... 使用 v
        }
        catch(...)
        {
            // ...
        }
    }

    // === (2)
    #include <iostream>
    #include <thread>
    #include <future>

    int f(int x, int y) { return x+y; }

    // 1] 任务 类型: 任务 / 函数 signature
    using TaskType = int(int, int);

    void task_thread()
    {
       
        // 2] wrap task
        std::packaged_task<TaskType> pt(f);

        // 3] 请求 future
        std::future<int> fut = pt.get_future();

        // 4] 为 packaged_task0 / 1 启动线程
        std::thread t(std::move(pt), 1, 2);
        t.join();

        // 5] 获得 结果
        std::cout << fut.get() << '\n';
    }

    void task_lambda()
    {
        std::packaged_task<TaskType> pt(
            [](int a, int b) {return a + b; });

        std::future<int> fut = pt.get_future();

        pt(1, 2);

        std::cout << fut.get() << '\n';
    }

    void task_bind()
    {
        // Note: bind 哪个参数后, TaskType 中 哪个参数去掉  
        std::packaged_task<int()> pt( std::bind(f, 1, 2) );

        std::future<int> fut = pt.get_future();

        pt();

        std::cout << fut.get() << '\n';
    }

    int main()
    {
        task_thread();
        task_lambda();
        task_bind();
    }                           
    
    // === (3)
    #include <iostream>
    #include <future>
    #include <thread>

    int f(int x) 
    {
        x++;
        return x;
    }

    int main()
    {
        // std::launch::deferred 当执行到 fut.get() 时, 才开始 创建线程
        std::future<int> fut = std::async(std::launch::deferred, f, 1);
        std::cout << fut.get() << std::endl; 
    }
promise-future.jpg

5.3 小工具 组件

1 类型函数

`编译时求值` 的 函数, 它 接受 `类型实参` 或 返回 `类型结果`
    
(1) iterator_traits 机制: <iterator>

    `容器类型` 
        |   template <typename C>
        |   using Iterator_type = typename C::iterator;     
        |
        |   迭代器类型 再用别名 封装1层
        |   // Note: Iterator_type<C> 前没有 typename, 编译器已知道 template <typename C> Iterator_type<C> 为类型     
        |   template <typename C>    
        |   using Iter = Iterator_type<C>; 
        |/
    `容器迭代器类型`
        |           
        |   template <typename Ite>     
        |   using Iterator_category = typename     
        |       std::iterator_traits<Ite>::iterator_category;    
        |/      
    迭代器 category 
        |   
        |   对象: 构建 `标签值`, 指示 迭代器 的 category 
        |       Iterator_category<Iter>{}    
        |/      
    标签分发
    
        形参: vector / list 的 迭代器 category 为 random_access_iterator_tag / forward_iterator_tag
        
            对 list 排序
                list copy 到 vector -> vector 上 sort -> vector copy 回 list
            
        `类型魔法` 发生在 选择 helper 函数时

    // template< class RandomIt >
    // void sort(RandomIt first, RandomIt last);

    #include <iostream>
    #include <iterator> // std::bidirectional_iterator_tag / std::random_access_iterator_tag
    #include <vector>
    #include <list>
    #include <algorithm>    // copy, sort
    #include <type_traits>  // std::remove_cv

    template <typename C>
    using Iterator_type = typename C::iterator;

    template <typename Ite>
    using Iterator_category = typename std::iterator_traits<Ite>::iterator_category;

    // vector sort
    template <typename Iter>
    void sort_helper(Iter beg, Iter end, std::random_access_iterator_tag)
    {
        std::sort(beg, end);
    }

    // list sort
    template <typename Iter>
    void sort_helper(Iter beg, Iter end, std::bidirectional_iterator_tag)
    {
        //decltype(*beg) is int&: because *beg is not a identifier, but is a lvalue
        using U = typename std::remove_reference< decltype(*beg) >::type;
        // <=>
        //auto identifier = *beg;
        //using U = decltype(identifier);
        
        std::vector< U > v{ beg, end };     // list init(copy to) vector
        std::sort(v.begin(), v.end());      // sort vector
        std::copy(v.begin(), v.end(), beg); // vector copy to list 
    }

    template <typename C>
    void sort(C& c)
    {
        using Iter = Iterator_type<C>;

        auto beg = c.begin();
        sort_helper(beg, c.end(), Iterator_category<Iter>{});
    }

    int main()
    {
        std::vector<int> vec{ 1, 3, 2 };
        std::list<int> lst { 1, 2, 5, 4, 3 };
        sort(vec);
        sort(lst);
    }
            
(2) 类型谓词 <type_traits>
    
    标准库 类型谓词 是 类型函数
    负责回答 关于类型的问题
    
    bool b1 = is_pod<int>();
    
    has_virtual_destructor
    is_base_of

2 pair 和 tuple <utility>

    equal_range 返回 迭代器 pair
    
    pair 
        提供 = == <
        其 元素得 支持 这些运算
        
        希望从 接口函数 返回2个结果
            (结果本身, 结果好不好的标志位) -> pair
        
    make_pair()/make_tupe() 可创建 pair/tuple
    
        vector<string> vec;
        auto p = make_pair(vec.begin(), 2); // pair<vector<string>::iterator, int>
        auto p_first = p.first;
        auto p_second = p.second;
        
        auto t = make_tuple(string("hello"), 1, 1.2);
        string s = get<0>(t);
上一篇下一篇

猜你喜欢

热点阅读