反应器模式+epoll 实现服务器
2018-04-25 本文已影响3人
爱秋刀鱼的猫
1. 什么是reactor模式
reactor模式是 主线程只负责监听文件描述符上面是否有事件发生,如果有的话,就立即将事件通知工作线程 或 处理新连接的线程。 除此之外,主线程不做任何实质性的事情。
特点:
- 尽可能多的使用了c++11的语法
- 基于反应器模式的服务器
- 使用了epoll来应对高并发
2. reactor + epoll 实现一个服务器
以下是核心的伪代码:
// Main reactor loop: the main thread only waits for epoll events and
// dispatches them — new connections go to the accept thread, ready
// client sockets go to the worker pool. It does no real work itself.
while (!pReatcor->m_bStop)
{
    // step1: wait up to 10 ms for events on the monitored descriptors
    struct epoll_event ev[1024];
    int n = epoll_wait(pReatcor->m_epollfd, ev, 1024, 10);
    if (n == 0)
        continue;                       // timeout — nothing ready
    else if (n < 0)
    {
        if (errno == EINTR)             // interrupted by a signal: not an error
            continue;
        std::cout << "epoll_wait error, errno = " << errno << std::endl;
        continue;
    }
    // step2: dispatch every ready descriptor.
    // epoll_wait never returns more than the maxevents (1024) passed in,
    // so n needs no extra clamping.
    for (int i = 0; i < n; ++i)
    {
        // A readable listen fd means a pending connection: wake the accept thread.
        if (ev[i].data.fd == pReatcor->m_listenfd)
            pReatcor->m_acceptcond.notify_one();
        // Otherwise a client socket is ready for I/O: queue it for a worker.
        else
        {
            {
                std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);
                pReatcor->m_listClients.push_back(ev[i].data.fd);
            }
            pReatcor->m_workercond.notify_one();
        }// end if
    }// end for-loop
}// end while
下面是接受请求的线程的代码:
// Accept thread: sleeps on a condition variable until the main loop signals
// that the listen fd is readable, then accepts the connection, makes the new
// socket non-blocking, and registers it with epoll (edge-triggered).
void CMyReactor::accept_thread_proc(CMyReactor* pReatcor){
	std::cout << "accept thread, thread id = " << std::this_thread::get_id() << std::endl;
	while (true){
		int newfd;
		struct sockaddr_in clientaddr;
		// accept() treats addrlen as in/out: it MUST be initialized to the
		// buffer size before the call (the original left it uninitialized,
		// which is undefined behavior).
		socklen_t addrlen = sizeof(clientaddr);
		{
			// Block here until the main loop notifies us of a pending connection.
			std::unique_lock<std::mutex> guard(pReatcor->m_acceptmutex);
			// NOTE(review): a predicate-less wait can wake spuriously; with a
			// blocking listen fd that would stall in accept() while holding the
			// mutex — consider a pending-connection counter as the predicate.
			pReatcor->m_acceptcond.wait(guard);
			if (pReatcor->m_bStop)
				break;
			newfd = ::accept(pReatcor->m_listenfd, (struct sockaddr *)&clientaddr, &addrlen);
		}// lock released here
		if (newfd == -1)
			continue;
		std::cout << "new client connected: " << ::inet_ntoa(clientaddr.sin_addr) << ":"
			<< ::ntohs(clientaddr.sin_port) << std::endl;
		// Switch the new socket to non-blocking mode — required by the
		// edge-triggered read/write loops in the worker threads.
		int oldflag = ::fcntl(newfd, F_GETFL, 0);
		int newflag = oldflag | O_NONBLOCK;
		if (::fcntl(newfd, F_SETFL, newflag) == -1){
			std::cout << "fcntl error, oldflag =" << oldflag << ", newflag = " << newflag << std::endl;
			::close(newfd);   // don't leak the descriptor on failure
			continue;
		}
		// Register the socket with epoll: readable / peer-shutdown / edge-triggered.
		struct epoll_event e;
		memset(&e, 0, sizeof(e));
		e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;
		e.data.fd = newfd;
		if (::epoll_ctl(pReatcor->m_epollfd, EPOLL_CTL_ADD, newfd, &e) == -1)
		{
			std::cout << "epoll_ctl error, fd =" << newfd << std::endl;
			::close(newfd);   // fd would otherwise be orphaned (never monitored)
		}
	}
	std::cout << "accept thread exit ..." << std::endl;
}
下面是读写的工作线程:
// Worker thread: pops a ready client socket off the shared queue (filled by
// the main loop), drains everything readable from it, then echoes the data
// back. Returns when m_bStop is set and the queue is empty.
void CMyReactor::worker_thread_proc(CMyReactor* pReatcor)
{
	std::cout << "new worker thread, thread id = " << std::this_thread::get_id() << std::endl;
	while (true)
	{
		int clientfd;
		{
			// Take one ready socket from m_listClients; the main loop pushes
			// fds here and notifies m_workercond.
			std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);
			while (pReatcor->m_listClients.empty()){
				if (pReatcor->m_bStop){
					std::cout << "worker thread exit ..." << std::endl;
					return;
				}
				pReatcor->m_workercond.wait(guard);
			}
			clientfd = pReatcor->m_listClients.front();
			pReatcor->m_listClients.pop_front();
		}
		// --- read phase ---
		std::cout << std::endl; // flush stdout so output shows promptly under gdb
		std::string strclientmsg;
		char buff[256];
		bool bError = false;
		// Edge-triggered epoll: we must read until EWOULDBLOCK/EAGAIN,
		// otherwise remaining data would never be signaled again.
		while (true)
		{
			int nRecv = ::recv(clientfd, buff, sizeof(buff), 0);
			if (nRecv == -1)
			{
				if (errno == EINTR)                       // interrupted by signal: retry
					continue;
				if (errno == EWOULDBLOCK || errno == EAGAIN)
					break;                                // drained everything available
				std::cout << "recv error, client disconnected, fd = " << clientfd << std::endl;
				pReatcor->close_client(clientfd);
				bError = true;
				break;
			}
			// Peer performed an orderly shutdown — close our side too.
			else if (nRecv == 0){
				std::cout << "peer closed, client disconnected, fd = " << clientfd << std::endl;
				pReatcor->close_client(clientfd);
				bError = true;
				break;
			}
			// Append exactly nRecv bytes. The original used memset + operator+=,
			// which silently truncated the message at any embedded '\0'.
			strclientmsg.append(buff, nRecv);
		}// end read loop
		// On error the socket is already closed — skip the write phase.
		if (bError)
			continue;
		std::cout << "client msg: " << strclientmsg;
		// --- write phase: echo back; non-blocking socket, so loop until
		// the whole buffer has been sent ---
		while (true)
		{
			int nSent = ::send(clientfd, strclientmsg.c_str(), strclientmsg.length(), 0);
			if (nSent == -1)
			{
				if (errno == EINTR)                       // interrupted: retry immediately
					continue;
				if (errno == EWOULDBLOCK || errno == EAGAIN)
				{
					// Kernel send buffer full — back off briefly and retry.
					std::this_thread::sleep_for(std::chrono::milliseconds(10));
					continue;
				}
				std::cout << "send error, fd = " << clientfd << std::endl;
				pReatcor->close_client(clientfd);
				break;
			}
			std::cout << "send: " << strclientmsg;
			strclientmsg.erase(0, nSent);                 // drop the bytes already sent
			if (strclientmsg.empty())
				break;
		}// end write loop
	}// end while
}
主要逻辑框架
源码
other
如果觉得有点难懂,可以先看下面的文章:
使用epoll开发服务器 :https://www.jianshu.com/p/0998e9ebfec3