生产者消费者C++代码
2023-12-04 本文已影响0人
leon0514
生产者消费者
生产者消费者是一种常见的并发编程模型,用于解决多线程或多进程之间的数据共享和协作问题。在该模型中,生产者负责生成数据或任务,并将其放入一个共享的缓冲区中,而消费者则负责从缓冲区中获取数据或任务并进行处理。
生产者和消费者之间通过共享的缓冲区进行通信,这个缓冲区可以是一个队列、一个缓存区或者其他数据结构。生产者将数据放入缓冲区后,消费者可以从缓冲区中取出数据进行处理。如果缓冲区为空,消费者就需要等待,直到有新的数据被生产者放入缓冲区。同样地,如果缓冲区已满,生产者就需要等待,直到有空闲的空间可以放入新的数据。
这种模型的一个重要特点是解耦了生产者和消费者之间的耦合关系。生产者和消费者可以独立地进行操作,不需要彼此直接通信,而通过共享的缓冲区进行间接通信。这种解耦可以提高系统的灵活性和可扩展性,并允许生产者和消费者的处理速度不一致。
生产者消费者模型可以应用于多种场景,例如生产者可以是一个数据生成器,而消费者可以是一个数据处理器;生产者可以是一个任务调度器,而消费者可以是执行任务的线程或进程等。通过合理设计和实现生产者消费者模型,可以有效地控制并发访问共享资源,避免竞态条件和资源争用问题,提高系统的性能和可靠性
代码解析
对源代码做了部分修改、删减、添加注释
#ifndef __CPM_HPP__
#define __CPM_HPP__
// Comsumer Producer Model
#include <algorithm>
#include <condition_variable>
#include <future>
#include <memory>
#include <queue>
#include <thread>
namespace cpm {
// 三个模板分别为结果类型、输入类型、处理数据的模型类型
// 在深度学习模型识别中,Model可以为模型识别类的结果或指针
template <typename Result, typename Input, typename Model>
class Instance {
protected:
struct Item {
Input input;
std::shared_ptr<std::promise<Result>> pro;
};
// 每次放入队列的数据结构
// input 存储需要处理的数据
// pro 存储对数据的处理结果
// 条件变量
std::condition_variable cond_;
// 缓冲队列,所有commit的数据都存储在这个队列中
std::queue<Item> input_queue_;
// 队列锁,避免同时读取修改队列数据
std::mutex queue_lock_;
// 消费者线程,不断消耗队列中的数据
std::shared_ptr<std::thread> worker_;
// 是否运行标识
volatile bool run_ = false;
public:
virtual ~Instance() { stop(); }
// 结束函数
void stop() {
// 运行状态置为false再通知工作线程结束,跳出循环
run_ = false;
cond_.notify_one();
{
// 清空队列
std::unique_lock<std::mutex> l(queue_lock_);
while (!input_queue_.empty()) {
auto &item = input_queue_.front();
if (item.pro) item.pro->set_value(Result());
input_queue_.pop();
}
};
if (worker_) {
worker_->join();
worker_.reset();
}
}
// 生产者函数,不断向队列中commit数据
virtual std::shared_future<Result> commit(const Input &input) {
Item item;
item.input = input;
item.pro.reset(new std::promise<Result>());
{
// 队列加锁,避免同时读取修改队列
std::unique_lock<std::mutex> __lock_(queue_lock_);
input_queue_.push(item);
}
// 通知生产者线程干活
cond_.notify_one();
// 返回结果的shared_future
return item.pro->get_future();
}
// 开始函数
// 1. 初始化模型,通过传入的loadmethod方法来初始化模型
// 2. 先stop一下,保证不会重复启动,释放资源
// 3. 开启工作线程
template <typename LoadMethod>
bool start(const LoadMethod &loadmethod) {
stop();
std::promise<bool> status;
worker_ = std::make_shared<std::thread>(&Instance::worker<LoadMethod>, this,
std::ref(loadmethod), std::ref(status));
return status.get_future().get();
}
private:
// 工作线程,在start方法中传入loadmethod方法,对于深度学习模型部署中一般用于初始化模型
template <typename LoadMethod>
void worker(const LoadMethod &loadmethod, std::promise<bool> &status) {
std::shared_ptr<Model> model = loadmethod();
if (model == nullptr) {
status.set_value(false);
return;
}
run_ = true;
status.set_value(true);
Item fetch_item;
Input input;
// 等待从队列中获取数据
// 队列为空则一直阻塞在这里,当队列中塞入数据时cond_会通知该线程开始干活
while (get_item_and_wait(fetch_item)) {
input = fetect_item.input;
// 初始化的模型中的forward方法用于处理数据
auto ret = model->forward(input);
// pro设置返回结果,设置返回结果后future在get的时候就不会一直阻塞了
fetch_item.pro->set_value(ret);
}
model.reset();
run_ = false;
}
// 封装的等待获取数据的方法
virtual bool get_item_and_wait(Item &fetch_item) {
std::unique_lock<std::mutex> l(queue_lock_);
cond_.wait(l, [&]() { return !run_ || !input_queue_.empty(); });
if (!run_) return false;
// 获取数据并从队列弹出
// std::move避免数据拷贝
fetch_item = std::move(input_queue_.front());
input_queue_.pop();
return true;
}
};
}; // namespace cpm
#endif // __CPM_HPP__
使用例子
ps : 在不用深度学习模型部署的情况也可以使用该代码,十分通用、好用
太长不放了,有时间继续写🤷♂️