使用简单的消息队列实现文件列表
2023-09-20 本文已影响0人
FredricZhu
本例是Bartosz Milewski C++11 Concurrency课程中第9课的代码,使用std::condition_variable 和 std::mutex实现一个简单的消息队列,并用它来实现文件列表功能。
程序代码如下,
conanfile.txt
[requires]
boost/1.72.0
[generators]
cmake
CMakeLists.txt
# Builds one executable per *.cpp file found in this directory and links each
# of them against the Conan-provided libraries and pthread.
cmake_minimum_required(VERSION 3.3)
project(1_list_dir_server)

set(ENV{PKG_CONFIG_PATH} "$ENV{PKG_CONFIG_PATH}:/usr/local/lib/pkgconfig/")

# Append to the existing flags instead of overwriting them, so that flags
# supplied by the user or the toolchain are not discarded.
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread")
set(CMAKE_CXX_STANDARD 17)
# The sources use C++17 inline static data members; fail at configure time
# instead of silently falling back to an older standard.
set(CMAKE_CXX_STANDARD_REQUIRED ON)
# -g is a compiler option, not a preprocessor definition.
add_compile_options(-g)

include(${CMAKE_BINARY_DIR}/conanbuildinfo.cmake)
conan_basic_setup()

# NOTE(review): INCLUDE_DIRS / LINK_DIRS are not set anywhere in this script;
# presumably they are meant to be passed in from the command line - confirm.
include_directories(${INCLUDE_DIRS})
link_directories(${LINK_DIRS})

file(GLOB main_file_list ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
foreach(main_file ${main_file_list})
    file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${main_file})
    # Strip only the trailing extension; a plain REPLACE would also remove a
    # ".cpp" that occurs in the middle of the file name.
    string(REGEX REPLACE "\\.cpp$" "" target_name "${filename}")
    add_executable(${target_name} ${main_file})
    target_link_libraries(${target_name} ${CONAN_LIBS} pthread)
endforeach()
asyc_out.hpp
#ifndef _FREDRIC_ASYNC_OUT_HPP_
#define _FREDRIC_ASYNC_OUT_HPP_
#include <sstream>
#include <mutex>
#include <iostream>
/// Line-buffered, thread-friendly replacement for std::cout.
///
/// Text is collected in the underlying std::stringstream and written to
/// std::cout in one piece from the destructor while cout_mutex is held, so
/// output produced by different AsyncOut objects is not interleaved.
struct AsyncOut: public std::stringstream {
    /// Serialises the writes to std::cout performed by all AsyncOut objects.
    static inline std::mutex cout_mutex;

    /// Writes the collected text to std::cout and flushes it.
    ~AsyncOut() {
        const auto text = str();
        // Inserting an empty stream buffer (std::cout << rdbuf()) sets failbit
        // on std::cout, which would silently suppress all later output.
        // Nothing collected means nothing to print, so just skip the write.
        if (text.empty()) {
            return;
        }
        std::lock_guard<std::mutex> lock(cout_mutex);
        std::cout << text;
        std::cout.flush();
    }
};
// `aout << ...` builds a temporary AsyncOut; the text is emitted when the
// temporary is destroyed at the end of the full expression.
#define aout AsyncOut{}
#endif
clocker.hpp
#ifndef _FREDRIC_CLOCKER_HPP_
#define _FREDRIC_CLOCKER_HPP_
#include <chrono>
#include "asyc_out.hpp"
/// Scope timer: records the time of construction and, on destruction,
/// prints the elapsed time in milliseconds through `aout`.
struct Clocker {
    /// Time point captured when the object was constructed.
    std::chrono::time_point<std::chrono::high_resolution_clock> start;

    Clocker(): start(std::chrono::high_resolution_clock::now()) {}

    ~Clocker() {
        const auto finish = std::chrono::high_resolution_clock::now();
        const std::chrono::milliseconds elapsed =
            std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
        aout << "Takes " << elapsed.count() << " milliseconds\n";
    }
};
#endif
main.cpp
#include "boost/filesystem.hpp"
#include <iostream>
#include <thread>
#include <algorithm>
#include <boost/foreach.hpp>
#include <vector>
#include <string>
#include <cassert>
#include <future>
#include <mutex>
#include <chrono>
#include <condition_variable>
#include <deque>
#include "asyc_out.hpp"
#include "clocker.hpp"
// Path type used by the directory-listing code below.
using path_type = boost::filesystem::path;
// Number of list_dir_server workers that list_tree launches.
int const NUM_THREADS = 8;
/// Minimal blocking FIFO message queue guarded by a mutex and a
/// condition variable.
///
/// send() enqueues a message and wakes one waiting receiver; receive()
/// blocks until a message is available and returns the oldest one.
template <typename T>
class MsgQueue {
    std::deque<T> queue_;
    std::condition_variable cond_;
    std::mutex mutex_;
public:
    /// Moves `msg` into the queue and notifies one waiting receiver.
    void send(T&& msg) {
        std::unique_lock<std::mutex> guard(mutex_);
        queue_.push_back(std::move(msg));
        // Release the mutex before notifying so the woken receiver can
        // acquire it immediately.
        guard.unlock();
        cond_.notify_one();
    }

