使用tasks_processor管理网络请求

2021-04-14  本文已影响0人  FredricZhu

这个框架作者是在boost::asio库的基础上做二次开发,设计得挺骚的。

服务器和客户端都通过tasks_processor来启动和调度。如果加上io_service的多线程ios::run,就可以让服务器和客户端启动的task在多线程环境中执行。

加上timer_task这种调度任务后,可以使用boost::asio::deadline_timer来设定哪种任务先执行,哪种任务后执行,延迟到什么时候。

如果是标准的实时通信操作的话,客户端和服务器都应该写成一个
class tcp_client: public boost::enable_shared_from_this<tcp_client> 类似这样的类。
这样才便于持续操作。参考authorizer服务器的写法就可以。

当然认证服务器,写到作者这个样子就可以,毕竟只是一个demo,而且也不用在内存中保存历史认证信息,认证一次就够了。

没时间画类图了,直接上代码,
代码结构,


图片.png

CMakeLists.txt

cmake_minimum_required(VERSION 2.6)
project(lexical_cast)

add_definitions(-std=c++14)

include_directories("/usr/local/include")
link_directories("/usr/local/lib")
file( GLOB APP_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
foreach( sourcefile ${APP_SOURCES} )
    file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${sourcefile})
    string(REPLACE ".cpp" "" file ${filename})
    add_executable(${file} ${sourcefile})
    target_link_libraries(${file} boost_filesystem boost_thread boost_system boost_serialization pthread boost_chrono)
endforeach( sourcefile ${APP_SOURCES} )

main.cpp

#include "utils/tasks_processor_network.hpp"

#include <boost/asio/read.hpp>
#include <boost/asio/write.hpp>

using namespace tp_network;
// 前向声明
class authorizer;
// 全局的authorizer_ptr声明
using authorizer_ptr = boost::shared_ptr<authorizer>;

class authorizer: public boost::enable_shared_from_this<authorizer> {
    tcp_connection_ptr connection_;
    boost::array<char, 512> message_;

    explicit authorizer(const tcp_connection_ptr& connection):
        connection_(connection) {

    }

    public:
        // 至少读一个字节
        static void on_connection_accept(const tcp_connection_ptr& connection) {
            authorizer_ptr auth(new authorizer(connection));
            auth->connection_.async_read(
                boost::asio::buffer(auth->message_),
                [auth](const boost::system::error_code& ec, std::size_t bytes_transferred) {
                    auth->on_data_received(ec, bytes_transferred);
                },
                1
            );
        }

        void on_data_received(const boost::system::error_code& ec, std::size_t bytes_transferred) {
            if(ec) {
                std::cerr << "authorizer.on_data_received: error during receiving response: " << ec << '\n';
                assert(false);
            }

            if(bytes_transferred == 0) {
                std::cerr << "authorizer.on_data_received: zero bytes received\n";
                assert(false);
            }

            // 现在我们在 message_中读到了数据,我们可以做一些验证登陆的工作
            // ...

            // 往客户端回写消息 ”OK“
            message_[0] = 'O';
            message_[1] = 'K';

            std::size_t bytes_to_send = 2;

            auto self(shared_from_this());
            // 现在把"OK"写回去
            connection_.async_write(
                boost::asio::buffer(message_, bytes_to_send),
                [self](const boost::system::error_code& ec, std::size_t bytes_transferred) {
                    self->on_data_send(ec);
                }
            );
        }

        // 发回客户端数据后的回调函数
        void on_data_send(const boost::system::error_code& ec) {
            if(ec) {
                std::cerr << "authorizer.on_data_send: error during send response: " << ec << '\n';
                assert(false);
            }

            connection_.shutdown();
        }
};

// 全局的是否auth成功的标志
bool g_authed = false;

