基于libuv/libevent的c++服务端高层协议栈
1、缘起
我要实现一个网络应用,规划了一下,无非就是这三步:
1、接收请求;
2、请求处理;
3、发送结果;
下面是5个心路历程,仅供观赏,如有雷同,纯属巧合。
stage 1:新建一个工程,申请一个socket,bind、listen、accept、receive等待用户发送过来的数据, 不幸的是accept是阻塞的,阻塞意味着这个函数没执行完,代码无法进行下一步,而要继续,只有等accept监听到了有新的客户端连接上来,于是决定用 select等待,这个程序接收部分工作的很好,及时的收到消息,能够处理后发送,伪码像这样:
bind
listern
while(){
select
accept
read
处理
发送
}
stage 2:一天有同事抱怨我这个网络程序没有响应,查了一下,发现处理代码过慢导致没有及时的epoll导致错过了某条消息,我同事解释可能网络环境太差,过会重试就好了,同事试了一下,确实如此,问题“解决”,但是正义感驱使我更新了代码,将接收和处理分为两个线程:
bind
listern
while(){
select
a:如果可以read则将数据发送给其他线程
b:如果可以写检查发送缓冲
}
while(){
处理网络数据
发送给网络线程
}
stage 3: 到了这里,这个简单的网络程序终于完美了,网络异步接收、异步处理,在某次更新后,用户发现这个程序似乎好用了很多,可能跟公司最近升级了网络带宽有什么奇怪的关系,总之,生活平静了。
终于有一天,同事抱怨在某次请求数据很多的时候,比如下载一个大文件,文件下载后发现根本无法打开,出于职业敏感,我隐约感觉到可能我的代码bug有关,果然,处理线程发送文件的时候,根本没有考虑到网络线程的发送能力,一股脑的将几百M的数据分片拷贝过去,网络线程上次的发送还没完成,又来新的,根本无力应对。于是你决定数据过多的时候sleep一下,等到发送缓冲区清空的时候再发送过去,生活再次平静,处处充满了鸟语花香。
stage 4: 让我们将目光聚焦到网络接收线程上,select这种多路复用的技术都可以提供“可读”、“可写”接口,告诉你何时该发送,虽然上次的问题解决了,还留下个sleep的尾巴,每次sleep多少成了一项技术活,多了影响发送速度,少了徒增性能,再次进化,选择信号量,发送缓冲为空的时候且需要发送的时候,发送信号给处理线程,至少在网络接收、发送这件事上,无可挑剔。
stage 5: 直到一天,我需要跨平台、大并发的时候,意识到,需要一个统一的接口来实现网络应用,开始调研已有的、非常出名的网络开源库
2、先看有什么什么样的库和api
2.1、libuv
on_read{
处理消息
uv_write
}
优点
- 没有提供网络上层应用,比较纯净
- windows默认提供IOCP
- 多平台提供统一的网络api
缺点
- 提供的例子都是单线程处理网络消息后发送应答,官方没提供异步发送的例子,如uv_async_send
2.2、libevent
socket_read_cb(bufferevent *bev, void *arg){
size_t len = bufferevent_read(bev, msg, sizeof(msg)-1 );
在这里处理网络消息....
bufferevent_write(bev, reply, strlen(reply) );
}
优点
- 文档全面,代码头文件注释丰富
缺点
- 没有针对不同平台提供统一的socket接口
- windows平台如果要使用IOCP还需要特别的设置
- 杂糅了上层应用代码,如http、rpc
- evconnlistener_new_bind 这个接口只能tcp, 不清楚为什么
最终用这两个库封装了tcp服务,两个不同的实现,同样的接口。
3、从上至下思考网络服务接口
有了上面的基础,我们知道网络协议能做什么,开源库做了什么,作为使用者我们需要什么?先看实际中的网络应用模型
3.1、实际中网络服务的框架
网络线程接收数据,根据业务决定线程并发数,理论上分为4种情况:
1、网络接收和数据处理共用一个线程,处理结束就发送,小型网络应用,几十个连接且没有复杂计算;
2、网络接收一个线程,数据处理一个线程,小型网络应用,几十个连接;
3、网络接收一个线程,数据处理N个线程,大并发,1k以上连接;
4、网络接收N个进程,数据处理N个线程(如nginx),超大并发,10k以上连接;
3.2、想象的理想接口
- 获取通知接口,get_notify, 告诉上层三个事件:1、新连接;2、新消息;3、连接断开
- 根据连接id接收网络消息
- 发送接口,如果发送过快,则被阻塞,但提供超时接口
补充:
- 新连接和连接断开的意义在于:方便上层做线程池策略
- 如果用户接收过慢,接收消息将被覆盖,网络库不可能提供很大的缓存处理接收过慢的问题
- 接口都是线程安全,方便上层多线程调用
- 库对外屏蔽细节,不提供如connection这种抽象类给用户,只有连接id
3.3 示例
线程安全的等待消息
bool tcp_server_wait_notify(
tcp_server_ctx *ctx,
uint64_t msecs,
tcp_server_msg& msg);
线程安全的发送接口,阻塞发送,可以最快的将数据发送出去
bool tcp_server_async_send(
tcp_server_ctx* ctx,
uid_type uid, //客户id
uint8_t* pdata, uint64_t dlen, //目标数据
uint64_t mses) //等待队列为空超时时间,如果缓冲区满则等待,等待超时则失败
4、为什么要这么设计
同时覆盖【 本文章节3.1】 提到的网络应用模型前3种需求,用户自行设置网络处理线程数量,业务不应该是网络库关心的,有很多网络库(如muduo)甚至提供了线程池,显然是为了应对用户业务需求而准备的,这有点过度设计。单线程无阻塞接收发送可以跑满网卡,就足够了,至于业务线程策略,可以根据客户端数量来选择线程池、或其他多线程的方式来实现,网络协议层不应该关心。
5、整体架构图
用户 -> 协议栈:服务端开始(指定监听端口)
客户端->协议栈:新连接
协议栈->用户:新客户连接 + uid
客户端->协议栈:网络消息
协议栈->用户:网络消息 + uid
用户 -> 协议栈:根据uid取消息
用户 ->用户:处理消息
用户 --> 协议栈:阻塞发送结果 + uid
协议栈-->客户端:协议栈发送应答
6、示例代码和实测性能
下面的示例代码包含两个线程,一个网络接收线程tcpSrvThread、一个http线程,http接收请求并发送结果,发送函数作为参数传给http模块,主要内容就是阻塞发送,为了应对发送大文件的情况,比如要发送200MB的文件。如果http业务过于耗时,则可以考虑增加线程池来处理。
std::thread httpAsyncThread([&app_exit, addr, port]() {
ws_tcp::server_err err = ws_tcp::no_error;
ws_tcp::tcp_server_ctx* pctx
= ws_tcp::tcp_server_start(addr, port, err);
if (pctx == nullptr) {
printf("tcp_server_start failed:%d\n",err);
return;
}
printf("listern on:%s:%d\n", addr, port);
const uint64_t default_wait_ms = 2000;
using queueValueType = std::tuple<std::string, ws_tcp::uid_type>;
safe_queue<queueValueType> input;
std::thread tcpSrvThread([
&app_exit,
pctx,
default_wait_ms,
&input]() {
const uint64_t tmpToReceiveCap = 1024 * 1024;
uint8_t* tmpToReceive = new uint8_t[tmpToReceiveCap];
while (!app_exit) {
ws_tcp::tcp_server_msg msg;
if (tcp_server_wait_notify(pctx, 20, msg)) {
if (msg.type == ws_tcp::new_connection) {
ws_tcp::addr_type addr;
if (tcp_server_get_peer(pctx,
msg.uid, addr)) {
printf("> new con:uid:%lld, from:%s:%d\n",
msg.uid,
addr.first.c_str(),
addr.second);
}
else {
printf("> new con,uid:%lld\n", msg.uid);
}
}
else if (msg.type == ws_tcp::ready_to_read) {
uint64_t size = 0;
printf("> can read,uid:%lld\n", msg.uid);
tcp_server_peek_data(
pctx,
msg.uid,
tmpToReceive,
tmpToReceiveCap,
size);
if (size) {
input.Produce(std::make_tuple(
std::string((char*)tmpToReceive, size),
msg.uid));
}
}
else if (msg.type == ws_tcp::connection_disconnected) {
printf("> disconnected,uid:%lld\n", msg.uid);
}
}
}
delete[] tmpToReceive;
});
while (!app_exit) {
queueValueType item;
if (input.Consume(item,200)) {
ws_http::serve_http(std::get<0>(item), std::get<1>(item),
[pctx, default_wait_ms](
uint8_t* data,
uint64_t dlen,
ws_tcp::uid_type uid, bool cas)->bool {
printf("async send,uid:%lld\n", uid);
return tcp_server_async_send(pctx,
uid,
data,
dlen,
default_wait_ms,
cas);
});
}
}
tcpSrvThread.join();
ws_tcp::tcp_server_stop(pctx);
});
600并发调用,速度每秒100次
- 测试命令: ./http_load -rate 100 -seconds 20 -parallel 600 urls.txt
- 硬件 64 Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz
- 操作系统:CentOS Linux release 7.7.1908 (Core)
libuv测试结果
56149 fetches, 600 max parallel, 2.31334e+07 bytes, in 20 seconds
412 mean bytes/connection
2807.45 fetches/sec, 1.15667e+06 bytes/sec
msecs/connect: 5.13405 mean, 1007.01 max, 3.427 min
msecs/first-response: 207.671 mean, 651.198 max, 5.081 min
HTTP response codes:
libevent测试结果
27553 fetches, 600 max parallel, 1.13518e+07 bytes, in 20.002 seconds
412 mean bytes/connection
1377.51 fetches/sec, 567535 bytes/sec
msecs/connect: 41.1577 mean, 7023.26 max, 3.414 min
msecs/first-response: 174.899 mean, 6715.36 max, 4.1 min
HTTP response codes:
code 200 -- 27553
结论:同样的c++接口,同样的上层代码。libuv性能高出libevent2倍有余
以上
2022/03/15
文中的libuv版本为:libuv-v1.44.1
libevent版本为:libevent-2.1.12-stable
作者qq:7956968