生产者消费者C++代码

2023-12-04  本文已影响0人  leon0514

生产者消费者

生产者消费者是一种常见的并发编程模型,用于解决多线程或多进程之间的数据共享和协作问题。在该模型中,生产者负责生成数据或任务,并将其放入一个共享的缓冲区中,而消费者则负责从缓冲区中获取数据或任务并进行处理。

生产者和消费者之间通过共享的缓冲区进行通信,这个缓冲区可以是一个队列、一个缓存区或者其他数据结构。生产者将数据放入缓冲区后,消费者可以从缓冲区中取出数据进行处理。如果缓冲区为空,消费者就需要等待,直到有新的数据被生产者放入缓冲区。同样地,如果缓冲区已满,生产者就需要等待,直到有空闲的空间可以放入新的数据。

这种模型的一个重要特点是解耦了生产者和消费者之间的耦合关系。生产者和消费者可以独立地进行操作,不需要彼此直接通信,而通过共享的缓冲区进行间接通信。这种解耦可以提高系统的灵活性和可扩展性,并允许生产者和消费者的处理速度不一致。

生产者消费者模型可以应用于多种场景,例如生产者可以是一个数据生成器,而消费者可以是一个数据处理器;生产者可以是一个任务调度器,而消费者可以是执行任务的线程或进程等。通过合理设计和实现生产者消费者模型,可以有效地控制并发访问共享资源,避免竞态条件和资源争用问题,提高系统的性能和可靠性

代码解析

对源代码做了部分修改、删减、添加注释

#ifndef __CPM_HPP__
#define __CPM_HPP__

// Comsumer Producer Model

#include <algorithm>
#include <condition_variable>
#include <future>
#include <memory>
#include <queue>
#include <thread>

namespace cpm {

// 三个模板分别为结果类型、输入类型、处理数据的模型类型
// 在深度学习模型识别中,Model可以为模型识别类的结果或指针
template <typename Result, typename Input, typename Model>
class Instance {
 protected:
  struct Item {
    Input input;
    std::shared_ptr<std::promise<Result>> pro;
  };
  // 每次放入队列的数据结构
  // input 存储需要处理的数据
  // pro 存储对数据的处理结果
 
   // 条件变量
  std::condition_variable cond_;
  // 缓冲队列,所有commit的数据都存储在这个队列中
  std::queue<Item> input_queue_;
  // 队列锁,避免同时读取修改队列数据
  std::mutex queue_lock_;
  // 消费者线程,不断消耗队列中的数据
  std::shared_ptr<std::thread> worker_;
  //  是否运行标识
  volatile bool run_ = false;

 public:
  virtual ~Instance() { stop(); }
  
  // 结束函数
  void stop() {
    // 运行状态置为false再通知工作线程结束,跳出循环
    run_ = false;
    cond_.notify_one();
    {
      // 清空队列
      std::unique_lock<std::mutex> l(queue_lock_);
      while (!input_queue_.empty()) {
        auto &item = input_queue_.front();
        if (item.pro) item.pro->set_value(Result());
        input_queue_.pop();
      }
    };
    
    if (worker_) {
      worker_->join();
      worker_.reset();
    }
  }

  // 生产者函数,不断向队列中commit数据
  virtual std::shared_future<Result> commit(const Input &input) {
    Item item;
    item.input = input;
    item.pro.reset(new std::promise<Result>());
    {
      // 队列加锁,避免同时读取修改队列
      std::unique_lock<std::mutex> __lock_(queue_lock_);
      input_queue_.push(item);
    }
    // 通知生产者线程干活
    cond_.notify_one();
    // 返回结果的shared_future
    return item.pro->get_future();
  }

  // 开始函数
  // 1. 初始化模型,通过传入的loadmethod方法来初始化模型
  // 2. 先stop一下,保证不会重复启动,释放资源
  // 3. 开启工作线程
  template <typename LoadMethod>
  bool start(const LoadMethod &loadmethod) {
    stop();
    std::promise<bool> status;
    worker_ = std::make_shared<std::thread>(&Instance::worker<LoadMethod>, this,
                                            std::ref(loadmethod), std::ref(status));
    return status.get_future().get();
  }

 private:
  // 工作线程,在start方法中传入loadmethod方法,对于深度学习模型部署中一般用于初始化模型
  template <typename LoadMethod>
  void worker(const LoadMethod &loadmethod, std::promise<bool> &status) {
    std::shared_ptr<Model> model = loadmethod();
    if (model == nullptr) {
      status.set_value(false);
      return;
    }

    run_ = true;
    status.set_value(true);

    Item fetch_item;
    Input input;
    // 等待从队列中获取数据
    // 队列为空则一直阻塞在这里,当队列中塞入数据时cond_会通知该线程开始干活
    while (get_item_and_wait(fetch_item)) {
      input = fetect_item.input;
      // 初始化的模型中的forward方法用于处理数据
      auto ret = model->forward(input);
      // pro设置返回结果,设置返回结果后future在get的时候就不会一直阻塞了
      fetch_item.pro->set_value(ret);
    }
    model.reset();
    run_ = false;
  }
  
  // 封装的等待获取数据的方法
  virtual bool get_item_and_wait(Item &fetch_item) {
    std::unique_lock<std::mutex> l(queue_lock_);
    cond_.wait(l, [&]() { return !run_ || !input_queue_.empty(); });

    if (!run_) return false;
    // 获取数据并从队列弹出
    // std::move避免数据拷贝
    fetch_item = std::move(input_queue_.front());
    input_queue_.pop();
    return true;
  }
};
};  // namespace cpm

#endif  // __CPM_HPP__

使用例子

ps : 在不用深度学习模型部署的情况也可以使用该代码,十分通用、好用

太长不放了,有时间继续写🤷♂️

代码来源

infer/src/cpm.hpp at main · shouxieai/infer · GitHub

上一篇 下一篇

猜你喜欢

热点阅读