并发控制

2020-06-06  本文已影响0人  小跑001

1 c++中的锁和条件变量实现状态同步

#include <queue>
#include <string>
#include <mutex>
#include <thread>
#include <chrono>
#include <iostream>

using std::vector;
using std::string;
using std::queue;
using std::mutex;
using std::thread;

class Message {
 public:

     void Put(const string& msg) {
         std::lock_guard<std::mutex> guard(msgs_lock_);
         msgs_.push(msg);
     }

     bool Get(string& out_msg) {
         std::lock_guard<std::mutex> guard(msgs_lock_);
         if (!msgs_.empty()) {
             out_msg = msgs_.front();
             msgs_.pop();
             return true;
         }
         return false;
     }

 private:
     queue<string> msgs_;
     mutex msgs_lock_;
};

    
Message g_message_mgr;

void produce() {
    for(;;) {
        string msg = "some rand msg";
        g_message_mgr.Put(msg);
        std::this_thread::sleep_for(std::chrono::seconds(2));
    }
}

void cosumer() {
    for(;;) {
        string msg;
        if (g_message_mgr.Get(msg)) {
            // do sth with msg
            std::cout << msg << std::endl;
        }
    }
}

int main() {
    thread t1(produce);
    thread t2(cosumer);
    t1.join();
    t2.join();

    return 0;
}

class Message {
 public:

     void Put(const string& msg) {
         std::unique_lock<std::mutex> guard(msgs_lock_);

         bool need_notify = false;
         if (msgs_.empty()) {
             need_notify = true;
         }

         msgs_.push(msg);

         if (need_notify) {
             cv_.notify_all();
         }
     }

     bool Get(string& out_msg) {
          std::unique_lock<std::mutex> guard(msgs_lock_);
         if (!msgs_.empty()) {
             out_msg = msgs_.front();
             msgs_.pop();
             return true;
         } else {
             cv_.wait(guard, [this]{return msgs_.empty() == false;});
         }
         return false;
     }

 private:
     queue<string> msgs_;
     mutex msgs_lock_;
     condition_variable cv_;
};
class Message {
 public:

     enum {
         GET_NORMAL,
         GET_NONE,
         GET_OVER,
     };

     bool Put(const string& msg) {
         std::unique_lock<std::mutex> guard(msgs_lock_);
         if (is_over_) {
             return false;
         }

         bool need_notify = false;
         if (msgs_.empty()) {
             need_notify = true;
         }

         msgs_.push(msg);

         if (need_notify) {
             cv_.notify_all();
         }

         return true;
     }

   bool Get(string& out_msg) {
         for(;;) {
             int ret = get(out_msg);
             if (ret == GET_NORMAL) {
                 return true;
             } else if (ret == GET_NONE) {
                 continue;
             } else if (ret == GET_OVER) {
                 return false;
             } else {
                 assert(false);
             }
         }
     }

     int get(string& out_msg) {
         std::unique_lock<std::mutex> guard(msgs_lock_);
         if (!msgs_.empty()) {
             out_msg = msgs_.front();
             msgs_.pop();
             return GET_NORMAL;
         } else if (is_over_) {
             return GET_OVER;
         } else {
             cv_.wait(guard, [this]{return msgs_.empty() == false || is_over_;});  // 这里is_over 注意要捕获, 不然可能会阻塞在这里.
             return GET_NONE;
         }
     }

     void Close() {
          std::unique_lock<std::mutex> guard(msgs_lock_);
          is_over_ = true;
          cv_.notify_all();
     }

 private:
     queue<string> msgs_;
     mutex msgs_lock_;
     condition_variable cv_;
     bool is_over_ = false;
};

2 信号量的同步

从第一节看出是由锁和条件变量来实现状态同步的, 而信号量也可以实现同样的功能, 参考伪代码:

produce:
    P(emptyCount)
    P(useQueue)
    putItemIntoQueue(item)
    V(useQueue)
    V(fullCount)
consume:
    P(fullCount)
    P(useQueue)
    item ← getItemFromQueue()
    V(useQueue)
    V(emptyCount)

3 锁与信号量

锁和信号量都能实现类似的功能, 在一些复杂的场景, 例如多种相互依赖的状态需要同步的时候通过锁和条件变量更方便实现, 但是要注意锁的串行特性, 不要锁住耗时多的代码.

上一篇 下一篇

猜你喜欢

热点阅读