服务器开发C++老鸟新集BOOST

Boost.Asio和Libuv服务器实现案例

2017-04-12  本文已影响1438人  Brent姜

1 简介

Boost.Asio和Libuv都是非常优秀的网络通讯框架。本文使用两种技术,在CentOS上各自实现一套服务程序,实现从命名管道读取数据作为输入,然后将所有数据通过Socket/TCP转发到Socket客户端的功能。

异步数据处理就是指,任务触发后不需要等待它们完成。 相反,LibUV/Boost.Asio 会在任务完成时触发一个应用。

网络程序设计中有两种主流的设计模式:Reactor和Proactor。两者的区别,用知乎上的一个神回答解释:

如何深刻理解reactor和proactor?
reactor:能收了你跟俺说一声。
proactor: 你给我收十个字节,收好了跟俺说一声。

LibUV是Node.js底层所采用的异步通讯框架,所有的通讯机制可以在同一个线程中异步完成。Nodejs的背后是V8和libuv和IOCP(windows),libuv接受V8解析后的请求事件,用event loop来注册事件,并提交用户代码来进行实际操作,libuv是属于所谓的reactor pattern的,即用户可以注册一个”读回调“来处理发生在字节流上的读操作。

Linux下高性能的网络库中大多使用的Reactor 模式去实现,Boost Asio在Linux下用epoll和select去模拟proactor模式,影响了它的效率和实现复杂度。为什么Boost.Asio使用Proactor模式呢?借用知乎陈聪的回答来解释吧:

Windows 下很难实现高效可伸缩的 Reactor。首先,Win32 API 里 WaitForMultipleObjects 只能同时等待 64 个 handle (MAXIMUM_WAIT_OBJECTS);其次 WinSock 的 select() 实现又很 buggy,特别是在错误处理方面有很多奇葩行为(具体见各种跨平台网络库代码中对此的注释);最后,Windows Vista 新增的 WSAPoll() 函数与 POSIX 的 poll() 又不尽兼容( daniel.haxx.se/blog/201 )。

Windows 有自己的一套高效异步IO模型(几乎等同于Proactor),同时支持文件IO和网络IO;但 Linux 只有高效的网络同步IO(epoll 之类的 io multiplexing 是同步的Reactor,且不支持磁盘文件),二者的高效IO编程模型从根本上不兼容(Windows 可以把网络事件发到 GUI 线程的事件队列中,有点类似 Reactor,但是似乎一个进程只能有一个 GUI 线程,因此在多核系统上其伸缩性受限)。

因此,ASIO 要想高效且跨平台,只能用 Proactor 模型了。不可避免地会在 Linux 上损失一点儿效率。

换句话说,proactor需要利用操作系统的底层异步API,由内核线程并行进行实际的I/O读写操作,可以实现灵活的异步回调,并且在回调之前数据已经准备好并放在用户缓存中了。

在2017年,Boost.Asio可能会进入C++标准。为什么 Proactor 是最佳模型?

2 使用Libuv

2.1简介

LibUV是Node.js底层所采用的异步通讯框架,所有的通讯机制可以在同一个线程中异步完成。不过libuv不仅仅支持异步io操作,而且还具有一个强劲的线程池,用于支持多线程并行的cpu密集型操作(参考[3])。

本文描述的Libuv服务程序就仅使用了一个线程(即主线程)。这样的好处是无需做资源同步控制,代码简洁。
LibUV使用异步事件驱动编程。程序的运行主体是一个事件分发循环(event-loop)。在事件驱动编程中,程序会关注每一个事件,并且对每一个事件的发生做出反应。libuv会负责将来自操作系统的事件收集起来,或者监视其他来源的事件。这样,用户就可以注册回调函数,回调函数会在事件发生的时候被调用。event-loop会一直保持运行状态。用伪代码描述如下:

while there are still events to process:
    e = get the next event
    if there is a callback associated with e:
        call the callback

2.2 初始化Libuv

创建一个uv_loop_t*类型的全局变量。

uv_loop_t* loop;

在main()函数中进行初始化:

