task_queue事件队列的使用,模拟解包,压缩和发送过程

2021-04-10  本文已影响0人  FredricZhu

完整的事件队列应该还添加一个条件变量,来监控事件队列的大小。本例的事件队列比较小,不是很完整。
程序类图如下,


图片.png

CMakeLists.txt

cmake_minimum_required(VERSION 2.6)
project(lexical_cast)

add_definitions(-std=c++14)

include_directories("/usr/local/include")
link_directories("/usr/local/lib")
file( GLOB APP_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
foreach( sourcefile ${APP_SOURCES} )
    file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${sourcefile})
    string(REPLACE ".cpp" "" file ${filename})
    add_executable(${file} ${sourcefile})
    target_link_libraries(${file} boost_filesystem boost_thread boost_system boost_serialization pthread boost_chrono)
endforeach( sourcefile ${APP_SOURCES} )

main.cpp

#include <boost/atomic.hpp>
#include <boost/function.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/thread.hpp>

#include <vector>
#include <deque>
#include <iostream>

#include <cassert>
// 未加工的数据包
struct data_packet {
    unsigned int value;
}; 

// 解码后的数据包
struct decoded_data {
    unsigned int value;
};

// 压缩后的数据包
struct compressed_data {
    unsigned int value;
};

using atomic_t = boost::atomic<unsigned int>;

class subsystem1 {
    atomic_t i_;

public:
    subsystem1(): i_(0) {}

    // 获取原始数据包
    data_packet get_data() {
        data_packet ret = { ++ i_};
        return ret;
    }
    
    // 最多运行10000次
    static const unsigned int max_runs = 10000;
    

    // 最多运行10000次
    bool is_stopped() const {
        return i_ == max_runs;
    } 
};

class subsystem2 {
    atomic_t i_;

public:
    subsystem2(): i_(0) {}

    void send_data(const compressed_data& data) {
        ++ i_;
        assert(data.value == i_);
    }

    unsigned int send_packets_count() const {
        return i_;
    }
};

// 解码数据
decoded_data decode_data(const data_packet& packet) {
    static unsigned int i = 0;
    ++ i;
    decoded_data ret = { packet.value };
    assert(i==packet.value);
    return ret;
}

// 压缩数据
compressed_data compress_data(const decoded_data& packet) {
    static unsigned int i = 0;
    ++ i;
    compressed_data ret = {packet.value};
    assert(i == packet.value);
    return ret;
}

class work_queue {
public:
    using task_type = boost::function<void()>;

private:
    std::deque<task_type> tasks_;
    boost::mutex mutex_;
    boost::condition_variable cond_;
    bool is_stopped_;

public:
    work_queue(): is_stopped_(false){}
    // 停止队列输入和输出
    void stop() {
        boost::lock_guard<boost::mutex> lock(mutex_);
        is_stopped_ = true;
        cond_.notify_all();
    }

    // 向任务队列中push任务
    void push_task(const task_type& task) {
        boost::unique_lock<boost::mutex> lock(mutex_);
        if(is_stopped_) {
            return;
        }

        tasks_.push_back(task);
        lock.unlock();
        cond_.notify_one();
    } 

    task_type pop_task() {
        boost::unique_lock<boost::mutex> lock(mutex_);
        while(tasks_.empty()) {
            if(is_stopped_) {
                return task_type();
            }
            cond_.wait(lock);
        }

        task_type ret = tasks_.front();
        tasks_.pop_front();
        return ret;
    }
};


work_queue decoding_queue, compressing_queue, sending_queue;
subsystem1 subs1;
subsystem2 subs2;

void do_decode(const data_packet& packet);
void start_data_accepting() {
    while(!subs1.is_stopped()) {
        data_packet packet = subs1.get_data();
        decoding_queue.push_task([packet](){
            do_decode(packet);
        });
    }
}

void do_compress(const decoded_data& packet);
void do_decode(const data_packet& packet) {
    decoded_data dec_packet = decode_data(packet);
    compressing_queue.push_task([dec_packet](){
        do_compress(dec_packet);
    });
}

void do_compress(const decoded_data& packet) {
    compressed_data com_packet = compress_data(packet);
    sending_queue.push_task([com_packet]() {
        subs2.send_data(com_packet);
    });
}

void run_while_not_stopped(work_queue& w_queue) {
    work_queue::task_type task;
    while(task = w_queue.pop_task()) {
        task();
    }
}

int main(int argc, char* argv[]) {
    boost::thread t_data_accepting(&start_data_accepting);
    boost::thread t_data_decoding([](){
        run_while_not_stopped(decoding_queue);
    });
    boost::thread t_data_compressing([](){
        run_while_not_stopped(compressing_queue);
    });
    boost::thread t_data_sending([](){
        run_while_not_stopped(sending_queue);
    });

    t_data_accepting.join();

    decoding_queue.stop();
    t_data_decoding.join();

    compressing_queue.stop();
    t_data_compressing.join();

    sending_queue.stop();
    t_data_sending.join();

    std::cerr << subs2.send_packets_count() << std::endl;
    assert(subs2.send_packets_count() == subsystem1::max_runs);
}
上一篇下一篇

猜你喜欢

热点阅读