void finish_socket_auth_task(
    const boost::system::error_code& ec,
    std::size_t bytes_transferred,
    const tcp_connection_ptr& connection,
    const boost::shared_ptr<std::string>& data
) {
    if(ec && ec != boost::asio::error::eof) {
        std::cerr << "finsh_socket_auth_task: Client error on recieve: " << ec.message() << '\n';
        assert(false);
    }

    if(bytes_transferred != 2) {
        std::cerr << "finish_socket_auth_task: wrong bytes count\n";
        assert(false);
    }

    data->resize(bytes_transferred);
    if(*data != "OK") {
        std::cerr << "finish_socket_auth_task: wrong response " << *data << '\n';
        assert(false);
    }

    g_authed = true;
    // 关闭TCP双向连接
    connection.shutdown();
    tasks_processor::get().stop();
}

// 接收认证服务器发回的消息的函数
void receive_auth_task(const boost::system::error_code& ec, const tcp_connection_ptr& connection, 
    const boost::shared_ptr<std::string>& data) {
    if(ec) {
        std::cerr << "receive_auth_task: client error on receive: " << ec.message() << '\n';
        assert(false);
    }

    tcp_connection_ptr soc(connection);
    soc.async_read(
        boost::asio::buffer(&(*data)[0], data->size()),
        [soc, data](const boost::system::error_code& ec, std::size_t bytes_transferred) {
                finish_socket_auth_task(ec, bytes_transferred, soc, data);
        },
        1
    );

}

const unsigned short port_num = 65001;
// 客户端发送auth的函数
void send_auth_task() {
    tcp_connection_ptr soc = tasks_processor::get().create_connection("127.0.0.1", port_num);
    boost::shared_ptr<std::string> data = boost::make_shared<std::string>("authname");

    soc.async_write(
        boost::asio::buffer(*data),
        [soc, data](const boost::system::error_code& ec, std::size_t bytes_transferred) {
            receive_auth_task(ec, soc, data);
        }
    );
}

int main(int argc, char* argv[]) {
    // 一秒钟之后发起请求,因为要等服务器先启动起来
    tasks_processor::get().run_after(boost::posix_time::seconds(1), &send_auth_task);
    // 在65001的TCP V4端口,启动auth服务器
    tasks_processor::get().add_listener(port_num, &authorizer::on_connection_accept);
    
    // 没有开始跑,
    assert(!g_authed);

    tasks_processor::get().start();
    assert(g_authed);
    return 0;
}

utils/tasks_processor_base.hpp

#ifndef _CHAPTOR06_06_TASKS_PROCESSOR_BASE_HPP_
#define _CHAPTOR06_06_TASKS_PROCESSOR_BASE_HPP_

#include <boost/noncopyable.hpp>
#include <boost/thread/thread.hpp>
#include <boost/asio/io_service.hpp>

#include <iostream>

namespace detail {
    // 封装task,避免异常发生中断执行
    template <class T>
    struct task_wrapped {
        private:
            T task_unwrapped_;
        
        public:
            explicit task_wrapped(const T& task_unwrapped):
                task_unwrapped_(task_unwrapped) {}
            
            void operator()() const {
                // 重置中断点
                try {
                    boost::this_thread::interruption_point();
                } catch(const boost::thread_interrupted&) {

                }

                try {
                    // 执行任务
                    task_unwrapped_();
                } catch(const std::exception& ex) {
                    std::cerr << "Exception: " << ex.what() << '\n';
                } catch(const boost::thread_interrupted&) {
                    std::cerr << "Thread interrupted\n";
                } catch(...) {
                    std::cerr << "Unknown exception\n";
                }
            }
    };

    template <class T>
    inline task_wrapped<T> make_task_wrapped(const T& task_unwrapped) {
        return task_wrapped<T>(task_unwrapped);
    }
};

namespace tp_base {
    class tasks_processor: private boost::noncopyable {
        protected:
            boost::asio::io_service ios_;
            boost::asio::io_service::work work_;

            tasks_processor():
                ios_(),
                work_(ios_)
             {}
            
        public:
            static tasks_processor& get();

            template <class T>
            inline void push_task(const T& task_unwrapped) {
                ios_.post(detail::make_task_wrapped(task_unwrapped));
            }

