Implementing a Thread Pool in C++11
2024-09-05
leon_tly
Thread Pool
A thread pool creates a number of threads before the program starts its real work, so at run time a task only needs to borrow an already-created thread from the pool instead of spawning a new one, which greatly reduces thread-creation overhead and improves efficiency.
- The pool creates a fixed number of worker threads up front.
- The pool keeps a task queue that stores the callables waiting to run.
- The pool exposes an enqueue function that pushes the function to be called on a worker thread, together with its arguments, onto the queue.
- Each worker keeps watching the task queue, using a mutex and condition variable (not a semaphore) to be woken when a task arrives; once woken, it pops the task and executes it. A minimal sketch of this wait/notify pattern follows the list.
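Before the full implementation, here is a minimal, self-contained sketch of that wait/notify pattern with a single worker thread. It only illustrates the mechanism the pool below relies on; it is not part of the pool itself.
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
int main()
{
    std::mutex mtx;
    std::condition_variable cv;
    std::queue<std::function<void()>> tasks;
    bool stop = false;
    // The worker sleeps on the condition variable until there is a task
    // or shutdown has been requested.
    std::thread worker([&](){
        while (true)
        {
            std::unique_lock<std::mutex> lock(mtx);
            cv.wait(lock, [&]{ return stop || !tasks.empty(); });
            if (stop && tasks.empty())
            {
                return;
            }
            std::function<void()> task = std::move(tasks.front());
            tasks.pop();
            lock.unlock();
            task(); // run the task outside the lock
        }
    });
    // Producer side: push a task under the lock, then wake one waiter.
    {
        std::lock_guard<std::mutex> lock(mtx);
        tasks.push([](){ std::cout << "hello from worker" << std::endl; });
    }
    cv.notify_one();
    // Shutdown: set the flag under the lock, wake everyone, join.
    {
        std::lock_guard<std::mutex> lock(mtx);
        stop = true;
    }
    cv.notify_all();
    worker.join();
    return 0;
}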
Thread Pool Implementation
#include <iostream>
#include <thread>
#include <mutex>
#include <string>
#include <condition_variable>
#include <queue>
#include <vector>
#include <functional>
class ThreadPool{
public:
    ThreadPool(int num) : stop(false)
    {
        // Start num worker threads; each one loops forever:
        // wait for a task (or for shutdown), take it off the queue, run it.
        for (int i = 0; i < num; i++)
        {
            threads.emplace_back([this](){
                while(true)
                {
                    std::unique_lock<std::mutex> lock(mtx);
                    cv.wait(lock, [this]{
                        return !tasks.empty() || stop;
                    });
                    // Exit only when shutdown was requested and no work is left.
                    if (stop && tasks.empty())
                    {
                        return;
                    }
                    std::function<void()> task(std::move(tasks.front()));
                    tasks.pop();
                    lock.unlock();
                    // Run the task without holding the lock.
                    task();
                }
            });
        }
    }
    ~ThreadPool()
    {
        // Request shutdown, wake every worker, then wait for all of them to finish.
        {
            std::unique_lock<std::mutex> lock(mtx);
            stop = true;
        }
        cv.notify_all();
        for (auto& t : threads)
        {
            t.join();
        }
    }
    // enqueue binds the callable and its arguments into a single
    // std::function<void()>, pushes it onto the queue, and wakes one worker.
    template<typename F, typename... Args>
    void enqueue(F&& f, Args&&... args)
    {
        std::function<void()> task =
            std::bind(std::forward<F>(f), std::forward<Args>(args)...);
        {
            std::unique_lock<std::mutex> lock(mtx);
            tasks.emplace(std::move(task));
        }
        cv.notify_one();
    }
private:
    bool stop;
    /* worker threads */
    std::vector<std::thread> threads;
    /* task queue */
    std::queue<std::function<void()>> tasks;
    std::mutex mtx;
    std::condition_variable cv;
};
void task(std::string name, int data)
{
    std::cout << "name : " << name << ", data : " << data << std::endl;
}
int main()
{
    ThreadPool pool(4);
    for(int i = 0; i < 10; i++)
    {
        pool.enqueue(task, "task", i);
    }
    return 0;
}
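The part of enqueue that can look opaque is the std::bind call: it packages the callable together with its arguments into a new callable that takes no arguments, which is exactly what the std::function<void()> queue stores. A standalone sketch of just that step, independent of the pool:
#include <functional>
#include <iostream>
#include <string>
void task(std::string name, int data)
{
    std::cout << "name : " << name << ", data : " << data << std::endl;
}
int main()
{
    // std::bind packages the function and its arguments into one
    // zero-argument callable that can be stored and invoked later.
    std::function<void()> wrapped = std::bind(task, "task", 42);
    wrapped(); // prints "name : task, data : 42"
    return 0;
}
In the real enqueue, std::forward only preserves whether each argument was passed as an lvalue or an rvalue before it is handed to std::bind.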
A Thread Pool with Return Values
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
class ThreadPool {
public:
    ThreadPool(size_t numThreads) : stop(false) {
        for (size_t i = 0; i < numThreads; ++i) {
            workers.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->queueMutex);
                        this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
                        if (this->stop && this->tasks.empty()) {
                            return;
                        }
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    task();
                }
            });
        }
    }
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
        using return_type = decltype(f(args...));
        // packaged_task provides a future for the result; it is held in a
        // shared_ptr so the copyable lambda below can be stored in std::function.
        auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            if (stop) {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }
            tasks.emplace([task]() { (*task)(); });
        }
        condition.notify_one();
        return res;
    }
    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread &worker : workers) {
            worker.join();
        }
    }
private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queueMutex;
    std::condition_variable condition;
    bool stop;
};
// Example function to be executed in the thread pool
int add(int a, int b) {
    return a + b;
}
int main() {
    ThreadPool pool(4);
    // Enqueue tasks and get return values
    std::future<int> result1 = pool.enqueue(add, 2, 3);
    std::future<int> result2 = pool.enqueue(add, 5, 7);
    // Get the results
    std::cout << "Result1: " << result1.get() << std::endl;
    std::cout << "Result2: " << result2.get() << std::endl;
    return 0;
}
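A note on the design: std::packaged_task is move-only, while std::function requires its target to be copyable, so the task is wrapped in a std::shared_ptr and the queue stores a small lambda that just calls (*task)(). The caller receives a std::future from get_future(), and future::get() blocks until the corresponding worker has finished running the task, which is how main above retrieves the results 5 and 12.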