Linux

反应器模式+epoll 实现服务器

2018-04-25  本文已影响3人  爱秋刀鱼的猫
1. 什么是reactor模式

reactor模式是 主线程只负责监听文件描述符上面是否有事件发生,如果有的话,就立即将事件通知工作线程 或 处理新连接的线程。 除此之外,主线程不做任何实质性的事情。

特点:
  1. 尽可能多的使用了c++11的语法
  2. 基于反应器模式的服务器
  3. 使用了epoll来应对高并发
2. reactor + epoll 是实现一个服务器

以下是核心的伪代码:

//主线程一直在这里死循环
while (!pReatcor->m_bStop)
{
    //step1 : 调用epoll_wait,判断有没有监听的时间发生 
    struct epoll_event ev[1024];
    int n = epoll_wait(pReatcor->m_epollfd, ev, 1024, 10);
    if (n == 0)
        continue;
    else if (n < 0)
    {
        std::cout << "epoll_wait error" << std::endl;
        continue;
    }

    //step2 : 如果有监听到有事件发生
    int m = min(n, 1024);
    for (int i = 0; i < m; ++i){
        //如果事件是 新的连接,那么通知 接受连接请求的工作线程去完成
        if (ev[i].data.fd == pReatcor->m_listenfd)
            pReatcor->m_acceptcond.notify_one();
        //如果事件是 read/write ,那么通知 读写socket的工作线程去完成
        else
        {
            {
                std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);
                pReatcor->m_listClients.push_back(ev[i].data.fd);
            }
            pReatcor->m_workercond.notify_one();
            //std::cout << "signal" << std::endl;
        }// end if
    }// end for-loop
}// end while

下面是接受请求的线程的代码:

void CMyReactor::accept_thread_proc(CMyReactor* pReatcor){
    std::cout << "accept thread, thread id = " << std::this_thread::get_id() << std::endl;
    while (true){
        int newfd;
        struct sockaddr_in clientaddr;
        socklen_t addrlen;
        {
            //accept_thread 启动了之后,如果没有请求到就会阻塞在这里
            std::unique_lock<std::mutex> guard(pReatcor->m_acceptmutex);
            pReatcor->m_acceptcond.wait(guard);   //主线程调用notify之后,accept_thread知道有新的连接,然后开始accept
            if (pReatcor->m_bStop)
                break;
            newfd = ::accept(pReatcor->m_listenfd, (struct sockaddr *)&clientaddr, &addrlen);
        }//释放锁

        if (newfd == -1)
            continue;
        std::cout << "new client connected: " << ::inet_ntoa(clientaddr.sin_addr) << ":" 
        << ::ntohs(clientaddr.sin_port) << std::endl;

        //将新socket设置为non-blocking
        int oldflag = ::fcntl(newfd, F_GETFL, 0);
        int newflag = oldflag | O_NONBLOCK;
        if (::fcntl(newfd, F_SETFL, newflag) == -1){
            std::cout << "fcntl error, oldflag =" << oldflag << ", newflag = " << newflag << std::endl;
            continue;
        }

        //将socket加入epoll的监听
        struct epoll_event e;
        memset(&e, 0, sizeof(e));
        e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;
        e.data.fd = newfd;
        if (::epoll_ctl(pReatcor->m_epollfd, EPOLL_CTL_ADD, newfd, &e) == -1)
        {
            std::cout << "epoll_ctl error, fd =" << newfd << std::endl;
        }
    }
    std::cout << "accept thread exit ..." << std::endl;
}

下面是读写的工作线程:

void CMyReactor::worker_thread_proc(CMyReactor* pReatcor)
{
    std::cout << "new worker thread, thread id = " << std::this_thread::get_id() << std::endl;
    while (true)
    {
        //拿到一个可读的句柄
        int clientfd;
        {
            //工作线程在这里加锁,从m_listClients里面读取一个可读的socket
            //m_listClients 里面的元素就是主线程把可读的socket放入这个list
            std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);
            while (pReatcor->m_listClients.empty()){
                if (pReatcor->m_bStop){
                    std::cout << "worker thread exit ..." << std::endl;
                    return;
                }       
                pReatcor->m_workercond.wait(guard);
            }
            clientfd = pReatcor->m_listClients.front();
            pReatcor->m_listClients.pop_front();
        }
        
        //拿到句柄之后,开始读写
        //开始读
        std::cout << std::endl;  //gdb调试时不能实时刷新标准输出,用这个函数刷新标准输出,使信息在屏幕上实时显示出来
        std::string strclientmsg;
        char buff[256];
        bool bError = false;
        while (true)
        {
            memset(buff, 0, sizeof(buff));
            int nRecv = ::recv(clientfd, buff, 256, 0);
            if (nRecv == -1)
            {
                if (errno == EWOULDBLOCK)
                    break;
                else
                {
                    std::cout << "recv error, client disconnected, fd = " << clientfd << std::endl;
                    pReatcor->close_client(clientfd);
                    bError = true;
                    break;
                }

            }
            //对端关闭了socket,这端也关闭。
            else if (nRecv == 0){
                std::cout << "peer closed, client disconnected, fd = " << clientfd << std::endl;
                pReatcor->close_client(clientfd);
                bError = true;
                break;
            }
            strclientmsg += buff;
        }//end 读while
        //出错了,就不要再继续往下执行了
        if (bError)
            continue;
        std::cout << "client msg: " << strclientmsg;

        //开始写
        while (true)
        {
            int nSent = ::send(clientfd, strclientmsg.c_str(), strclientmsg.length(), 0);
            if (nSent == -1)
            {
                if (errno == EWOULDBLOCK)
                {
                    std::this_thread::sleep_for(std::chrono::milliseconds(10));
                    continue;
                }
                else
                {
                    std::cout << "send error, fd = " << clientfd << std::endl;
                    pReatcor->close_client(clientfd);
                    break;
                }
            }
            std::cout << "send: " << strclientmsg;
            strclientmsg.erase(0, nSent);
            if (strclientmsg.empty())
                break;
        }//end 写while
    }//end while
}
主要逻辑框架
源码

https://github.com/GreenGitHuber/code_something/tree/master/epoll%2Breactor%E6%A8%A1%E5%BC%8F%E7%9A%84%E6%9C%8D%E5%8A%A1%E5%99%A8

other

如果觉得有点难懂,可以先看下面的文章:
使用epoll开发服务器 :https://www.jianshu.com/p/0998e9ebfec3

上一篇下一篇

猜你喜欢

热点阅读