            void start() {
                ios_.run();
            }

            void stop() {
                ios_.stop();
            }
    };
};
#endif

utils/tasks_processor_timer.hpp

#ifndef _CHAPTOR06_06_TASKS_PROCESSOR_TIMER_HPP_
#define _CHAPTOR06_06_TASKS_PROCESSOR_TIMER_HPP_
// 给基础的tasks_processor_base加上timer功能

#include "tasks_processor_base.hpp"

#include <boost/asio/io_service.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <boost/system/error_code.hpp>
#include <boost/make_shared.hpp>

#include <iostream>

namespace detail {
    using duration_type_t = boost::asio::deadline_timer::duration_type;

    template <class Functor>
    struct timer_task: public task_wrapped<Functor> {
        private:
            using base_t = task_wrapped<Functor>;
            boost::shared_ptr<boost::asio::deadline_timer> timer_;

        public:
            template <class Time>
            explicit timer_task(boost::asio::io_service& ios,
                const Time& duration_or_time,
                const Functor& task_unwrapped):
                    base_t(task_unwrapped),
                    timer_(boost::make_shared<boost::asio::deadline_timer>(
                        ios, duration_or_time
                        )) 
            {
           
            }

            void push_task() const {
                timer_->async_wait(*this);
            }

            void operator()(const boost::system::error_code& error) const {
                if(!error) {
                    base_t::operator()();
                } else {
                    std::cerr << error << '\n';
                }
            }
    };

    template <class Time, class Functor>
    inline timer_task<Functor> make_timer_task(
        boost::asio::io_service& ios,
        const Time& duration_or_time,
        const Functor& task_unwrapped
    ) {
        return timer_task<Functor>(ios, duration_or_time, task_unwrapped);
    }
}; // namespace detail

namespace tp_timers {
    class tasks_processor: public tp_base::tasks_processor {
        public:
            static tasks_processor& get();

            using duration_type_t = boost::asio::deadline_timer::duration_type;
            template <class Functor>
            void run_after(duration_type_t duration, const Functor& f) {
                detail::make_timer_task(ios_, duration, f).push_task();
            }

            using time_type_t = boost::asio::deadline_timer::time_type;
            template <class Functor>
            void run_at(time_type_t time, const Functor& f) {
                detail::make_timer_task(ios_, time, f).push_task();
            }
    };
}; // namespace tp_timers
#endif

utils/tasks_processor_network.hpp

#ifndef _CHAPTOR06_06_TASKS_PROCESSOR_NETWORK_HPP_
#define _CHAPTOR06_06_TASKS_PROCESSOR_NETWORK_HPP_
// 给task_processor_timer加上网络处理功能
#include "tasks_processor_timer.hpp"

#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/placeholders.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/read.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/function.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/lexical_cast.hpp>

#include <map>

// 将当前TCP连接的套接字
// 和读取,写入,关闭函数绑定
class tcp_connection_ptr {

    boost::shared_ptr<boost::asio::ip::tcp::socket> socket_;
    public:

        explicit tcp_connection_ptr(boost::shared_ptr<boost::asio::ip::tcp::socket> socket):
            socket_(socket)
         {}

        explicit tcp_connection_ptr(
            boost::asio::io_service& ios,
            const boost::asio::ip::tcp::endpoint& endpoint
        ):
              socket_(boost::make_shared<boost::asio::ip::tcp::socket>(ios))
        {    
                socket_->connect(endpoint);
        }

        template <class Functor>
        void async_write(
            const boost::asio::const_buffers_1& buf, const Functor& f
        ) {
            boost::asio::async_write(*socket_, buf, f);
        }
        
        template <class Functor>
        void async_write(
            const boost::asio::mutable_buffers_1& buf, const Functor& f
        ) {
            boost::asio::async_write(*socket_, buf, f);
        }

