Pistache源码分析 —— Server的初始化和请求处理

2021-06-29  本文已影响0人  蟹蟹宁

前言

Pistache 是面向C++的Web 服务器,阅读源码可以很好的理解如何实现一个高并发的服务器,对TCP、HTTP等的理解也会更加深入。

本系列主要从两个方面全面的解析Pistache的源码

因为C++是面向对象的设计,设计的很多的Class,因此我的方法就是,首先按执行流程,解析上述两个过程都进行了那些操作,然后针对期间使用的类进行详细的解释。

一、Server的初始化

我们以官方的的实例代码为例:

int main(int argc, char* argv[])
{
    Port port(9080);    // 端口
    int thr = 2;            // worker线程数
    Address addr(Ipv4::any(), port);
    auto server = std::make_shared<Http::Endpoint>(addr);
    auto opts = Http::Endpoint::options().threads(thr);
    server->init(opts);
    server->setHandler(Http::make_handler<MyHandler>());
    server->serve();
}

可见启动一个服务器端的代码还是很少的,主要的事情包括:

我们一步一步来看:

1.1 定义Endpoint对象

Endpoint 类的详细解析,见 《 Pistache源码分析 —— Endpoint类 》

可以看到,定义Endpoint对象,最主要的工作就是,初始化listener字段,关于Listener类,详见《 Pistache源码分析 —— Listener类 》

初始化listener的结构函数中,主要是初始化addr_和transportFactory_两个字段。transportFactory_是一个函数,看名字就知道,他负责创建transport对象,而Transport类就是用于工作线程的处理函数,他主要负责当监听到用户请求时,调用Request类进行解析,然后交给用户处理程序处理,最后调用Respone类的功能将数据写回,详见《 Pistache源码分析 —— Transport类 》

Endpoint::Endpoint(const Address& addr)
        : listener(addr)
    { }

Listener::Listener(const Address& address)
        : addr_(address)
        , transportFactory_(defaultTransportFactory())
    { }

using TransportFactory = std::function<std::shared_ptr<Transport>()>;

Listener::TransportFactory Listener::defaultTransportFactory() const
{
    return [&] {
        if (!handler_)
            throw std::runtime_error("setHandler() has not been called");

        return std::make_shared<Transport>(handler_);
    };
}

1.2 初始化Endpoint

Endpoint::Options::Options()
    : threads_(1)
    , flags_()
    , backlog_(Const::MaxBacklog)
    , maxRequestSize_(Const::DefaultMaxRequestSize)
    , maxResponseSize_(Const::DefaultMaxResponseSize)
    , headerTimeout_(Const::DefaultHeaderTimeout)
    , bodyTimeout_(Const::DefaultBodyTimeout)
    , logger_(PISTACHE_NULL_STRING_LOGGER)
{ }

初始化的代码,也是很短,主要还是初始化各个字段:

void Endpoint::init(const Endpoint::Options& options)
{
    listener.init(options.threads_, options.flags_, options.threadsName_);
    listener.setTransportFactory([this, options] {
        if (!handler_)
            throw std::runtime_error("Must call setHandler()");
        auto transport = std::make_shared<TransportImpl>(handler_);
        transport->setHeaderTimeout(options.headerTimeout_);
        transport->setBodyTimeout(options.bodyTimeout_);
        return transport;
    });
    options_ = options;
    logger_  = options.logger_;
}
void Listener::init(size_t workers, Flags<Options> options,
                    const std::string& workersName, int backlog,
                    PISTACHE_STRING_LOGGER_T logger)
{
    if (workers > hardware_concurrency())
    {
        // Log::warning() << "More workers than available cores"
    }

    options_     = options;
    backlog_     = backlog;
    useSSL_      = false;
    workers_     = workers;
    workersName_ = workersName;
    logger_      = logger;
}

1.3 配置用户处理类 Handler

用户自定义处理类,继承自Http::Handler,详见《 Pistache源码分析 —— Handler类 》
这个过程,真的好简单的:

void Endpoint::setHandler(const std::shared_ptr<Handler>& handler)
    {
        handler_ = handler;  //设置endpoint的handler_字段
        handler_->setMaxRequestSize(options_.maxRequestSize_);  // 设置handler_的最大请求字节数
        handler_->setMaxResponseSize(options_.maxResponseSize_);// 设置handler_的最大响应字节数
    }

1.4 开始运行

有两个运行接口,分别调用了Tcp::Listener::runTcp::Listener::runThreaded,调用接口使用了serveImpl进行封装。

void Endpoint::serve() { serveImpl(&Tcp::Listener::run); }

void Endpoint::serveThreaded() { serveImpl(&Tcp::Listener::runThreaded); }