int main(int argc,char **argv) {
    …
    loop = uv_default_loop();
    uv_pipe_t pipe;
    uv_fs_t file_req;
    int fd=uv_fs_open(loop,&file_req,argv[2],O_CREAT|O_RDWR,0644,NULL);
    uv_pipe_init(loop, &pipe, 0);
    uv_pipe_open(&pipe,fd);
    uv_read_start((uv_stream_t*)&pipe,alloc_buffer,read_pipe);

    return uv_run(loop, UV_RUN_DEFAULT);
}

2.3 建立Socket服务器

在main()函数中构建Socket/TCP服务器:

int DEFAULT_PORT=0;
int main(int argc,char **argv) {
    …
    uv_tcp_t server;
    uv_tcp_init(loop, &server);
    uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr);
    uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);
    int r = uv_listen((uv_stream_t*)&server, DEFAULT_BACKLOG, on_new_connection);
    if (r) {
        fprintf(stderr, "Listen error %s\n", uv_strerror(r));
        return 1;
    }

    return uv_run(loop, UV_RUN_DEFAULT);
}

其中on_new_connection()函数用于接收客户端连接。

vector<uv_stream_t*>client_pool;
void on_new_connection(uv_stream_t *server, int status) {
    if (status < 0) {
        fprintf(stderr, "New connection error %s\n", uv_strerror(status));
        // error!
        return;
    }

    uv_tcp_t *client = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
    uv_tcp_init(loop, client);
    if (uv_accept(server, (uv_stream_t*)client) == 0) {
        client_pool.push_back((uv_stream_t*)client);
        uv_read_start((uv_stream_t*)client, alloc_buffer, recv_cb);
    }
    else {
        uv_close((uv_handle_t*)client, NULL);
    }
}

在收到客户端连接时,将新建立的客户端连接对象放入client_pool数组中,作为当前客户端连接队列。
当客户端退出或异常关闭时,从该队列中删除,在recv_cb()函数中判断客户端关闭状态。recv_cb()函数在接收到客户端发送的数据时被调用,当返回的nread<0时,表明连接状态发生异常变化。

2.4实时监听管道

在main()函数中构建管道监听服务:

int main(int argc,char **argv) {
    …
    uv_pipe_t pipe;
    uv_fs_t file_req;
    int fd=uv_fs_open(loop,&file_req,argv[2],O_CREAT|O_RDWR,0644,NULL);
    uv_pipe_init(loop, &pipe, 0);
    uv_pipe_open(&pipe,fd);
    uv_read_start((uv_stream_t*)&pipe,alloc_buffer,read_pipe);

    return uv_run(loop, UV_RUN_DEFAULT);
}

当管道中出现新数据时,libuv会异步调用read_pipe()函数。

void read_pipe(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
    if (nread <0) {
        if (nread != UV_EOF)
            LOGE("Read pipe error "<<uv_err_name(nread)<<"\r\n");
        uv_close((uv_handle_t*)stream, NULL);   
        if (buf->base)
            free(buf->base);
        //free(buf);
    }
    else {
        if (nread > 0) {
            // 此处进行Socket转发
        }
        else{
            /* Everything OK, but nothing read. */
            if(buf->base)
                free(buf->base);
        }
    }

}

注意,当管道读取正常但无数据时,需要使用free(buf->base)清空缓存,因为在libuv的文档中有这么一句:
Note
nread might be 0, which does not indicate anerror or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under read(2).

2.5Socket数据转发

在管道监听回调函数read_pipe()中,向所有连接的Socket客户端转发管道中的数据。

int curmcount=0;
for(int i=0;i<client_pool.size();i++)
{               
    write_req_t *req=(write_req_t*)malloc(sizeof(write_req_t));
    req->buf=uv_buf_init(buf->base,nread);
    curmcount++;
    uv_write((uv_write_t *)req, client_pool[i], &req->buf, 1, echo_write);
}   
if(curmcount>0){
    M[buf->base] = curmcount;
}
else
{
    /* Everything OK, no client. */
    if(buf->base)
        free(buf->base);
}