        template <class Functor>
        void async_read(
            const boost::asio::mutable_buffers_1& buf, 
            const Functor& f,
            std::size_t at_least_bytes
        ) {
            boost::asio::async_read(
                *socket_, buf, boost::asio::transfer_at_least(at_least_bytes), f
            );
        }

        void shutdown() const {
            socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both);
            socket_->close();
        }
};

namespace detail {
    // tcp 接收器类,循环接收TCP连接,
    class tcp_listener: public boost::enable_shared_from_this<tcp_listener> {
        using acceptor_t = boost::asio::ip::tcp::acceptor;
        acceptor_t acceptor_;
        boost::asio::io_service& io_service_;

        boost::function<void(tcp_connection_ptr)> func_;

        public:
            template <class Functor> 
            tcp_listener(
                boost::asio::io_service& io_service,
                unsigned short port,
                const Functor& task_unwrapped
            ):
                io_service_(io_service),
                acceptor_(io_service, boost::asio::ip::tcp::endpoint(
                    boost::asio::ip::tcp::v4(), port
                )),
                func_(task_unwrapped)
             {}
            

            void stop() {
                acceptor_.close();
            }

            // 主要用来接收连接,串起接收链
            void push_task() {
                // acceptor 关闭, 不能再接收连接
                if(!acceptor_.is_open()) {
                    return;
                }

                using socket_t = boost::asio::ip::tcp::socket;
                boost::shared_ptr<socket_t> socket = 
                    boost::make_shared<socket_t>(io_service_);

                auto self(shared_from_this());
                tcp_connection_ptr new_connection(socket);
                acceptor_.async_accept(*socket, [self, new_connection](const boost::system::error_code& ec) {
                    self->handle_accept(new_connection, ec);
                }); 

            }
        
        private:
            void handle_accept(
                const tcp_connection_ptr& new_connection,
                const boost::system::error_code& error
            ) {
                // 继续串起接收链,接收下一个新连接
                push_task();
                // 说明async_accept成功
                if(!error) {
                    auto self(shared_from_this());
                    // 连接成功以后,构造一个task_wrapped,执行一下函数
                    make_task_wrapped([self, new_connection](){
                        self->func_(new_connection);
                    })
                    ();
                } else {
                    std::cerr << error << '\n';
                }
            }


    };
}; // namespace detail


namespace tp_network {
    class tasks_processor: public tp_timers::tasks_processor {
        using listeners_map_t = std::map<
            unsigned short, 
            boost::shared_ptr<detail::tcp_listener>
        >;
        listeners_map_t listeners;

        public:
            // 获取全局单例的get方法
            static tasks_processor& get();

            template <class Functor>
            void add_listener(unsigned short port_num, const Functor& f) {
                auto it = listeners.find(port_num);
                // 这个端口在listeners map中已经存在,不需要重复添加
                if(it != listeners.end()) {
                    throw std::logic_error(
                        "Such listener for port '" +
                        boost::lexical_cast<std::string>(port_num) +
                        "' already created"
                    );
                }
                listeners[port_num] = boost::make_shared<detail::tcp_listener>(boost::ref(ios_), port_num, f);
                listeners[port_num]->push_task(); // 开始接收连接,之后一直接收
            }

            void remove_listener(unsigned short port_num) {
                auto it = listeners.find(port_num);
                // 没找着,没法移除
                if(it == listeners.end()) {
                    throw std::logic_error(
                        "No listener for port '" +
                        boost::lexical_cast<std::string>(port_num) +
                        "' created"
                    );
                } 
                (*it).second->stop();
                listeners.erase(it);
            }
            
            tcp_connection_ptr create_connection(const char* addr, unsigned short port_num) {
                return tcp_connection_ptr(ios_, boost::asio::ip::tcp::endpoint(
                    boost::asio::ip::address_v4::from_string(addr),
                    port_num
                ));
            }
    };

    tasks_processor& tasks_processor::get() {
        static tasks_processor proc;
        return proc;
    }
}; // namespace tp_network
#endif

程序应该没有输出就是正常的。

上一篇下一篇

猜你喜欢

热点阅读