Pistache源码分析 —— 异步写机制
一、前言
Pistache为了进行异步编程实现了异步写网络请求,即respone.send
。它是基于Pistache的Promise以及PollableQueue机制共同实现的,流程其实和peerQueue类似,只要详细的看过前面的写的《Pistache —— Promise》以及《Pistache源码分析 —— PollableQueue类》机制后,对其原理已经不看源码,也能明白是如何实现的。
二、asyncWrite
template <typename Buf>
Async::Promise<ssize_t> asyncWrite(Fd fd, const Buf& buffer, int flags = 0)
{
// Always enqueue responses for sending. Giving preference to consumer
// context means chunked responses could be sent out of order.
return Async::Promise<ssize_t>(
[=](Async::Deferred<ssize_t> deferred) mutable {
BufferHolder holder { buffer };
WriteEntry write(std::move(deferred), std::move(holder), fd, flags);
writesQueue.push(std::move(write));
});
}
2.1 参数
-
Fd fd
要写入数据的文件描述符,在这是由accept()创建的client-fd,由peer类保存。 -
const Buf& buffer
要发送的数据,这里的Buf是模板,可以是RawBuffer或者FileBuffer,这两个类比较简单,就不详细展开了 -
int flags = 0
这是send(2)系统调用中使用的参数,因为最终还是调用send(2)/sendfile(2)来进行网络数据的传送
2.2执行流程
-
BufferHolder holder { buffer }
将要写的数据进行包装到BufferHolder中,BufferHolder将RawBuffer和FileBuffer统一封装,并提供了重要的offset字段,以标志数据发送到哪了,BufferHolder也不复杂,就不展开了。 - 将数据打包到WriteEntry中
WriteEntry中有一个重要的字段,那就是deferred,这是Promise给我们的进行数据填充的接口,见Promise - 放到writesQueue中
到这里asyncWrite,就可以返回了,worker线程的epoll会监听到writesQueue放入了数据,然后进行处理
三、epoll捕获写任务
void Transport::onReady(const Aio::FdSet& fds)
{
for (const auto& entry : fds)
{
if (entry.getTag() == writesQueue.tag())
{
handleWriteQueue();
}
看一下处理函数的实现:
void Transport::handleWriteQueue()
{
// Let's drain the queue
for (;;)
{
auto write = writesQueue.popSafe();
if (!write)
break;
auto fd = write->peerFd;
if (!isPeerFd(fd))
continue;
{
Guard guard(toWriteLock);
toWrite[fd].push_back(std::move(*write));
}
reactor()->modifyFd(key(), fd, NotifyOn::Read | NotifyOn::Write,
Polling::Mode::Edge);
}
}
用for循环处理writeQueue:
- 将 writeEntry 放到toWrite中
toWrite是Transport的字段,是一个哈希映射,用于记录每个连接(client-fd)的写请求(writeEntry)。每个transport或者说每个worker线程同时处理多个连接(client-fd),而每个连接,有可能存在多个写请求。
std::unordered_map<Fd, std::deque<WriteEntry>> toWrite;
- 修改存在写请求的连接的epoll关注事件
因为只要写缓冲区空闲,就可以报告写事件,这样就会一直受到POLLOUT事件(ET模式可能不会一直报告),因此在存在写请求的时候,开始关注POLLOUT事件是很合理的
这个时候,如果写缓冲区空闲,那么就会触发关注了POLLOUT事件fd:
else if (entry.isWritable())
{
auto tag = entry.getTag();
auto fd = static_cast<Fd>(tag.value());
{
Guard guard(toWriteLock);
auto it = toWrite.find(fd);
if (it == std::end(toWrite))
{
throw std::runtime_error(
"Assertion Error: could not find write data");
}
}
reactor()->modifyFd(key(), fd, NotifyOn::Read, Polling::Mode::Edge);
// Try to drain the queue
asyncWriteImpl(fd);
}
处理函数主要做两件事:
- 就是停止POLLOUT事件
目的是避免频繁的收到POLLOUT事件,直到又有了新的请求再开启,这与epoll的EPOLLNESHOT标志很像,EPOLLNESHOT标志会在接受到时间后,停止对fd的监控,一般用于多线程中,避免多个线程同时操作同一个fd,但是这里如果用EPOLLNESHOT标志的话,POLLOIN也无法收到了。
这里需要注意的是,上述整个过程,都是在同一个线程中实现的,这就是所谓的经典的epoll事件循环,send是发生在Readable(即HTTP请求到达)的事件处理,而异步写是实现是在writeQueue和Writable,他们都在一个线程中被处理,是不会并发的。 - asyncWriteImpl(fd)
开始执行写数据
四、asyncWriteImpl(fd)
下面这段看似复杂,其实就是就是调用send(2),sendfile(2)等系统调用来发送数据,可以先参考我翻译的手册:
linux手册翻译——send(2)
linux手册翻译——recv(2)
linux手册翻译——sendfile(2)
整个流程就是逻辑比较简单,主要需要理解非阻塞模式下的send过程:
- 首先我们是在收到了epoll的可写事件后再调用此函数的,说明当前的套接字(
fd
)是可以写入的 - send()和sendfile()在成功时,返回的是写入的字节大小,显然成功写入多少是取决于当前TCP套接字的写入缓冲区的剩余空间的
- 失败时,将返回-1 ,这个时候可能的情况:
-
errno == EAGAIN || errno == EWOULDBLOCK
这是典型的发生阻塞,说明当前fd不能写入了,返回值是成功写入字节数,接下来我们进行了以下操作:- 扣除了原buf已经成功发生的部分,然后将新任务内容重新放到了队列的头部
auto bufferHolder = buffer.detach(totalWritten); wq.pop_front(); wq.push_front(WriteEntry(std::move(deferred), bufferHolder, flags));
- 重新修改epoll,令其监听POLLOUT
reactor()->modifyFd(key(), fd, NotifyOn::Read | NotifyOn::Write, Polling::Mode::Edge);
这样就保证了任务写入的完整性,即一旦任务1开始写,那么在任务1写完之前,不能写任务2的内容。因为新的任务都是从队尾追加的,我们将优先处理头部的任务。然后这个时候理论上应该是推出整个循环的以等待fd的可写事件,但是代码中并没有退出循环,我认为这是一个BUG,我在Github上提交了这个ISSUE 。我认为这种设计是为了实现flush()函数,为了保证flush()会在将
toWrite
的所有缓冲区清空之前,不能返回。所以我的ISSUE 也不完善,因为没有考虑flush(),我觉得可以实现一个同步的写实现,专用于flush()。 -
errno == EBADF || errno == EPIPE || errno == ECONNRESET
这是fd出现了问题,写入已经无法完成,这时候需要将其从toWrite
中删除 -
其他的错误
作者认为,当前进行的写入任务出了问题,但是fd还有救,因此执行了cleanUp():auto cleanUp = [&]() { wq.pop_front(); if (wq.empty()) { toWrite.erase(fd); reactor()->modifyFd(key(), fd, NotifyOn::Read, Polling::Mode::Edge); stop = true; } lock.unlock() };
-
此函数首先移除当前的写入任务,然后判断fd中是否还有其他写任务,如果没有就结束执行。
- 如果执行成功,那么将判断写入数据总数是否已经等于要写入的数据总数,如果是那么执行上面的cleanUp(),如果没有,则继续循环执行send()和sendfile()
源码实现:
void Transport::asyncWriteImpl(Fd fd)
{
bool stop = false;
while (!stop)
{
std::unique_lock<std::mutex> lock(toWriteLock);
auto it = toWrite.find(fd);
// cleanup will have been handled by handlePeerDisconnection
if (it == std::end(toWrite))
{
return;
}
auto& wq = it->second;
if (wq.empty())
{
break;
}
auto& entry = wq.front();
int flags = entry.flags;
BufferHolder& buffer = entry.buffer;
Async::Deferred<ssize_t> deferred = std::move(entry.deferred);
auto cleanUp = [&]() {
wq.pop_front();
if (wq.empty())
{
toWrite.erase(fd);
reactor()->modifyFd(key(), fd, NotifyOn::Read, Polling::Mode::Edge);
stop = true;
}
lock.unlock();
};
size_t totalWritten = buffer.offset();
for (;;)
{
ssize_t bytesWritten = 0;
auto len = buffer.size() - totalWritten;
if (buffer.isRaw())
{
auto raw = buffer.raw();
const auto* ptr = raw.data().c_str() + totalWritten;
bytesWritten = sendRawBuffer(fd, ptr, len, flags);
}
else
{
auto file = buffer.fd();
off_t offset = totalWritten;
bytesWritten = sendFile(fd, file, offset, len);
}
if (bytesWritten < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
auto bufferHolder = buffer.detach(totalWritten);
// pop_front kills buffer - so we cannot continue loop or use buffer
// after this point
wq.pop_front();
wq.push_front(WriteEntry(std::move(deferred), bufferHolder, flags));
reactor()->modifyFd(key(), fd, NotifyOn::Read | NotifyOn::Write,
Polling::Mode::Edge);
}
// EBADF can happen when the HTTP parser, in the case of
// an error, closes fd before the entire request is processed.
// https://github.com/pistacheio/pistache/issues/501
else if (errno == EBADF || errno == EPIPE || errno == ECONNRESET)
{
wq.pop_front();
toWrite.erase(fd);
stop = true;
}
else
{
cleanUp();
deferred.reject(Pistache::Error::system("Could not write data"));
}
break;
}
else
{
totalWritten += bytesWritten;
if (totalWritten >= buffer.size())
{
if (buffer.isFile())
{
// done with the file buffer, nothing else knows whether to
// close it with the way the code is written.
::close(buffer.fd());
}
cleanUp();
// Cast to match the type of defered template
// to avoid a BadType exception
deferred.resolve(static_cast<ssize_t>(totalWritten));
break;
}
}
}
}
}