运维相关

Linux中的IO复用机制——Epoll

2019-10-30  本文已影响0人  toMyLord

在Linux中IO复用机制主要目的是为了实现在单进程环境下,同时监控多个描述符(文件描述符,套接字描述符等)的目的。一旦监控的描述符就绪,就会通知相应进程并解除该进程的阻塞状态,使进程能够对就绪描述符进行处理。

一、什么是IO复用

UNIX有五大IO模型:

1、异步IO

其中前四种IO统称为同步IO。那么同步IO和异步IO的区别是什么呢?
在同步文件IO中,线程启动一个IO操作然后就立即进入等待状态,直到IO操作完成后才醒来继续执行。而异步文件IO方式中,线程发送一个IO请求到内核,然后继续处理其他的事情,内核完成IO请求后,将会通知线程IO操作完成了。
如果某进程有大量IO需要处理,则选择异步IO方式可以显著提高效率。因为选择异步IO之后,进程不必等待IO完成,即可继续处理其他的事情,直到IO事件结束,操作系统会通知该进程IO已经结束。

2、阻塞IO

阻塞IO.png
从上图可以直观的看出:进程在使用recvfrom()系统调用后,进程由用户态转为核心态,同时内核检测recvfrom()函数对应fd的数据集中没有新数据传入,并开始等待。等待过程中整个进程是阻塞的,不占用任何CPU资源。直到内核发现有新的数据写入对应数据集,此时内核就唤醒处于就绪队列的该进程。进程唤醒后仍处于核心态,将数据集中的数据拷贝并通过recvfrom()函数返回后,此进程才由核心态转化为用户态,并继续之后的工作。
由上述过程可知,如果使用阻塞IO,那么在等待IO就绪的过程中,整个进程是无法进行任何操作且不占用CPU资源的。直到IO结束,内核唤醒该进程,该进程才能继续执行。

3、非阻塞IO

非阻塞IO.png
从上图可知:在非阻塞IO的情况下,用户进程调用recvfrom()系统调用后,如果IO没有就绪,则不会等待IO就绪直接返回。因此如果想要等到IO结束,就需要不断的向内核询问IO是否完成。
上述过程可以看出,非阻塞IO的特点就是需要不断向服务器询问IO是否就绪。

4、IO多路复用

IO多路复用.png
上图用select机制作为例子,如果一个进程调用了select,那么整个进程都会被阻塞,直到select所监听的所有fd中,出现完成IO的情况,进程就会解除阻塞。
一个进程通过调用select函数监听多个fd的IO情况,就是一个典型的IO多路复用的例子。

二、Select与Poll机制

1、Select机制

我们先分析一下select函数

/**
 * 该函数是select函数的声明
 * @parameter: maxfdp1 指定待测试的文件描述字个数,它的值是待测试的最大描述字加1。
 * @parameter: *readset/*writeset/*exceptset 均为fd_set类型,可以将fd_set理解为一个集合,这个集合中存放的是文件描述符。中间的三个参数指定我们要让内核测试读、写和异常条件的文件描述符集合。如果对某一个的条件不感兴趣,就可以把它设为空指针。
 * @parameter: *timeout timeout告知内核等待所指定文件描述符集合中的任何一个就绪可花多少时间。其timeval结构用于指定这段时间的秒数和微秒数。
 * @return: int 若有就绪描述符返回其数目,若超时则为0,若出错则为-1
 */
int select(int maxfdp1,fd_set *readset,fd_set *writeset,fd_set *exceptset,const struct timeval *timeout);

//以下是select的具体使用方法:
FD_ZERO(&fds);              //每次循环都必须清空FD_Set
FD_SET(sock_fd, &fds);      //将sock_fd加入集合

//此select设定为对整个集合内的fd写监听,监听的最长时间为timeout
int n = select(maxfd, NULL, &fds, NULL, &timeout);
switch(n) {
    case -1:
        fprintf(stderr, "Select error:%s \n\a", strerror(errno));
        exit(1);
    case 0:
        printf("select time out, lost packet!\n");
        ......
        break;
    default:
        //判断sock_fd是否还在集合中
        if(FD_ISSET(sock_fd, &fds)) {
            //还在集合中则说明对该fd监听到了写操作
            ......
        } else{
            //没有对该fd监听到写操作
            ......
        }
}

