c++库linux深入

基于libuv/libevent的c++服务端高层协议栈

2022-03-14  本文已影响0人  冷雨寒江

1、缘起

我要实现一个网络应用,规划了一下,无非就是这三步:

1、接收请求;

2、请求处理;

3、发送结果;

下面是5个心路历程,仅供观赏,如有雷同,纯属巧合。

stage 1:新建一个工程,申请一个socket,bind、listen、accept、receive等待用户发送过来的数据, 不幸的是accept是阻塞的,阻塞意味着这个函数没执行完,代码无法进行下一步,而要继续,只有等accept监听到了有新的客户端连接上来,于是决定用 select等待,这个程序接收部分工作的很好,及时的收到消息,能够处理后发送,伪码像这样:

bind
listern
while(){
    select
    accept
    read
    处理
    发送
}           

stage 2:一天有同事抱怨我这个网络程序没有响应,查了一下,发现处理代码过慢导致没有及时的epoll导致错过了某条消息,我同事解释可能网络环境太差,过会重试就好了,同事试了一下,确实如此,问题“解决”,但是正义感驱使我更新了代码,将接收和处理分为两个线程:

bind
listern
while(){
    select
        a:如果可以read则将数据发送给其他线程
        b:如果可以写检查发送缓冲
}   
while(){
    处理网络数据
    发送给网络线程
}

stage 3: 到了这里,这个简单的网络程序终于完美了,网络异步接收、异步处理,在某次更新后,用户发现这个程序似乎好用了很多,可能跟公司最近升级了网络带宽有什么奇怪的关系,总之,生活平静了。

终于有一天,同事抱怨在某次请求数据很多的时候,比如下载一个大文件,文件下载后发现根本无法打开,出于职业敏感,我隐约感觉到可能我的代码bug有关,果然,处理线程发送文件的时候,根本没有考虑到网络线程的发送能力,一股脑的将几百M的数据分片拷贝过去,网络线程上次的发送还没完成,又来新的,根本无力应对。于是你决定数据过多的时候sleep一下,等到发送缓冲区清空的时候再发送过去,生活再次平静,处处充满了鸟语花香。

stage 4: 让我们将目光聚焦到网络接收线程上,select这种多路复用的技术都可以提供“可读”、“可写”接口,告诉你何时该发送,虽然上次的问题解决了,还留下个sleep的尾巴,每次sleep多少成了一项技术活,多了影响发送速度,少了徒增性能,再次进化,选择信号量,发送缓冲为空的时候且需要发送的时候,发送信号给处理线程,至少在网络接收、发送这件事上,无可挑剔。

stage 5: 直到一天,我需要跨平台、大并发的时候,意识到,需要一个统一的接口来实现网络应用,开始调研已有的、非常出名的网络开源库

2、先看有什么什么样的库和api

2.1、libuv

    on_read{
    处理消息
    uv_write
    }

优点

  • 没有提供网络上层应用,比较纯净
  • windows默认提供IOCP
  • 多平台提供统一的网络api

缺点

  • 提供的例子都是单线程处理网络消息后发送应答,官方没提供异步发送的例子,如uv_async_send

2.2、libevent

    socket_read_cb(bufferevent *bev, void *arg){
        size_t len = bufferevent_read(bev, msg, sizeof(msg)-1 );  
        在这里处理网络消息....
        bufferevent_write(bev, reply, strlen(reply) );
    }

优点

  • 文档全面,代码头文件注释丰富

缺点

  • 没有针对不同平台提供统一的socket接口
  • windows平台如果要使用IOCP还需要特别的设置
  • 杂糅了上层应用代码,如http、rpc
  • evconnlistener_new_bind 这个接口只能tcp, 不清楚为什么

最终用这两个库封装了tcp服务,两个不同的实现,同样的接口。

3、从上至下思考网络服务接口

有了上面的基础,我们知道网络协议能做什么,开源库做了什么,作为使用者我们需要什么?先看实际中的网络应用模型

3.1、实际中网络服务的框架

网络线程接收数据,根据业务决定线程并发数,理论上分为4种情况:

1、网络接收和数据处理共用一个线程,处理结束就发送,小型网络应用,几十个连接且没有复杂计算;

2、网络接收一个线程,数据处理一个线程,小型网络应用,几十个连接;

3、网络接收一个线程,数据处理N个线程,大并发,1k以上连接;

4、网络接收N个进程,数据处理N个线程(如nginx),超大并发,10k以上连接;

3.2、想象的理想接口

  • 获取通知接口,get_notify, 告诉上层三个事件:1、新连接;2、新消息;3、连接断开
  • 根据连接id接收网络消息
  • 发送接口,如果发送过快,则被阻塞,但提供超时接口

补充:

  • 新连接和连接断开的意义在于:方便上层做线程池策略
  • 如果用户接收过慢,接收消息将被覆盖,网络库不可能提供很大的缓存处理接收过慢的问题
  • 接口都是线程安全,方便上层多线程调用
  • 库对外屏蔽细节,不提供如connection这种抽象类给用户,只有连接id

3.3 示例

线程安全的等待消息
    bool tcp_server_wait_notify(
        tcp_server_ctx *ctx,
        uint64_t msecs,
        tcp_server_msg& msg);

线程安全的发送接口,阻塞发送,可以最快的将数据发送出去
bool tcp_server_async_send(
        tcp_server_ctx* ctx, 
        uid_type uid,  //客户id
        uint8_t* pdata, uint64_t dlen, //目标数据
        uint64_t mses)  //等待队列为空超时时间,如果缓冲区满则等待,等待超时则失败