先看serveImpl封装函数:

        void serveImpl(Method method)
        {
#define CALL_MEMBER_FN(obj, pmf) ((obj).*(pmf))
            if (!handler_)
                throw std::runtime_error("Must call setHandler() prior to serve()");

            listener.setHandler(handler_);
            listener.bind();

            CALL_MEMBER_FN(listener, method)
            ();
#undef CALL_MEMBER_FN
        }
    };

主要分为三个步骤

在这里核心的就是bind操作,他的主要任务就是创建server-fd,即执行socket、bind、listen等操作,accept是在epoll收到可读事件后执行的操作,这属于请求处理的部分。

1.4.1 bind

void Listener::bind() { bind(addr_); }

void Listener::bind(const Address& address)
{
    addr_ = address;
    struct addrinfo hints;
    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family   = address.family();
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags    = AI_PASSIVE;
    hints.ai_protocol = 0;
    const auto& host = addr_.host();
    const auto& port = addr_.port().toString();
    AddrInfo addr_info;
    TRY(addr_info.invoke(host.c_str(), port.c_str(), &hints));

    int fd = -1;

    const addrinfo* addr = nullptr;
    for (addr = addr_info.get_info_ptr(); addr; addr = addr->ai_next)
    {
        auto socktype = addr->ai_socktype;
        if (options_.hasFlag(Options::CloseOnExec))
            socktype |= SOCK_CLOEXEC;

        fd = ::socket(addr->ai_family, socktype, addr->ai_protocol);
        if (fd < 0)
            continue;

        setSocketOptions(fd, options_);

        if (::bind(fd, addr->ai_addr, addr->ai_addrlen) < 0)
        {
            close(fd);
            continue;
        }

        TRY(::listen(fd, backlog_));
        break;
    }

    // At this point, it is still possible that we couldn't bind any socket. If it
    // is the case, the previous loop would have exited naturally and addr will be
    // null.
    if (addr == nullptr)
    {
        throw std::runtime_error(strerror(errno));
    }

    make_non_blocking(fd);
    poller.addFd(fd, Flags<Polling::NotifyOn>(Polling::NotifyOn::Read),
                 Polling::Tag(fd));
    listen_fd = fd;

    auto transport = transportFactory_();

    reactor_.init(Aio::AsyncContext(workers_, workersName_));
    transportKey = reactor_.addHandler(transport);
}
    listener.setTransportFactory([this, options] {
        if (!handler_)
            throw std::runtime_error("Must call setHandler()");
        auto transport = std::make_shared<TransportImpl>(handler_);
        transport->setHeaderTimeout(options.headerTimeout_);
        transport->setBodyTimeout(options.bodyTimeout_);
        return transport;
    });

其核心作用就是构造一个Transport对象。

reactor_是Reactor的对象,负责管理工作线程,因此在初始化reactor_时,以工作线程个数作为参数。而我们前面提到:“Transport类就是用于工作线程的处理函数,他主要负责当监听到用户请求时,调用Request类进行解析,然后交给用户处理程序处理,最后调用Respone类的功能将数据写回”。因此在这里我们将处理函数transport赋值给了工作线程管理类Reactor。

在执行reactor_.addHandler是返回了一个key值,并复制给了Listener的成员变量transportKey,这是因为在设计上,Reactor可以拥有多个处理函数,因此在设计上这个key值就是处理函数的索引,但是在实现上,目前来说,只有一个处理函数。

需要注意,到目前,一共出现了两个处理函数(更精准的叫法应该是处理类),一个是用户自定义的HTTP::Handler,这个主要是用于处理用户请求的;另一个则是Transport类,他本身是作为Reactor类处理函数出现的,因为线程的最终目的还是要处理用户请求,实际上就是通过Transport类来调用HTTP::Handler的接口来实现的,而且Transport类的父类是Pistache::AIO::Handler,HTTP::Handler的父类是Pistache::TCP::Handler,的确就是有两个handler.反正自己好好理解吧,实在不行请参考:Transport类Reactor类

1.4.2 run

之前提到有两个执行接口:
void Endpoint::serve() { serveImpl(&Tcp::Listener::run); }
void Endpoint::serveThreaded() { serveImpl(&Tcp::Listener::runThreaded); }
主要对应了Listener的两个Run方法 run 和 runThreaded:

    // runThreaded区别于run的地方在于,runThreaded会启动一个新的线程来执行run()
    // 这样主线程就会从runThreaded返回
    void Listener::runThreaded()
    {
        shutdownFd.bind(poller);
        acceptThread = std::thread([=]() { this->run(); });
    }

两者的区别就在于是否需要启动一个新的线程在执行epoll.wait

