epoll的简单使用

2017-03-17  本文已影响0人  一叶之界

epoll的功能:实现I/O复用,即多路I/O。

一、epoll的系统调用函数

epoll只有epoll_create,epoll_ctl 和 epoll_wait 3个系统调用函数

1、epoll_create(int size)

创建一个epoll的句柄,size表示内核监听的数目量。

注意:当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

函数声明:epoll_create(int size);

注释:生成一个epoll的专用描述符,其实是在内核中申请一块空间,用来监听想要监听的socket fd上发生了什么事件。size是epoll监听的最大socket fd数目。

2、epoll_ctl (int epfd, int op, int fd, struct epoll_event *event)

将被监听的描述符添加到epoll句柄或从epool句柄中删除或者对监听事件进行修改。该函数用于控制某个epoll文件描述符上的事件,可以注册事件,修改事件,删除事件。
函数声明:epoll_ctl (int epfd, int op, int fd, struct epoll_event *event);

参数:
epfd:由 epoll_create 生成的epoll专用的文件描述符;
op:要进行的操作例如注册事件,可能的取值EPOLL_CTL_ADD 注册、EPOLL_CTL_MOD 修改、EPOLL_CTL_DEL 删除;
fd:关联的文件描述符;
event:指向epoll_event的指针,即在内核中需要监听什么事件。

返回值:成功返回0,失败返回1。

// structepoll_event 的结构体:
typedef union epoll_data 
{  
  void *ptr;  
  int fd;  
  __uint32_t u32;  
  __uint64_t u64;  
} epoll_data_t;  
struct epoll_event 
{
  __uint32_t events; /* Epoll events */  
  epoll_data_t data; /* User data variable */  
};

events可以是以下几个宏的集合:

  1. EPOLLIN: 触发该事件,表示对应的文件描述符上有可读数据。(包括对端SOCKET正常关闭);
  2. EPOLLOUT: 触发该事件,表示对应的文件描述符上可以写数据;
  3. EPOLLPRI: 表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
  4. EPOLLERR: 表示对应的文件描述符发生错误;
  5. EPOLLHUP: 表示对应的文件描述符被挂断;
  6. EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
  7. EPOLLONESHOT: 只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。
// 案例:
//定义epoll_event事件的变量
struct epoll_event ev;
//设置与要处理的事件相关的文件描述符
ev.data.fd=listenfd;
//设置要处理的事件类型
ev.events=EPOLLIN|EPOLLET;
//注册epoll事件
epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);

3、int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)

等待事件的发生。参数events用来从内核得到事件的集合,maxevents表示内核events有多大(数组成员的个数),这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。
函数声明:int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

参数:
epfd:由epoll_create 生成的epoll专用的文件描述符;
epoll_event:用于回传代处理事件的数组;
maxevents:每次能处理的事件数;
timeout:等待I/O事件发生的超时值(单位我也不太清楚);-1相当于阻塞,0相当于非阻塞。一般用-1即可返回发生事件数。

返回值:成功返回处理事件的数目,失败返回0,表示已超时;

二、epoll的两种模式ET和LT

假如有这样一个例子:(LT方式,即默认方式下,内核会继续通知,可以读数据,ET方式,内核不会再通知,可以读数据)

  1. 我们已经把一个用来从管道中读取数据的文件句柄(RFD)添加到epoll描述符
  2. 这个时候从管道的另一端被写入了2KB的数据
  3. 调用epoll_wait(2),并且它会返回RFD,说明它已经准备好读取操作
  4. 然后我们读取了1KB的数据
  5. 调用epoll_wait(2)......
Edge Triggered工作模式:

