Pistache源码分析 —— 异步写机制

2021-07-01  本文已影响0人  蟹蟹宁

一、前言

Pistache为了进行异步编程实现了异步写网络请求,即respone.send。它是基于Pistache的Promise以及PollableQueue机制共同实现的,流程其实和peerQueue类似,只要详细的看过前面的写的《Pistache —— Promise》以及《Pistache源码分析 —— PollableQueue类》机制后,对其原理已经不看源码,也能明白是如何实现的。

二、asyncWrite

template <typename Buf>
Async::Promise<ssize_t> asyncWrite(Fd fd, const Buf& buffer, int flags = 0)
{
    // Always enqueue responses for sending. Giving preference to consumer
    // context means chunked responses could be sent out of order.
    return Async::Promise<ssize_t>(
        [=](Async::Deferred<ssize_t> deferred) mutable {
            BufferHolder holder { buffer };
            WriteEntry write(std::move(deferred), std::move(holder), fd, flags);
            writesQueue.push(std::move(write));
        });
}
2.1 参数
2.2执行流程

三、epoll捕获写任务

void Transport::onReady(const Aio::FdSet& fds)
{
    for (const auto& entry : fds)
    {
        if (entry.getTag() == writesQueue.tag())
        {
            handleWriteQueue();
        }

看一下处理函数的实现:

void Transport::handleWriteQueue()
{
    // Let's drain the queue
    for (;;)
    {
        auto write = writesQueue.popSafe();
        if (!write)
            break;
        auto fd = write->peerFd;
        if (!isPeerFd(fd))
            continue;
        {
            Guard guard(toWriteLock);
            toWrite[fd].push_back(std::move(*write));
        }

        reactor()->modifyFd(key(), fd, NotifyOn::Read | NotifyOn::Write,
                            Polling::Mode::Edge);
    }
}

用for循环处理writeQueue:

这个时候,如果写缓冲区空闲,那么就会触发关注了POLLOUT事件fd:

else if (entry.isWritable())
{
    auto tag = entry.getTag();
    auto fd  = static_cast<Fd>(tag.value());

    {
        Guard guard(toWriteLock);
        auto it = toWrite.find(fd);
        if (it == std::end(toWrite))
        {
            throw std::runtime_error(
                "Assertion Error: could not find write data");
        }
    }

    reactor()->modifyFd(key(), fd, NotifyOn::Read, Polling::Mode::Edge);

    // Try to drain the queue
    asyncWriteImpl(fd);
}

处理函数主要做两件事:

四、asyncWriteImpl(fd)

下面这段看似复杂,其实就是就是调用send(2),sendfile(2)等系统调用来发送数据,可以先参考我翻译的手册:
linux手册翻译——send(2)
linux手册翻译——recv(2)
linux手册翻译——sendfile(2)
整个流程就是逻辑比较简单,主要需要理解非阻塞模式下的send过程:

此函数首先移除当前的写入任务,然后判断fd中是否还有其他写任务,如果没有就结束执行。

源码实现:
void Transport::asyncWriteImpl(Fd fd)
{
    bool stop = false;
    while (!stop)
    {
        std::unique_lock<std::mutex> lock(toWriteLock);

        auto it = toWrite.find(fd);

        // cleanup will have been handled by handlePeerDisconnection
        if (it == std::end(toWrite))
        {
            return;
        }
        auto& wq = it->second;
        if (wq.empty())
        {
            break;
        }

        auto& entry                       = wq.front();
        int flags                         = entry.flags;
        BufferHolder& buffer              = entry.buffer;
        Async::Deferred<ssize_t> deferred = std::move(entry.deferred);

        auto cleanUp = [&]() {
            wq.pop_front();
            if (wq.empty())
            {
                toWrite.erase(fd);
                reactor()->modifyFd(key(), fd, NotifyOn::Read, Polling::Mode::Edge);
                stop = true;
            }
            lock.unlock();
        };

        size_t totalWritten = buffer.offset();
        for (;;)
        {
            ssize_t bytesWritten = 0;
            auto len             = buffer.size() - totalWritten;

            if (buffer.isRaw())
            {
                auto raw        = buffer.raw();
                const auto* ptr = raw.data().c_str() + totalWritten;
                bytesWritten    = sendRawBuffer(fd, ptr, len, flags);
            }
            else
            {
                auto file    = buffer.fd();
                off_t offset = totalWritten;
                bytesWritten = sendFile(fd, file, offset, len);
            }
            if (bytesWritten < 0)
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                {

                    auto bufferHolder = buffer.detach(totalWritten);

                    // pop_front kills buffer - so we cannot continue loop or use buffer
                    // after this point
                    wq.pop_front();
                    wq.push_front(WriteEntry(std::move(deferred), bufferHolder, flags));
                    reactor()->modifyFd(key(), fd, NotifyOn::Read | NotifyOn::Write,
                                        Polling::Mode::Edge);
                }
                // EBADF can happen when the HTTP parser, in the case of
                // an error, closes fd before the entire request is processed.
                // https://github.com/pistacheio/pistache/issues/501
                else if (errno == EBADF || errno == EPIPE || errno == ECONNRESET)
                {
                    wq.pop_front();
                    toWrite.erase(fd);
                    stop = true;
                }
                else
                {
                    cleanUp();
                    deferred.reject(Pistache::Error::system("Could not write data"));
                }
                break;
            }
            else
            {
                totalWritten += bytesWritten;
                if (totalWritten >= buffer.size())
                {
                    if (buffer.isFile())
                    {
                        // done with the file buffer, nothing else knows whether to
                        // close it with the way the code is written.
                        ::close(buffer.fd());
                    }

                    cleanUp();

                    // Cast to match the type of defered template
                    // to avoid a BadType exception
                    deferred.resolve(static_cast<ssize_t>(totalWritten));
                    break;
                }
            }
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读