因此我们主要来看run()的执行过程:

    void Listener::run()
    {
        // shutdownFd使用的是Linux的eventfd机制
        // 使用epoll监控shutdownFd,当shutdownFd被写入数据(即执行shutdownFd.notify)后变为可读,
        // 从而被poller捕获,从而终止下面的for循环
        if (!shutdownFd.isBound())
            shutdownFd.bind(poller);
        reactor_.run();

        for (;;)
        {
            std::vector<Polling::Event> events;
            int ready_fds = poller.poll(events);

            if (ready_fds == -1)
            {
                throw Error::system("Polling");
            }
            for (const auto& event : events)
            {
                if (event.tag == shutdownFd.tag())
                    return;

                if (event.flags.hasFlag(Polling::NotifyOn::Read))
                {
                    auto fd = event.tag.value();
                    if (static_cast<ssize_t>(fd) == listen_fd)
                    {
                        try
                        {
                            handleNewConnection();
                        }
                        catch (SocketError& ex)
                        {
                            PISTACHE_LOG_STRING_WARN(logger_, "Socket error: " << ex.what());
                        }
                        catch (ServerError& ex)
                        {
                            PISTACHE_LOG_STRING_FATAL(logger_, "Server error: " << ex.what());
                            throw;
                        }
                    }
                }
            }
        }
    }

到这里启动过程已经完成,就等新连接到达然后被epoll捕获,进而调用handleNewConnection(),那如何处理新连接和HTTP请求呢:

二、处理请求

这里需要区别,处理连接和处理请求的区别,处理连接指的是和用户建立TCP的连接 ,即执行accept操作,而处理请求是指,建立连接后,收到HTTP请求的处理过程。

void Listener::handleNewConnection()
{
    struct sockaddr_in peer_addr;
    int client_fd = acceptConnection(peer_addr);

    make_non_blocking(client_fd);

    std::shared_ptr<Peer> peer;
    peer = Peer::Create(client_fd, Address::fromUnix(&peer_addr));

    dispatchPeer(peer);
};

2.1 handleNewConnection()

在调用函数之前,epoll已经捕获到了新连接的到达,handleNewConnection()的主要工作包括:

2.2 dispatchPeer(peer);

void Listener::dispatchPeer(const std::shared_ptr<Peer>& peer)
{
    auto handlers  = reactor_.handlers(transportKey);
    auto idx       = peer->fd() % handlers.size();
    auto transport = std::static_pointer_cast<Transport>(handlers[idx]);

    transport->handleNewPeer(peer);
}
void Listener::bind() { bind(addr_); }
void Listener::bind(const Address& address)
{

    ...

    auto transport = transportFactory_();

    reactor_.init(Aio::AsyncContext(workers_, workersName_));
    transportKey = reactor_.addHandler(transport);
}

我之前说过,一个reactor_在设计上可以拥有多个handler,transportKey值就是reactor_的handler索引,但是实现上仅仅添加了一次,因此key的值是0。

但是reactor_.handlers(transportKey)是返回了多个handler,这是因为每当执行reactor_.addHandler()时,会将handler克隆多份,复制给reactor_管理的worker线程,因此reactor_.handlers(transportKey)其实是返回的是每个线程所对应的handler,他们都是我们传入的transport的克隆体,他们拥有相同的key值(即index=0)。

换句话说,每个reactor_可以拥有多种不同handler,不同的handler拥有自己的key值,即索引值。对于同一种handler,reactor_的每个worker线程都拥有一个改handler对象的克隆。而reactor_.handlers(transportKey)返回的就是指定key(索引)的所有worker线程的handler的集合。

这样没一个handler都唯一对应一个worker线程。

