使用简单的消息队列实现文件列表

2023-09-20  本文已影响0人  FredricZhu

本例是Bartosz Milewski C++11 Concurrency课程中第9课的代码,使用std::condition_variable 和 std::mutex实现简单的消息队列,来进行文件列表功能的视线。
程序代码如下,
conanfile.txt

[requires]
boost/1.72.0

[generators]
cmake

CMakeLists.txt

cmake_minimum_required(VERSION 3.3)

project(1_list_dir_server)

set(ENV{PKG_CONFIG_PATH} "$ENV{PKG_CONFIG_PATH}:/usr/local/lib/pkgconfig/")

set ( CMAKE_CXX_FLAGS "-pthread")
set(CMAKE_CXX_STANDARD 17)
add_definitions(-g)

include(${CMAKE_BINARY_DIR}/conanbuildinfo.cmake)
conan_basic_setup()

include_directories(${INCLUDE_DIRS})
LINK_DIRECTORIES(${LINK_DIRS})

file( GLOB main_file_list ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp) 

foreach( main_file ${main_file_list} )
    file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${main_file})
    string(REPLACE ".cpp" "" file ${filename})
    add_executable(${file}  ${main_file})
    target_link_libraries(${file} ${CONAN_LIBS} pthread)
endforeach( main_file ${main_file_list})

async_out.hpp

#ifndef _FREDRIC_ASYNC_OUT_HPP_
#define _FREDRIC_ASYNC_OUT_HPP_

#include <sstream>
#include <mutex>
#include <iostream>

struct AsyncOut: public std::stringstream {
    static inline std::mutex cout_mutex;

    ~AsyncOut() {
        std::lock_guard<std::mutex> lock(cout_mutex);
        std::cout << rdbuf();
        std::cout.flush();
    }
};

#define aout AsyncOut{}

#endif

clocker.hpp

#ifndef _FREDRIC_CLOCKER_HPP_
#define _FREDRIC_CLOCKER_HPP_
#include <chrono>
#include "asyc_out.hpp"

struct Clocker {
    std::chrono::time_point<std::chrono::high_resolution_clock> start;

    Clocker() {
        start = std::chrono::high_resolution_clock::now();
    }

    ~Clocker() {
        auto end = std::chrono::high_resolution_clock::now();
        auto mill_dur = std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count();
        aout << "Takes " << mill_dur << " milliseconds\n";
    }
};
#endif

main.cpp

#include "boost/filesystem.hpp"
#include <iostream>
#include <thread>
#include <algorithm>
#include <boost/foreach.hpp>
#include <vector>
#include <string>
#include <cassert>
#include <future>
#include <mutex>
#include <chrono>
#include <condition_variable>
#include <deque>
#include "asyc_out.hpp"
#include "clocker.hpp"

using path_type = boost::filesystem::path;
int const NUM_THREADS = 8;

template <typename T>
class MsgQueue {
    std::deque<T> queue_;
    std::condition_variable cond_;
    std::mutex mutex_;

public:
    void send(T&& msg) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_front(std::move(msg));
        }
        cond_.notify_one();
    }

    T receive() {
        // 注意这里必须用std::unique_lock,因为cond_.wait的时候会调用lock和unlock接口
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this](){return !queue_.empty();});
        T msg = std::move(queue_.back());
        queue_.pop_back();
        return msg;
    }
};


void list_dir_server(MsgQueue<path_type>& dir_queue, MsgQueue<std::string>& file_queue) {
    for(;;) {
        path_type dir = dir_queue.receive();
        boost::filesystem::directory_iterator it(dir);
        for(auto& sub_path: it) {
            if(boost::filesystem::is_directory(sub_path)) {
                auto path_ = sub_path.path();
                dir_queue.send(std::move(path_));
            } else {
                auto path_leaf =  sub_path.path().filename().string();
                file_queue.send(std::move(path_leaf));
            }
        }
    }
}

void print_server(MsgQueue<std::string>& name_queue) {
    for(;;) {
        std::string name = name_queue.receive();
        aout << name << "\n";
    }
}

void list_tree(path_type&& root_dir) {
    MsgQueue<path_type> dir_queue;
    MsgQueue<std::string> file_queue;
    dir_queue.send(std::move(root_dir));

    std::vector<std::future<void>> futures;
    for(int i=0; i<NUM_THREADS; ++i) {
        futures.push_back(std::async(std::launch::async, &list_dir_server, 
            std::ref(dir_queue),
            std::ref(file_queue)));
    }

    futures.push_back(std::async(std::launch::async, &print_server, std::ref(file_queue)));

    try {
        while(!futures.empty()) {
            auto ftr = std::move(futures.back());
            futures.pop_back();
            ftr.get();
        }
    } catch(boost::filesystem::filesystem_error& err) {
        aout << "File system error: " << err.code().message() << "\n";
    } catch(std::exception& ex) {
        aout << "Exception: " << ex.what() << "\n";
    } catch(...) {
        aout << "Unknown exception\n";
    }
}

int main(int argc, char* argv[]) {
    path_type root_dir("/home/fredric/code");
    list_tree(std::move(root_dir));

    return EXIT_SUCCESS;
}

程序的输出如下,


image.png
上一篇下一篇

猜你喜欢

热点阅读