关于boost的async_与io_serivce.run的联系

2019-02-18  本文已影响0人  高斯_Cpper

前序:
现在很多服务器都使用boost::asio作为异步socket通信,但很多人只会copy其中的代码,却不了解io_serivce与async_函数之间的联系,下面我们就来深入了解它们之间的关系,并怎么利用它们来实现高性能服务器。

我们知道async_的函数所绑定的函数是异步的形式,它在系统完成后并不会立即回调(io_serivce.run没有调用),而是要等到io_serivce.run时才会调用,所以io_serivce.run是一个死循环函数。

由于高性能服务器通常会有两种方式去实现
一、多线程io收发数据,主线程处理逻辑(当前不考虑多进程的情况)
二、多线程io收发数据,多线程处理逻辑(这种情况需要考虑到线程安全问题)

如果让io_serivce实现多线程收发数据是关键,我们了解到io_serivce.run是处理系统io数据的回调,所以可以用不同的线程去调用run()函数,这样就实现了多线程io的回调,但问题在于客户端的连接处理也是在不同的线程,这样就会有线程安全问题了。
-------------------------------------------------------------例子 ---------------------------------------------------------
int main()
{
boost::asio::io_service io_svc;
std::vector<std::thread> vecThread;
for (int i = 0; i < 5; ++i)
{
vecThread.emplace_back(std::thread({ io_svc.run(); }));
}
return 0;
}

如果按上面的代码将一个io_svc 分配多个线程按并发处理,处理的对象是TCP 服务时。发现对同一个TCP 客户端的连接的处理会在多个线程中。

如果是这样的话,那这一个客户端发送过来数据被分成多段后,在服务端会不会乱,而这样我是不是需要加锁控制,可如果加锁的话又应该怎么加呢?

按时我的想法是,如果 是同一个话,它如果只在同一个线程中那么这个我就不用担心了,因为它会顺序到达,那么我收到的头部和数据部就不会被分到两个线程中去了。

这主要问题是TCP 服务端使用异步的方式接收客户端的数据,首先收到固定的头部数据,然后解析头部得到数据部的长度,然后再次异步接收实际的数据。如果这两个步骤被分到了两个线程中去了。导致了这两个部分发生了并发操作,那就尴尬了。

以上是摘子网友的一个疑问,确实是会产生接收到数据会到不同的线程中去,所以在使用async_函数要加以控制,能使它合理的回调。
下面某个网友回复的做法

方法:单个线程用于数据接收,放到一个队列,开多个消费线程用于数据处理

这种方法是可以做到线程安全的问题,但问题在于io效率比较低,所以不推荐。

比较好的做法,多线程收数据,这里采用收head再收body,async这里要绑定具体长度,当收到数据时,写入buffer时,要加锁,原因是处理事务的线程也要取buffer的数据,所以要锁。

下面收数据的一个例子

void NetSocket::ReadMsg(mutable_buffers_1& _buffers,uint32_t dataLen)
{
    async_read(*this, _buffers, transfer_exactly(dataLen),
        boost::bind(&NetSocket::RecvMsg, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
    TimeoutStart();  // 处理长时间未收到数据的socket
}

void NetSocket::RecvMsg(const boost::system::error_code& ec, size_t bytes_transferred)
{
    TimeoutCancel();  // 取消超时处理
    if (ec)
    {
        printf("[ERROR]:Recv error msg,%s\n", ec.message().c_str());
        assert(0);
        SetLocalClose(true);
        (m_pNotify)(*this);
        return;
    }

    if (m_eRecvStage == REVC_FSRS_HEAD)   //处理头
    {
        m_eRecvStage = REVC_FSRS_BODY;
        uint32_t nextSizeLen = 0;
        memcpy(&nextSizeLen, m_arrRecvBuffer, bytes_transferred);
        ReadMsg(m_cRecvBodyBuffer, nextSizeLen);
    }
    else   //处理body,这里收到的长度刚好为一个body
    {
        
        uint32_t nextSizeLen = 0;
        memcpy(&nextSizeLen, m_arrRecvBuffer, PACKAGE_HEADER_SIZE);
        assert(nextSizeLen == bytes_transferred);

        boost::mutex::scoped_lock lock(m_writeBufferMutex);
        DataBuffer* dataBuff = new DataBuffer((uint8_t*)(m_arrRecvBuffer + PACKAGE_HEADER_SIZE), bytes_transferred);
        if (recvQueueHeader_ == NULL && recvQueueLastest_ == NULL)
        {
            recvQueueHeader_ = dataBuff;
            recvQueueLastest_ = dataBuff;
        }
        else
        {
            recvQueueLastest_->next = dataBuff;
            recvQueueLastest_ = recvQueueLastest_->next;
        }
        lock.unlock();
        m_eRecvStage = REVC_FSRS_HEAD;
        ReadMsg(m_cRecvHeadBuffer, PACKAGE_HEADER_SIZE);
        (m_pNotify)(*this);
    }
}
```cpp
-------------------------------
上面的代码解决了线程安全的问题,现在讲如何开启io_service多线程的代码
1、在创建一个NetServer时,就预先创建一定数量的socket,如1000个
2、同时绑定1000个socket的async_accept,这就相当于可以同时并发1000个连接过来
3、创建8个线程去调用io_service.run()。

上一篇下一篇

猜你喜欢

热点阅读