2.3 transport->handleNewPeer(peer);

    void Transport::handleNewPeer(const std::shared_ptr<Tcp::Peer>& peer)
    {
        auto ctx                   = context();
        const bool isInRightThread = std::this_thread::get_id() == ctx.thread();
        if (!isInRightThread)
        {
            PeerEntry entry(peer);
            peersQueue.push(std::move(entry));
        }
        else
        {
            handlePeer(peer);
        }
        int fd = peer->fd();
        {
            Guard guard(toWriteLock);
            toWrite.emplace(fd, std::deque<WriteEntry> {});
        }
    }
    void Transport::onReady(const Aio::FdSet& fds)
    {
        for (const auto& entry : fds)
        {
            if (entry.getTag() == writesQueue.tag())
            {
                handleWriteQueue();
            }
            else if (entry.getTag() == timersQueue.tag())
            {
                handleTimerQueue();
            }
            else if (entry.getTag() == peersQueue.tag())
            {
                handlePeerQueue();
            }
            else if (entry.getTag() == notifier.tag())
            {
                handleNotify();
            }

将会在Transport::onReady处理事件,可以看到当接受到peersQueue的事件时,就会调用handlePeerQueue()。handlePeerQueue()的处理逻辑如下,即对当前Queue中数据(即待处理的peer),执行handlePeer(data->peer);

    void Transport::handlePeerQueue()
    {
        for (;;)
        {
            auto data = peersQueue.popSafe();
            if (!data)
                break;

            handlePeer(data->peer);
        }
    }

2.4 handlePeer(data->peer);

    void Transport::handlePeer(const std::shared_ptr<Peer>& peer)
    {
        int fd = peer->fd();
        peers.insert(std::make_pair(fd, peer));

        peer->associateTransport(this);

        handler_->onConnection(peer);
        reactor()->registerFd(key(), fd, NotifyOn::Read | NotifyOn::Shutdown,
                              Polling::Mode::Edge);
    }

最重要的两步操作就是:

void Handler::onConnection(const std::shared_ptr<Tcp::Peer>& peer)
    {
        peer->putData(ParserData, std::make_shared<RequestParser>(maxRequestSize_));
    }

Request对象是封装在RequestParser中的,RequestParser提供了解析HTTP请求,以填充request对象的方法,详见Request类ParserImpl类

 void Transport::onReady(const Aio::FdSet& fds){
...
    else if (entry.isReadable())
            {
                auto tag = entry.getTag();
                if (isPeerFd(tag))
                {
                    auto& peer = getPeer(tag);
                    handleIncoming(peer);
                }
...
}

可以看到,最终请求由handleIncoming(peer);处理。

2.5 handleIncoming(peer);

此函数的主要作用就是从client-fd中获取数据,在Request类ParserImpl类中我们提到,数据可能需要多次读取,因为我并不知道TCP传入的请求有多大,因此buffer的大小是保守的,所以可能需要多次的读取,这也是epoll的ET模式的标准方式。这就需要我们的解析HTTP请求的方法支持多次读取获取数据,当获取到数据后,将执行 handler_->onInput(buffer, bytes, peer);

void Transport::handleIncoming(const std::shared_ptr<Peer>& peer)
{
    char buffer[Const::MaxBuffer] = { 0 };
    int fd             = peer->fd();
    for (;;)
    {
        ssize_t bytes;
        bytes = recv(fd, buffer, Const::MaxBuffer, 0);
        if (bytes == -1)
        {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
            {
                
            }
            else
            {
                handlePeerDisconnection(peer);
            }
            break;
        }
        else if (bytes == 0)
        {
            handlePeerDisconnection(peer);
            break;
        }

        else
        {
            handler_->onInput(buffer, bytes, peer);
        }
    }
}

2.6 handler_->onInput(buffer, bytes, peer);

此函数,将用于解析HTTP请求,以填充request对象。

void Handler::onInput(const char* buffer, size_t len,
                      const std::shared_ptr<Tcp::Peer>& peer)
{
    auto parser   = getParser(peer);
    auto& request = parser->request;
    try
    {
        if (!parser->feed(buffer, len))
        {
            parser->reset();
            throw HttpError(Code::Request_Entity_Too_Large,
                            "Request exceeded maximum buffer size");
        }

        auto state = parser->parse();

        if (state == Private::State::Done)
        {
            ResponseWriter response(request.version(), transport(), this, peer);
            
            request.copyAddress(peer->address());

            auto connection = request.headers().tryGet<Header::Connection>();

            if (connection)
            {
                response.headers().add<Header::Connection>(connection->control());
            }
            else
            {
                response.headers().add<Header::Connection>(ConnectionControl::Close);
            }

            onRequest(request, std::move(response));
            parser->reset();
        }
    }
    catch (const HttpError& err)
    {
        ResponseWriter response(request.version(), transport(), this, peer);
        response.send(static_cast<Code>(err.code()), err.reason());
        parser->reset();
    }

    catch (const std::exception& e)
    {
        ResponseWriter response(request.version(), transport(), this, peer);
        response.send(Code::Internal_Server_Error, e.what());
        parser->reset();
    }
}

主要的流程无非就是:

相关函数我们在ParserImpl类中有详细的描述。当state == Private::State::Done为真时,表示数据完成。
此时:

然后,就进入了用户的自定义的处理函数,传入的参数包括了:

至此,从服务器启动,到接受到用户连接再到接受到Http请求的完整过程就结束了。

以至于如何实现的异步写,已经多线程的Rector模型的实现,都在对应的类中详细展开。

完结,撒花~

2.7 后记

当用户的处理逻辑结束之后,onRequest(request, std::move(response))函数返回。然后将调用parser->reset()重置request对象。之后工作线程将返回到epoll的等待循环中,等待下一个事件。

其实下一个事件一般就是用户的异步写任务,即完成respone.send的后续,这部分我们将在《Pistache源码分析 —— 异步写机制》中展开。

上一篇 下一篇

猜你喜欢

热点阅读