I/O复用模型:epoll

2021-08-16  本文已影响0人  404Not_Found

epoll 的不同之处

用法不同之处

epoll 是 linux下多路复用IO接口 select/poll 的增强版, 它能显著提高程序在大量并发链接中只有少量活跃情况下的系统CPU利用率。
原因:

  1. 不用在每次等待事件之前,都必须重新准备要被侦听的文件描述符集合。(无需事先准备监控容器)
  2. 获取事件的时候,无需遍历监控容器,从而找到监控集(fd != -1),并且找到真正有I/O的文件描述符。

所以只要比哪里哪些被内核IO事件异步唤醒,而加入Ready队列的文件描述符即可。

select legacy 行为:


select.png

select 组织 文件时放在 while(1)中的

/*select*/
while(1) {
  rset = allset //while(1) 中重新组监控文件集合
  select(maxfd + 1, &reset, NULL, NULL, NULL);
}

其实从 poll 开始, 组织文件集就已经放在了 while(1) 外


poll.png
/*
poll
struct pollfd client[OPEN_MAX];
*/
while(1) {
  poll(client, maxfd+1, -1);
}

epoll 的实现 更是如此。poll/epoll 都是好莱坞模式,监控的事件提前注册好,等着演就行了。

反应堆的不同

select/poll 反应堆.png epoll 反应堆.png

epoll 反应堆是一棵二叉树, 想监控哪个fd就放进来。
epoll会监控这棵树,哪个fd 的 读/写 触发了。最终会在用户空间返回一个ready的就绪队列。即反应集。

平衡二叉树查找的事件复杂度是 log(2n);

所以epoll 拿到的是一个真正的反应集合。
有一种说法是,epoll 适合大并发量,少活跃度的情况(就是连接虽然多,但是请求很少)。有待验证。

epoll API

  1. 创建epoll句柄,参数size 告诉内核监听额文件描述符个数,根内存大小有关
int epoll_create(int size);
  1. 控制某个epoll监控的FD 中的事件:注册、修改、删除
int epoll_ctl(int epfd, int op, int fd, struct epoll_event * event);

epfd: 为epoll_create 的句柄
op: 为表示的动作,用3个宏来表示

EPOLL_CTL_ADD // 注册新的fd 到 epfd
EPOLL_CTL_MOD // 修改已经注册的fd 的监听事件
EPOLL_CTL_DEL  // 从epfd中删除一个fd

event: 告诉内核需要监听的事件

struct epoll_event {
  _unit32t events; /*Epoll Events*/
  epoll_data_t data; /*User data varaible*/
};

/*event*/
EPOLLIN:对应的fd可以读(包括对端socket正常关闭)
EPOLLOUT: 对应的fd描述符可写
EPOLLPRI: 对应的fd有紧急的数据可读(非外带数据)
EPOLLERR: 对应的fd 发生错误
EPOLLHUP: 对应的fd被挂起
EPOLLET:监控的fd 设为边缘触发模式(edge triggered)
EPOLLONESHOT: 只监控一次,监听完这次事件后,如果还想监听,则还需要手动加到EPOLL队列中。

/*联合体,看具体情况使用哪个*/
typedef union epoll_data {
  void *ptr;
  int     fd;
  uint32_t   u32;
  uint64_t   u64;
} epoll_data_t;
  1. 等待监控文件描述符上有事件产生
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);

events: 用来从内核中拿到命中靶子的命中点
maxevents: 告知内核这个evets多大,这个maxevents的值不能大于创建epoll_create()的size.
timeout: 超时 事件
return,nready 有IO反应的个数

epoll 代码举例

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>
#include <ctype.h>
#include "my_socket.h"
#define MAXLINE 80
#define SERV_PORT 8888
#define OPEN_MAX 1024

int main(int argc, char **argv)
{
   int i, j, listenfd, connfd, sockfd;
   int nready, efd, res;
   ssize_t n;
   char buf[MAXLINE], str[INET_ADDRSTRLEN];

   struct sockaddr_in cliaddr, servaddr;
   struct epoll_event tep, ep[OPEN_MAX];

   listenfd = Socket(AF_INET, SOCK_STREAM, 0);

   bzero(&servaddr, sizeof(servaddr));
   servaddr.sin_family = AF_INET;
   servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
   servaddr.sin_port = htons(SERV_PORT);

   Bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
   Listen(listenfd, 20);

   efd = epoll_create(OPEN_MAX);
   if(efd == -1)
       perr_exit("epoll_create");

   tep.events = EPOLLIN;
   tep.data.fd = listenfd;
   res = epoll_ctl(efd, EPOLL_CTL_ADD, listenfd, &tep);
   if(res == -1)
       perr_exit("epoll_ctl");

   while(1)
   {
       nready = epoll_wait(efd, ep, OPEN_MAX, -1);

       if(nready == -1)
           perr_exit("epoll_wait");
       for(i = 0; i < nready; i++)
       {
           if((ep[i].events & EPOLLIN))
               continue;
           if(ep[i].data.fd == listenfd)
           {
              connfd = Accept(listenfd, (struct sockaddr*)&cliaddr, &clilen);
              printf("received from %s at PORT %d\n",
                     inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),
                     noths(cliaddr.sin_port));

              tep.events = EPOLLIN;
              tep.data.fd = connfd;
              res = epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &tep);
              if(res == -1)
                  perr_exit("poll_ctl");
           }
           else
           {
               sockfd = ep[i].data.fd;
               n = Read(sockfd, buf, MAXLINE);
               if(n == 0)
               {
                   res = epoll_ctl(efd, EPOLL_CTL_DEL, sockfd, NULL);
                   if(res == -1)
                       perr_exit("epoll_ctl");
                   Close(sockfd);
                   printf("client[%d] closed connection\n", sockfd);
               }
               else
               {
                  for(j = 0; j< n; j++)
                      buf[j] = toupper(buf[j]);
                  Write(sockfd, buf, n);
               }
           }
       }
   }

    close(listenfd);
    close(efd);

    return 0;
}

代码分析:

int efd = epoll_create(OPEN_MAX);
struct epoll_event tep;
tep.event = EPOLLIN;
tep.data.fd = listenfd;

struct epoll_event ep[OPEN_MAX];
res = epoll_ctl(efd, EPOLL_CTL_ADD, listenfd, &tep);
nready = epoll_wait(efd, ep, OPEN_MAX, -1);
for( i = 0; i < nready; i++ )
{
    if(!(ep[i].events & EPOLLIN)
        continue;
    if(ep[i].data.fd == listenfd)
    {
        /* 对于listenfd 的处理 */
    }
    else
    {
      /*对于 connfd的处理*/
    }
}
上一篇 下一篇

猜你喜欢

热点阅读