每次从管道获得数据时,当客户端数量大于0时,遍历客户端列表,依次调用uv_write()进行数据发送。因为libuv是异步工作机制,我们必须确保写数据缓存一直有效,直到写操作成功返回。所以我们应该在写操作的回调函数echo_write()中删除当前写数据缓存,而不是在调用uv_write()之后立即删除,这就需要我们进行缓存管理。
我们使用一个std::map M来存储每一个写数据缓存及其使用计数的映射,通过echo_write()中计算调用次数来判断是否完成了所有客户端的异步发送工作。在echo_write()每次调用时,所对应的写数据缓存使用计数减一,当使用计数降为0时,删除缓存数据。

map<char*,int> M;
void echo_write(uv_write_t *req, int status) {
    if (status) {
        fprintf(stderr, "Write error %s\n", uv_strerror(status));
    }
    M[((write_req_t *)req)->buf.base]--;
    write_req_t *wr=(write_req_t *)req;
    if(M[((write_req_t *)req)->buf.base]==0)
       {
        auto pos=M.find(((write_req_t *)req)->buf.base);
        if(pos!=M.end())
            M.erase(pos);
        free(wr->buf.base); 
    }
    free(wr);
}

当没有客户端连接时,同样需要直接使用free(buf->base)清空缓存。
由于std::map会不断的扩大自身容器空间(非容器所存储的元素数量),通过erase()可以缩减容器空间,也可以通过swap()来一次性释放:

if (M.size() == 0){
    map<char*, int> temp;
    M.swap(temp);
}

2.6已知问题

当某些Socket客户端接收数据非常缓慢时,会导致M队列持续增长,出现内存持续增长问题。本文中不提供该问题的解决方式。

3使用Boost.Asio

3.1简介

名字本身就说明了一切:Asio意即异步输入/输出。该库可以让C++异步地处理数据,且平台独立。
使用Boost.Asio进行异步数据处理的应用程序基于两个概念:I/O服务和I/O对象。I/O服务抽象了操作系统的接口,允许第一时间进行异步数据处理,而I/O对象则用于初始化特定的操作。 鉴于Boost.Asio只提供了一个名为boost::asio::io_service的类作为I/O服务,Boost.Asio使用io_service同操作系统的输入/输出服务进行交互,它针对所支持的每一个操作系统都分别实现了优化的类,另外库中还包含了针对不同I/O对象的几个类。 其中,类boost::asio::ip::tcp::socket用于通过网络发送和接收数据,而类boost::asio::deadline_timer则提供了一个计时器,用于测量某个固定时间点到来或是一段指定的时长过去了。
不同于LibUV服务仅有一个线程的结构。在本程序中,使用两个线程,分别进行管道读取和Socket服务,后者为主线程,Asio的I/O服务运行在其中。

3.2 初始化Asio

首先,程序至少需要一个io_service实例。通常一个io_service的实例就足够了。在本程序中,使用一个全局的I/O服务对象:io_service:

boost::asio::io_service  io_service;
// ...
//其他代码
// ...
int main(int argc,char** argv)
{
    // ...
    return0;
}

3.2.1 I/O服务池vs I/O线程池

有时候为了处理某些长时间任务,需要使用多个I/O线程。
有两种I/O服务与I/O线程的对应关系:
·第一种,仅使用一个I/O服务,同时使用多个I/O线程,即多个线程同时调用io_service::run()函数:

vector<boost::shared_ptr<boost::thread> > threads;
void Start()
{
    for (int i = 0; i != nThreads; ++i)
    {
        boost::shared_ptr pTh(new boost::thread(
            boost::bind(&boost::asio::io_service::run, &io_service)));
        threads.push_back(pTh);
    }
}

此时,io_service.post()异步运行的方法可能会在线程池中的任意一个线程中被调用。如果其中涉及到资源同步问题,要么进行同步访问控制,要么使用strand[1],确保异步调用的方法被顺序执行。或者,为了避免这种问题,可以采用第二种方式。
·第二种,使用与线程同样数量的I/O服务对象,

vector<boost::shared_ptr<boost::thread> > threads;
vector<boost::shared_ptr<io_service> >io_services;
void Start()
{
    for (int i = 0; i != nThreads; ++i)
    {
        boost::shared_ptr pTh(new boost::thread(
            boost::bind(&boost::asio::io_service::run, &io_services[i])));
        threads.push_back(pTh);
    }
}

此时,每一个io_service的run()函数仅被一个线程调用。