如果我们在第1步将RFD添加到epoll描述符的时候使用了EPOLLET标志,那么在第5步调用epoll_wait(2)之后将有可能会挂起,因为剩余的数据还存在于文件的输入缓冲区内,而且数据发出端还在等待一个针对已经发出数据的反馈信息。只有在监视的文件句柄上发生了某个事件的时候ET工作模式才会汇报事件。因此在第5步的时候,调用者可能会放弃等待仍在存在于文件输入缓冲区内的剩余数据。在上面的例子中,会有一个事件产生在RFD句柄上,因为在第2步执行了一个写操作,然后,事件将会在第3步被销毁。因为第4步的读取操作没有读空文件输入缓冲区内的数据,因此我们在第5步调用epoll_wait(2)完成后,是否挂起是不确定的。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。最好以下面的方式调用ET模式的epoll接口,在后面会介绍避免可能的缺陷。(LT方式可以解决这种缺陷)
i 基于非阻塞文件句柄
ii 只有当read(2)或者write(2)返回EAGAIN时(认为读完)才需要挂起,等待。但这并不是说每次read()时都需要循环读,直到读到产生一个EAGAIN才认为此次事件处理完成,当read()返回的读到的数据长度小于请求的数据长度时(即小于sizeof(buf)),就可以确定此时缓冲中已没有数据了,也就可以认为此事读事件已处理完成。

Level Triggered工作模式(默认的工作方式)

以LT方式调用epoll接口的时候,它就相当于一个速度比较快的poll(2),并且无论后面的数据是否被使用,因此他们具有同样的职能。因为即使使用ET模式的epoll,在收到多个chunk的数据的时候仍然会产生多个事件。调用者可以设定EPOLLONESHOT标志,在epoll_wait(2)收到事件后epoll会与事件关联的文件句柄从epoll描述符中禁止掉。因此当EPOLLONESHOT设定后,使用带有EPOLL_CTL_MOD标志的epoll_ctl(2)处理文件句柄就成为调用者必须作的事情。

ET和LT的区别:
  1. LT(leveltriggered)是缺省的工作方式,并且同时支持block和no-blocksocket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。
  1. ET(edge-triggered)是高速工作方式,只支持no-blocksocket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了。
// epoll的常规用法
 for(;;)
 {  
           nfds = epoll_wait(kdpfd, events, maxevents, -1);  
           for(n = 0; n < nfds; ++n) 
          {  
               if(events[n].data.fd == listenfd) //有新的连接
               {  
                   client = accept(listener, (struct sockaddr *) &local,  
                                   &addrlen);   //accept这个连接
                   if(client < 0)
                  {  
                       perror("accept");  
                       continue;  
                   }  
                   setnonblocking(client);  
                   ev.events = EPOLLIN | EPOLLET;  
                   ev.data.fd = client;  
                   //将新的fd添加到epoll的监听队列中
                   if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev) < 0) 
                  {  
                       fprintf(stderr, "epoll set insertion error: fd=%d\n",  
                               client);  
                       return -1;  
                   }  
               }  
               else if( events[i].events&EPOLLIN ) //接收到数据,读socket
               {
                    n = read(sockfd, line, MAXLINE)) < 0    //读
                    ev.data.ptr = md;     //md为自定义类型,添加数据
                    ev.events=EPOLLOUT|EPOLLET;
                    epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);//修改标识符,等待下一个循环时发送数据,异步处理的精髓
                }
                else if(events[i].events&EPOLLOUT) //有数据待发送,写socket
                {
                    struct myepoll_data* md = (myepoll_data*)events[i].data.ptr;    //取数据
                    sockfd = md->fd;
                    send( sockfd, md->ptr, strlen((char*)md->ptr), 0 ); //发送数据  
                    ev.data.fd=sockfd;
                    ev.events=EPOLLIN|EPOLLET;
                    epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改标识符,等待下一个循环时接收数据
                }
                else  
                {
                    // 其他处理; 
                }
           }  
}  
// 案例:
#include <iostream>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
using namespace std;
#define MAXLINE 5
#define OPEN_MAX 100
#define LISTENQ 20
#define SERV_PORT 5000
#define INFTIM 1000

void set_non_blocking(int sock) // 非阻塞
{
    int opts;
    opts = fcntl(sock, F_GETFL);
    if (opts < 0)
    {
        perror("fcntl(sock, GETFL)");
        exit(1);
    }
    opts = opts|O_NONBLOCK;
    if (fcntl(sock, F_SETFL, opts) < 0)
    {
        perror("fcntl(sock, SETFL, opts)");
        exit(1);
    }
}