    /// Blocks until the queue is non-empty, then removes and returns the
    /// message that was sent first.
    T receive() {
        // std::unique_lock is required here: condition_variable::wait
        // unlocks and re-locks the mutex while waiting.
        std::unique_lock<std::mutex> guard(mutex_);
        while(queue_.empty()) {
            cond_.wait(guard);
        }
        T oldest = std::move(queue_.front());
        queue_.pop_front();
        return oldest;
    }
};
/// Directory-listing worker.
///
/// Repeatedly takes a directory from `dir_queue`, iterates its entries,
/// sends every sub-directory back to `dir_queue` and the file name of every
/// other entry to `file_queue`. The loop never exits on its own.
///
/// A boost::filesystem::filesystem_error raised while processing one
/// directory is reported through `aout` and that directory is abandoned, so
/// a single failing directory does not terminate the worker. Other
/// exceptions still propagate to the caller.
void list_dir_server(MsgQueue<path_type>& dir_queue, MsgQueue<std::string>& file_queue) {
    for(;;) {
        path_type dir = dir_queue.receive();
        try {
            boost::filesystem::directory_iterator it(dir);
            for(auto& sub_path: it) {
                if(boost::filesystem::is_directory(sub_path)) {
                    auto path_ = sub_path.path();
                    dir_queue.send(std::move(path_));
                } else {
                    auto path_leaf = sub_path.path().filename().string();
                    file_queue.send(std::move(path_leaf));
                }
            }
        } catch(const boost::filesystem::filesystem_error& err) {
            // Construction of the iterator, the is_directory() query and the
            // iteration itself can all throw; keep serving other directories.
            aout << "File system error: " << err.what() << "\n";
        }
    }
}
/// Printing worker: endlessly receives names from `name_queue` and writes
/// each one on its own line through `aout`. Never returns normally.
void print_server(MsgQueue<std::string>& name_queue) {
    while(true) {
        const std::string next_name = name_queue.receive();
        aout << next_name << "\n";
    }
}
/// Lists the files below `root_dir`.
///
/// Seeds the directory queue with `root_dir`, launches NUM_THREADS
/// list_dir_server workers plus one print_server, and then supervises them:
/// whenever a server finishes, any exception it raised is reported through
/// `aout`.
///
/// The servers loop forever, so this function only returns once every
/// server has exited (which, as the servers are written, only happens when
/// they throw).
void list_tree(path_type&& root_dir) {
    MsgQueue<path_type> dir_queue;
    MsgQueue<std::string> file_queue;
    dir_queue.send(std::move(root_dir));

    std::vector<std::future<void>> futures;
    futures.reserve(NUM_THREADS + 1);
    for(int i=0; i<NUM_THREADS; ++i) {
        futures.push_back(std::async(std::launch::async, &list_dir_server,
            std::ref(dir_queue),
            std::ref(file_queue)));
    }
    futures.push_back(std::async(std::launch::async, &print_server, std::ref(file_queue)));

    // Poll instead of blocking in get(): a blocking get() on a server that
    // never returns would hide the failures of all the other servers.
    auto const poll_interval = std::chrono::milliseconds(100);
    while(!futures.empty()) {
        for(auto it = futures.begin(); it != futures.end();) {
            if(it->wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
                ++it;
                continue;
            }
            // Handle each future separately so one failure does not prevent
            // the remaining servers from being reported.
            try {
                it->get();
            } catch(const boost::filesystem::filesystem_error& err) {
                aout << "File system error: " << err.code().message() << "\n";
            } catch(const std::exception& ex) {
                aout << "Exception: " << ex.what() << "\n";
            } catch(...) {
                aout << "Unknown exception\n";
            }
            it = futures.erase(it);
        }
        if(!futures.empty()) {
            std::this_thread::sleep_for(poll_interval);
        }
    }
}
/// Entry point. Lists the directory tree rooted at the first command-line
/// argument, or at "/home/fredric/code" when no argument is given.
int main(int argc, char* argv[]) {
    path_type root_dir(argc > 1 ? argv[1] : "/home/fredric/code");
    list_tree(std::move(root_dir));
    return EXIT_SUCCESS;
}
程序的输出如下,
image.png