4、为什么要这么设计

同时覆盖【 本文章节3.1】 提到的网络应用模型前3种需求,用户自行设置网络处理线程数量,业务不应该是网络库关心的,有很多网络库(如muduo)甚至提供了线程池,显然是为了应对用户业务需求而准备的,这有点过度设计。单线程无阻塞接收发送可以跑满网卡,就足够了,至于业务线程策略,可以根据客户端数量来选择线程池、或其他多线程的方式来实现,网络协议层不应该关心。

5、整体架构图

用户 -> 协议栈:服务端开始(指定监听端口)
客户端->协议栈:新连接
协议栈->用户:新客户连接 + uid
客户端->协议栈:网络消息
协议栈->用户:网络消息 + uid
用户 -> 协议栈:根据uid取消息
用户 ->用户:处理消息
用户 --> 协议栈:阻塞发送结果 + uid
协议栈-->客户端:协议栈发送应答

6、示例代码和实测性能

下面的示例代码包含两个线程,一个网络接收线程tcpSrvThread、一个http线程,http接收请求并发送结果,发送函数作为参数传给http模块,主要内容就是阻塞发送,为了应对发送大文件的情况,比如要发送200MB的文件。如果http业务过于耗时,则可以考虑增加线程池来处理。

    std::thread httpAsyncThread([&app_exit, addr, port]() {
        ws_tcp::server_err err = ws_tcp::no_error;
        ws_tcp::tcp_server_ctx* pctx 
            = ws_tcp::tcp_server_start(addr, port, err);

        if (pctx == nullptr) {
            printf("tcp_server_start failed:%d\n",err);
            return;
        }
        printf("listern on:%s:%d\n", addr, port);

        const uint64_t default_wait_ms = 2000;
        using queueValueType = std::tuple<std::string, ws_tcp::uid_type>;
        safe_queue<queueValueType> input;

        std::thread tcpSrvThread([
            &app_exit,
            pctx,
                default_wait_ms,
                &input]() {

            const uint64_t tmpToReceiveCap = 1024 * 1024;
            uint8_t* tmpToReceive = new uint8_t[tmpToReceiveCap];
            
            while (!app_exit) {
                ws_tcp::tcp_server_msg msg;
                if (tcp_server_wait_notify(pctx, 20, msg)) {

                    if (msg.type == ws_tcp::new_connection) {

                        ws_tcp::addr_type addr;
                        if (tcp_server_get_peer(pctx,
                            msg.uid, addr)) {
                            printf("> new con:uid:%lld, from:%s:%d\n",
                                msg.uid,
                                addr.first.c_str(),
                                addr.second);
                        }
                        else {
                            printf("> new con,uid:%lld\n", msg.uid);
                        }
                    }
                    else if (msg.type == ws_tcp::ready_to_read) {
                        uint64_t size = 0;
                        printf("> can read,uid:%lld\n", msg.uid);
                        
                        tcp_server_peek_data(
                            pctx,
                            msg.uid,
                            tmpToReceive,
                            tmpToReceiveCap,
                            size);

                        if (size) {
                            input.Produce(std::make_tuple(
                                std::string((char*)tmpToReceive, size), 
                                msg.uid));
                        }
                    }
                    else if (msg.type == ws_tcp::connection_disconnected) {
                        printf("> disconnected,uid:%lld\n", msg.uid);
                    }
                }
            }

            delete[] tmpToReceive;
        });

        while (!app_exit) {

            queueValueType item;
            if (input.Consume(item,200)) {
      
                ws_http::serve_http(std::get<0>(item), std::get<1>(item), 
                    [pctx, default_wait_ms](
                    uint8_t* data,
                    uint64_t dlen,
                    ws_tcp::uid_type uid, bool cas)->bool {

                    printf("async send,uid:%lld\n", uid);

                    return tcp_server_async_send(pctx,
                        uid,
                        data,
                        dlen,
                        default_wait_ms,
                        cas);
                });
            }
        }

        tcpSrvThread.join();
        ws_tcp::tcp_server_stop(pctx);
    });

600并发调用,速度每秒100次

  • 测试命令: ./http_load -rate 100 -seconds 20 -parallel 600 urls.txt
  • 硬件 64 Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz
  • 操作系统:CentOS Linux release 7.7.1908 (Core)
libuv测试结果
56149 fetches, 600 max parallel, 2.31334e+07 bytes, in 20 seconds
412 mean bytes/connection
2807.45 fetches/sec, 1.15667e+06 bytes/sec
msecs/connect: 5.13405 mean, 1007.01 max, 3.427 min
msecs/first-response: 207.671 mean, 651.198 max, 5.081 min
HTTP response codes:
libevent测试结果
27553 fetches, 600 max parallel, 1.13518e+07 bytes, in 20.002 seconds
412 mean bytes/connection
1377.51 fetches/sec, 567535 bytes/sec
msecs/connect: 41.1577 mean, 7023.26 max, 3.414 min
msecs/first-response: 174.899 mean, 6715.36 max, 4.1 min
HTTP response codes:
  code 200 -- 27553

结论:同样的c++接口,同样的上层代码。libuv性能高出libevent2倍有余

以上
2022/03/15

文中的libuv版本为:libuv-v1.44.1
libevent版本为:libevent-2.1.12-stable
作者qq:7956968

上一篇下一篇

猜你喜欢

热点阅读