selectFD_SET是通过一组宏函数进行实现的,支持的最大监听数不超过1024个。select函数底层是通过轮询机制实现的,因此对CPU占用很高。同时,每次调用select都需要把fd_set集合从用户态拷贝到内核态,因此该函数的效率很低。

2、Poll机制

poll的机制与select类似,与select在本质上没有多大差别,管理多个描述符也是进行轮询,根据描述符的状态进行处理。poll机制相对于select机制,解决了select的最大文件描述符支持为1024的问题(支持任意大小的描述符集),并没有解决性能开销问题。
下面是pll的函数原型:

//poll改变了文件描述符集合的描述方式,使用了pollfd结构而不是select的fd_set结构,
//使得poll支持的文件描述符集合限制远大于select的1024
typedef struct pollfd {
        int fd;                         // 需要被检测或选择的文件描述符
        short events;                   // 对文件描述符fd上感兴趣的事件
        short revents;                  // 文件描述符fd上当前实际发生的事件
} pollfd_t;

/**
 * 实现IO多路复用的poll函数
 * @parameter: fds fds是一个struct pollfd类型的数组,用于存放需要检测其状态的socket描述符,
 * 并且调用poll函数之后fds数组不会被清空;一个pollfd结构体表示一个被监视的文件描述符,
 * 通过传递fds指示 poll() 监视多个文件描述符。其中,结构体的events域是监视该文件描述符的事件掩码,
 * 由用户来设置这个域,结构体的revents域是文件描述符的操作结果事件掩码,内核在调用返回时设置这个域。
 * @parameter: nfds 记录数组fds中描述符的总数量
 * @parameter: timeout 最长阻塞时间
 */
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

三、Epoll机制

epoll在Linux2.6内核正式提出,是基于事件驱动的IO方式,相对于select来说,epoll没有描述符个数限制,使用一个文件描述符管理多个描述符,将用户关心的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。
Linux中提供的epoll相关函数如下:

/**
 * epoll_create 函数创建一个epoll句柄。
 * @parameter: size 参数size表明内核要监听的描述符数量。
 * @return: 调用成功时返回一个epoll句柄描述符,失败时返回-1。
 */
int epoll_create(int size);


//epoll_event 结构体定义如下:
struct epoll_event {
    __uint32_t events;  /* Epoll events */
    epoll_data_t data;  /* User data variable */
};
typedef union epoll_data {
    void *ptr;
    int fd;
    __uint32_t u32;
    __uint64_t u64;
} epoll_data_t;

/**
 * epoll_ctl 函数注册要监听的事件类型。
 * @parameter: epfd 表示epoll句柄。由上述 epoll_create() 函数申请得到。
 * @parameter: op 表示fd操作类型,有3种:EPOLL_CTL_ADD 注册新的fd到epfd中;EPOLL_CTL_MOD
 * 修改已注册的fd的监听事件;EPOLL_CTL_DEL 从epfd中删除一个fd。
 * @parameter: fd 是要监听的描述符。
 * @parameter: event 表示要监听的事件。
 * @return: 调用成功时返回一个epoll句柄描述符,失败时返回-1。
 */
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

/**
 * epoll_wait 函数等待事件的就绪。
 * @parameter: epfd 表示epoll句柄。由上述 epoll_create() 函数申请得到。
 * @parameter: events 表示从内核得到的就绪事件集合。
 * @parameter: maxevents 告诉内核events的大小。
 * @parameter: timeout 表示等待的超时事件。
 * @return: 成功时返回就绪的事件数目,调用失败时返回 -1,等待超时返回 0。
 */
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。
epoll除了提供select/poll那种IO事件的水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。
水平触发(LT):默认工作模式,即当epoll_wait检测到某描述符事件就绪并通知应用程序时,应用程序可以不立即处理该事件;下次调用epoll_wait时,会再次通知此事件
边缘触发(ET): 当epoll_wait检测到某描述符事件就绪并通知应用程序时,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次通知此事件。(直到你做了某些操作导致该描述符变成未就绪状态了,也就是说边缘触发只在状态由未就绪变为就绪时只通知一次)。
LT和ET原本应该是用于脉冲信号的,可能用它来解释更加形象。Level和Edge指的就是触发点,Level为只要处于水平,那么就一直触发,而Edge则为上升沿和下降沿的时候触发。比如:0->1 就是Edge,1->1 就是Level。
ET模式很大程度上减少了epoll事件的触发次数,因此效率比LT模式下高。
selectpollepoll三者机制的区别如下图所示:

比较.png
上述对比图存在错误:epoll的底层实现应该是红黑树!
epollLinux目前大规模网络并发程序开发的首选模型。在绝大多数情况下性能远超selectpoll。目前流行的高性能web服务器Nginx正式依赖于epoll提供的高效网络套接字轮询服务。但是,在并发连接不高的情况下,多线程+阻塞I/O方式可能性能更好。
以下是一个简单的基于epollC/S网络通信示例:
服务器代码(只看Server类的定义和main函数即可):
//
// Created by mylord on 2019/10/26.
//

#ifndef FILETRANSER_SERVER_H
#define FILETRANSER_SERVER_H

#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string>
#include <netdb.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <vector>
#include <string.h>
#include <algorithm>
#include <sys/epoll.h>

#define MAX_BUFF_SIZE 1024

using namespace std;

typedef struct {
    sockaddr_in client_sock;
    int client_fd;
    string client_ip;
}ClientInfo;

class Server {
private:
    vector<ClientInfo> client_info;

    int listen_fd, listen_port, listen_size;
    sockaddr_in server_addr;
public:
    Server(int port = 5555, int size = 10);

    void Init();

    void Listen();

    int AcceptConnection();

    int Write(int sock_fd, char buff[]);

    int Read(int sock_fd, char buff[]);

    int Close(int client_fd);

    int getListenFd() const;
};


#endif //FILETRANSER_SERVER_H


int main() {
    Server server(5555, 10);
    char buffer[1204];
    struct epoll_event ev, events[10];   //声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件
    int epoll_fd = epoll_create(10);    //创建一个epoll的句柄,并告诉内核这个监听的数目为10  
    int listen_fd = server.getListenFd();
    int nfds = 0;                       //记录需要处理的事件数

    ev.data.fd = listen_fd;
    ev.events = EPOLLIN | EPOLLET;      //linsten_fd可读,边缘触发
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev);

    server.Listen();
    cout << "server is listening......" << endl;

    while(true) {
        nfds = epoll_wait(epoll_fd, events, 10, -1);

        for(int i = 0; i < nfds; i++) {
            if(events[i].data.fd == listen_fd) {
                int client_fd = server.AcceptConnection();

                ev.data.fd = client_fd;
                ev.events = EPOLLIN | EPOLLET;
                epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev);
            } else if(events[i].events & EPOLLIN) {  //epoll池中的时间被边缘触发且是因为收到数据待读入
                int nbytes = 0;
                if ((nbytes = server.Read(events[i].data.fd, buffer)) <= 0) {
                    cout << "fd-" << events[i].data.fd << " receive data error!" << endl;
                    server.Close(events[i].data.fd);
                    continue;
                }
                cout << "EPOLL receive from " << events[i].data.fd << ":" << buffer << endl;
                strcpy(buffer, "received from epoll server!");
                server.Write(events[i].data.fd, buffer);
            }
        }
    }
}


Server::Server(int port, int size): listen_port(port), listen_size(10) {
    this->Init();
}