3.3建立Socket服务器

然后你指定你想要监听的端口,再创建一个接收器——一个用来接收客户端连接的对象(boost::ip::tcp::acceptor),我们将io_service的对象引用以及接收器封装在类server中,通过start_accept()函数启动监听,并调用io_service_.run()启动Asio的I/O服务事件循环(类似于LibUV的event-loop)。
同样,为了维护客户端连接状态,使用了一个数组来保存客户端列表:

vector<boost::shared_ptr<session> >client_pool;

然后,定义一个类server来封装服务器的处理逻辑:

class server
{
public:
    server(boost::asio::io_service &io_service, string IP, short port) :io_service_(io_service), acceptor_(io_service,tcp::endpoint(address::from_string(IP), port))
    {
    }
    void run()
    {
        start_accept();
        io_service_.run();
    }
    void start_accept()
    {   
        boost::shared_ptr<session> new_session(new session(io_service_));
        acceptor_.async_accept(new_session->socket(), boost::bind(&server::handle_accept, this, new_session, boost::asio::placeholders::error));
    }
    void handle_accept(boost::shared_ptr<session> new_session, const boost::system::error_code &error)
    {
        start_accept();
        if (!error)
        {
            client_pool.push_back(new_session);
            return;
        }
    }

    // start_send() ...

private:
    boost::asio::io_service &io_service_;
    tcp::acceptor acceptor_;
};
    
server* g_s;

类server的start_accept()函数中,首先创建一个虚拟的Socket对象,通过aysnc_accept()函数来等待客户端的连接。然后当一个连接建立时,会异步调用handle_accept()函数。此时,一个简单的程序可能会建立一个线程,在线程中执行一些简单的指令,例如:

// sock为刚刚从接收器中得到的新的客户端连接对象
boost::thread(boost::bind(client_session, sock));
// ...
void client_session(socket_ptr sock) {
    while (true) {
        char data[512];
        size_t len = sock->read_some(buffer(data));
        if (len > 0)
            write(*sock, buffer("ok", 2));
    }
}

而在我们的程序中,我们仅仅是将客户端连接封装为一个类session对象,放入客户端队列中,当从管道接收到数据时,才会真正触发与客户端通信的指令。
然后,在main()函数中初始化服务器:

int main(int argc, char **argv)
{   
    sscanf(argv[1],"%d",&DEFAULT_PORT);
    g_s=new server(io_service, "0.0.0.0", DEFAULT_PORT);
    g_s->run();
    return 0;
}

我们在main()函数主线程中调用Server::run()函数,后者调用了io_service.run(),也就是说,本程序的I/O服务运行在主线程中。

3.4维护Socket连接(class session)

使用类session来封装一个远程客户端连接,包含一个socket对象和一个发送缓冲区,以及各个异步回调函数的定义。

class session :public boost::enable_shared_from_this<session>
{
public:
    session(boost::asio::io_service &io_service) :socket_(io_service)
    {
    }
    tcp::socket &socket()
    {
        return socket_;
    }
    void start_write()
    {
        //...
    }
private:
    void handle_read(const boost::system::error_code &error, size_t bytes_transferred)
    {
        if (error) return;      
    }
    void handle_write(const boost::system::error_code &error)
    {       
        //...
    }
    tcp::socket socket_;
    boost::asio::streambuf send_buffer;
};

由于仅使用了一个I/O服务和一个(默认的)I/O服务线程,客户端列表在该线程中维护,无需线程间同步访问控制。

3.5实时监听管道

利用额外的线程,通过Linux自身的文件接口访问管道,并同步监听。从管道中读取新数据后,通过I/O服务的post()函数,将接收到的数据从管道线程发送到I/O服务线程

char *pipe_name;
void read_pipe()
{   
    int fd=open(pipe_name,O_RDONLY);
    if(fd<0)
    {
        printf("Fail to open mybuf: %s .\n",strerror(errno));
        return;
    }
    int nbytes=0;
    char *buf=(char *)malloc(suggested_size);
    while(true)
    {
        nbytes=read(fd,buf,suggested_size);
        if(nbytes<=0)
        {
            continue;
        }
        boost::shared_ptr<string> msg(new string(buf,nbytes));
    
        io_service.post(boost::bind(&server::start_send, g_s, msg));
    }
    free(buf);
}
int main(int argc, char **argv)
{   
    //...
    sscanf(argv[1],"%d",&DEFAULT_PORT);
    pipe_name=argv[2];
    boost::thread(&read_pipe);
    g_s=new server(io_service, "0.0.0.0", DEFAULT_PORT);
    g_s->run();
    return 0;
}