int main(int argc, char* argv[])
{
    int i, maxi, listenfd, connfd, sockfd, epfd, nfds, portnumber;
    ssize_t n;
    char line[MAXLINE];
    socklen_t clilen;

    if ( 2 == argc )
    {
        if((portnumber = atoi(argv[1])) < 0 )
        {
            fprintf(stderr, "Usage:%s portnumber/a/n", argv[0]);
            return 1;
        }
    }
    else
    {
        fprintf(stderr, "Usage:%s portnumber/a/n", argv[0]);
        return 1;
    }

    //声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件
    struct epoll_event ev, events[20];
    //生成用于处理accept的epoll专用的文件描述符
    epfd=epoll_create(256);
    struct sockaddr_in clientaddr;
    struct sockaddr_in serveraddr;
    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    //把socket设置为非阻塞方式
    //setnonblocking(listenfd);
    //设置与要处理的事件相关的文件描述符
    ev.data.fd = listenfd;
    //设置要处理的事件类型
    ev.events = EPOLLIN|EPOLLET;
    //ev.events = EPOLLIN;
    //注册epoll事件
    epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
    bzero(&serveraddr, sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;
    char *local_addr = "127.0.0.1";
    inet_aton(local_addr, &(serveraddr.sin_addr)); //htons(portnumber);
    serveraddr.sin_port = htons(portnumber);
    bind(listenfd, (sockaddr *)&serveraddr, sizeof(serveraddr));
    listen(listenfd, LISTENQ);
    maxi = 0;

    for ( ; ; ) 
    {
        //等待epoll事件的发生
        nfds = epoll_wait(epfd,events,20,500);
        //处理所发生的所有事件
        for(i=0; i<nfds; ++i)
        {
            if (events[i].data.fd == listenfd)//如果新监测到一个SOCKET用户连接到了绑定的SOCKET端口,建立新的连接。
            {
                connfd = accept(listenfd, (sockaddr *)&clientaddr, &clilen);
                if(connfd<0){
                    perror("connfd<0");
                    exit(1);
                }
                //setnonblocking(connfd);
                char *str = inet_ntoa(clientaddr.sin_addr);
                cout << "accapt a connection from " << str << endl;
                //设置用于读操作的文件描述符
                ev.data.fd=connfd;
                //设置用于注测的读操作事件
                ev.events=EPOLLIN|EPOLLET;
                //ev.events=EPOLLIN;
                //注册ev
                epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);
            }
            //如果是已经连接的用户,并且收到数据,那么进行读入。
            else if (events[i].events&EPOLLIN) 
            {
                cout << "EPOLLIN" << endl;
                if ((sockfd = events[i].data.fd) < 0)
                    continue;
                if ( (n = read(sockfd, line, MAXLINE)) < 0) 
                {
                    if (errno == ECONNRESET) 
                    {
                        close(sockfd);
                        events[i].data.fd = -1;
                    } 
                    else
                    {
                        std::cout << "readline error" << std::endl;
                    }
                } 
                else if (n == 0) 
                {
                    close(sockfd);
                    events[i].data.fd = -1;
                }
                line[n] = '/0';
                std::cout << "read " << line << std::endl;
                //设置用于写操作的文件描述符
                ev.data.fd = sockfd;
                //设置用于注测的写操作事件
                ev.events = EPOLLOUT|EPOLLET;
                //修改sockfd上要处理的事件为EPOLLOUT
                //epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd,&ev);
            }
            else if (events[i].events&EPOLLOUT)  //如果有数据发送
            {
                sockfd = events[i].data.fd;
                write(sockfd, line, n);
                //设置用于读操作的文件描述符
                ev.data.fd=sockfd;
                //设置用于注测的读操作事件
                ev.events=EPOLLIN|EPOLLET;
                //修改sockfd上要处理的事件为EPOLIN
                epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
            }
        }
    }
    return 0;
}
上一篇下一篇

猜你喜欢

热点阅读