void Server::Init() {
    if((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
    {   //服务器端开始建立socket描述符
        fprintf(stderr, "Socket error:%s \n\a", strerror(errno));
        exit(1);
    }
    //服务器端填充tcp sockaddr结构
    bzero(&server_addr, sizeof(struct sockaddr_in));    //先将套接字地址数据结构清零
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htons(INADDR_ANY);
    server_addr.sin_port = htons(listen_port);

    if(bind(listen_fd, (struct sockaddr *)(&server_addr), sizeof(struct sockaddr)) == -1)
    {
        fprintf(stderr, "Bind error:%s\n\a", strerror(errno));
        exit(1);
    }
}

void Server::Listen() {
    if(listen(listen_fd, listen_size) == -1)
    {   //端口绑定成功,监听socketfd描述符,同时处理的最大连接请求数为10
        fprintf(stderr, "Listen error:%s\n\a", strerror(errno));
        exit(1);
    }
}

int Server::AcceptConnection() {
    int sockaddr_size = sizeof(struct sockaddr_in);
    ClientInfo temp;

    if ((temp.client_fd = accept(listen_fd, (struct sockaddr *)(&temp.client_sock),
                                     (socklen_t *) &sockaddr_size)) == -1)
    {   //调用accept接受一个连接请求
        fprintf(stderr, "Accept error:%s\n\a", strerror(errno));
        exit(1);
    }

    temp.client_ip.assign(inet_ntoa(temp.client_sock.sin_addr));

    cout << "Connected from "<< temp.client_ip << "\tclient fd is " << temp.client_fd << endl;

    client_info.push_back(temp);

    return temp.client_fd;
}

int Server::Write(int sock_fd, char buff[]) {
    int nbytes = 0;
    if((nbytes = write(sock_fd, buff, strlen(buff))) == -1)
        fprintf(stderr, "Write Error:%s\n", strerror(errno));
    return nbytes;
}

int Server::Read(int sock_fd, char buff[]) {
    int nbytes = 0;
    if ((nbytes = read(sock_fd, buff, MAX_BUFF_SIZE)) == -1)
        fprintf(stderr, "Read Error:%s\n", strerror(errno));

    buff[nbytes] = '\0';
    return nbytes;
}

int Server::Close(int client_fd) {
    //先查找client_fd的迭代器client_account_it
    auto client_it = find_if(client_info.begin(), client_info.end(),
            [client_fd](const ClientInfo &cli){ return cli.client_fd == client_fd; });

    client_info.erase(client_it);

    close(client_fd);

    printf("fd-%d exit!\n", client_fd);
}

int Server::getListenFd() const{
    return listen_fd;
}

客户端代码(只看Client类的定义和main函数即可):

//
// Created by mylord on 2019/10/26.
//

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <iostream>
#include <fstream>

#define MAX_BUFF_SIZE 1024

class Client
{
private:
    int sock_fd;
    struct sockaddr_in client_addr;
    char server_ip[16];
    int server_port;

public:
    Client(char * server_ip, int server_port);

    int Connect();

    int Write(char * buff);

    int Read(char * buff);

    int Close();
};



Client::Client(char * server_ip, int server_port)
{
    strncpy(this->server_ip, server_ip, 16);
    this->server_port = server_port;

    struct hostent * host;
    if((host = gethostbyname(this->server_ip)) == NULL)
    {
        fprintf(stderr, "The host name %s is illegal.\n", server_ip);
        exit(1);
    }

    if((this->sock_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
    {
        fprintf(stderr, "Init socket fd error!\n");
        exit(1);
    }

    bzero(&this->client_addr, sizeof(this->client_addr));
    this->client_addr.sin_family = AF_INET;
    this->client_addr.sin_port = htons(this->server_port);
    this->client_addr.sin_addr = *((struct in_addr *)host->h_addr);

}


int Client::Connect()
{
    int nbytes = connect(this->sock_fd, (struct sockaddr *)(&this->client_addr),
                         sizeof(struct sockaddr));

    return nbytes;
}


int Client::Write(char * buff)
{
    int nbytes = write(this->sock_fd, buff, strlen(buff));

    return nbytes;
}

int Client::Read(char * buff)
{
    int nbytes = read(this->sock_fd, buff, MAX_BUFF_SIZE);
    buff[nbytes] = '\0';

    return nbytes;
}

int Client::Close()
{
    close(this->sock_fd);
    printf("fd-%d exit!\n", this->sock_fd);
}

int main(int argc, char *argv[]) {
    Client client(argv[1], 5555);
    char buff[1024];
    char send_msg[] = "hello epoll!";

    client.Connect();

    while(true) {
        client.Write(send_msg);
        client.Read(buff);
        std::cout << buff << std::endl;
        sleep(1);
    }
}

程序运行结果:


epoll服务器.png 10个同时运行的客户端.png

参考文献:
https://www.cnblogs.com/zhangmingda/p/9396994.html
https://www.jianshu.com/p/397449cadc9a


上一篇下一篇

猜你喜欢

热点阅读