Boost.Asio可以让我们异步地运行任何方法。仅仅需要使用post():

void my_func() {
    ...
}
io_service.post(my_func);

这样就可以保证my_func在调用了io_service.run()方法的某一个线程中被调用(参见“I/O服务池vs I/O线程池”说明)。在我们的程序中,server::start_send()函数将会在主线程中执行(因为run()在主线程被调用)。

3.6 Socket数据转发

3.6.1 async_write与async_write_some

异步TCP通讯使用async_write与async_write_some进行数据发送。async_write_some发送一段数据,但是可能最终仅发送出一部分数据,async_write内部(可能多次)调用async_write_some来确保所有数据发送成功。
不同操作系统环境下async_write_some的执行过程可能不一样。在Windows下,async_write_some会将数据发送请求交给Windows操作系统内核IOCP服务,内核执行成功之后执行回调。如过先后多次执行async_write而不是等待回调之后依次再执行,可能会导致最终发送数据的前后顺序出现错误。

3.6.2程序中代码实现

为了确保数据发送顺序的正确性,我们必须在每一次async_write执行之后,在其回调函数中继续下一次async_write操作。所以我们建立了一个全局消息队列,针对每个客户端连接各自维护一个消息队列。

map<session*, queue<boost::shared_ptr<string> > >msgQueues;
class server
{
    // ...
    void start_send(boost::shared_ptr<string> &msg)
    {       
        int csize=client_pool.size();
        for(int i=0;i<csize;i++)
        {
            msgQueues[client_pool[i].get()].push(msg);  
            if(msgQueues[client_pool[i].get()].size()==1)
            {
                client_pool[i]->start_write();
            }
        }
    }
    // ...
}

管道中收到的消息通过server::start_send()加入到各个客户端的消息队列中,当且仅当消息队列长度为1(即原队列为空),再去触发消息队列的发送,确保后续各个客户端的每一条消息都是在回调流程中顺序发送的。
发往客户端的数据处理:

class session
{
    // ...
    void start_write(){
        if(msgQueues[this].size()>0)
        {
            std::ostream stream(&send_buffer);
            string msg(*msgQueues[this].front());
            stream<<msg;
            boost::asio::async_write(socket_,send_buffer,boost::bind(&session::handle_write,shared_from_this(),boost::asio::placeholders::error));
        }           
    }
    // ...
    void handle_write(const boost::system::error_code &error)
    {       
        //队列头发送完成后删除
        msgQueues[this].pop();  
        if(error)
        {   
            //清空消息队列        
            auto cur=msgQueues.find(this);
            if(cur!=msgQueues.end())
            {
                while(!msgQueues[this].empty())
                {
                    msgQueues[this].pop();
                }
                msgQueues.erase(cur);
            }
            //客户端写错误 删除客户端
                for(auto p=client_pool.begin();p!=client_pool.end();p++)
                {
                if(p->get()==this)
                {
                    client_pool.erase(p);
                    break;
                }                  
               }
        }
        // 继续处理下一条消息
        start_write();
    }
    // ...
}

4 性能比较

依据参考资料[2]的测试分析,如果要进行性能测试,应该选择合适的评比基准。
对于LibUV和Boost.Asio,两者在Windows下都是使用IOCP完成的,所以其实最终的性能确实应该是相当的,或许最重要的还是先知道两者实现上的异同吧。

参考资料

[1]参考http://www.crazygaze.com/blog/2016/03/17/how-strands-work-and-why-you-should-use-them/,以及Remotery(https://github.com/Celtoys/Remotery),查看多线程竞争资源的运行状态。
[2] Practical difference between epoll and Windows IO Completion Ports (IOCP)
[3] 征服优雅、高效的Libuv库之初识篇

上一篇下一篇

猜你喜欢

热点阅读