程序员Android开发Android开发

Linux之I/O 多路复用之Select、Poll、EPoll

2021-01-28  本文已影响0人  巴黎没有摩天轮Li

前言

我们知道Android消息机制是通过Handler、Looper与MessageQueue建立的整个体系,Looper.loop()方法是开启了一个死循环,不停地从MessageQueue中取消息,但是为什么死循环不会浪费CPU资源呢?这就要开启我们的EPoll了。

Message next() {
    for (;;) {
        // 调用了native pollonce
        nativePollOnce(ptr, nextPollTimeoutMillis);
    }
}

前提铺垫

网络数据 I/O

计算机是如何接收网络数据的呢?我们知道计算机网卡是用来进行无线通讯的,网卡首先接收到的数据,通过网卡接口传输到硬件电路中,再将数据写入到内存中某个地址。其中涉及到DMA(Direct Memory Access)传输以及IO通路选择。

中断信号

我们在学习计组的时候,一定听说过CPU中断信号。也就是说,若硬件产生的信号,需要让CPU进行响应,换句话说,硬件产生的信号一般为中断信号,因为该信号一旦不作出相应,就会将数据丢失掉。比如,当计算机收到了断电信号时,CPU会立即去通知保存数据的程序,防止数据丢失,电容可以保存一些电量用于提供CPU运行一小段时间。那么计算机如何知道已经接受到了网卡数据呢?网卡写入内存以后,网卡会向CPU发出一个中断信号,操作系统就能够知道有新数据到来,通过网卡中断程序去处理数据。

进程阻塞为什么不会消耗CPU资源?

阻塞的概念我们已经理解了,我们先看一段基础的网络编程代码。

int socket = createSocket()
bindSocket();
listenSocket();
int c = accept(socket,.....);
// 死循环之类的从socket inputStream 阻塞线程
recv(c,...)

先创建Socket,绑定,监听 和 接受 在通过死循环接收数据。recv 是一个阻塞方法,当程序执行recv就一直等到收到数据再往下执行。
所以操作系统为了执行多任务,也就类似进程调度,会分为运行与等待状态等等(线程同样如此)。运行就是所谓的获取到CPU使用权,阻塞就是等待状态,也就失去CPU使用权。CPU就是不停地进行各个进程间的使用切换,所以由于速度处理的很快,看上去是同时执行多个任务。
如何知道哪些任务要进行CPU处理,哪些需要让出CPU呢?

Epoll在Android中的应用

Code from Android Data Source 5.1 -- Looper.cpp
Epoll 初始化
Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    int wakeFds[2];
    // Android 采用pipe通道进行数据的通信 result = 0成功,失败返回-1
    int result = pipe(wakeFds);
    // 管道读端 fd
    mWakeReadPipeFd = wakeFds[0];
    // 管道写端 fd
    mWakeWritePipeFd = wakeFds[1];
    result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
    result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
    mIdling = false;
    // Allocate the epoll instance and register the wake pipe.
    // 创建Epoll句柄,返回一个文件描述符 方法参数为EPOLL_SIZE_HINT = 8的内核监听容量
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeReadPipeFd;
    // epoll 事件注册函数 第一个传入mEpollFd文件描述符,第二个参数代表动作,EPOLL_CTL_ADD代表注册新的fd到epfd中
    // 第三个参数 代表监听哪个文件描述符 也就是说在Android中epoll是通过监听读端fd,来告知MessageQueue来数据了
    // 第四个参数 用于处理监听什么事件
    result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
}

其中epoll_ctl()方法中的第二个参数代表执行操作宏定义,分为三种。

接下来,MessageQueue next()方法实则是调用了pollOnce方法,我们来看下。

android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
         //.... 省略了大量代码
        // 最终调用了 pollInner 方法并传入了超时时长
        result = pollInner(timeoutMillis);
    }
}
int Looper::pollInner(int timeoutMillis) {
    // Poll.
    int result = POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;

    // We are about to idle.
    // 是否是空转状态 也就是在调用epoll_wait时,加入一个标记,表明是否阻塞
    mIdling = true;

    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    // 执行epoll_wait方法进行阻塞,等待fd socket 返回数据
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // No longer idling.
   // 取消空转状态,表明目前已经收到了数据
    mIdling = false;

    // Acquire lock.
    // 加锁用于后续的fd遍历
    mLock.lock();

// 伪代码... 
// epoll_wait 之后一般是一个循环
for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        // 若接收到的数据来源是正在监听的pipe通道所开辟的fd,则说明当前Looper绑定的Thread中的MessageQueue接收到了数据,
        // 可以继续走Java代码中的For循环之后的代码了,继续获取CPU执行权。
        if (fd == mWakeReadPipeFd) {
            if (epollEvents & EPOLLIN) {
                //  唤醒 也就是说开始读管道中的数据
                awoken();
            } else {
                 //... 省略
            }
        } else {
           //... 省略
        }
    }
}


void Looper::awoken() {
    char buffer[16];
    ssize_t nRead;
    do {
        // 读管道中的数据
        nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
    } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}

以上就是Epoll在android中的应用。具体细节,之后还会另写一篇文章详细说明。

要解决的问题

一个应用进程对应一个Socket文件,操作系统中应用无上限,因此代表有多个Socket文件接收到信息后,要知道到底发送给那个应用进程?所有的进程关心的Socket都由一个线程进行处理,所以也就是构成I/O多路复用问题。

数据结构

当然这些Socket集合,当FD=XXX发生了事件变化,那么是那些进程需要关心的呢?
这里就涉及到Socket集合的数据结构了。
一个Socket文件,能够由多个进程使用,而一个进程也可以使用多个Socket文件,因此Socket与进程是多对多的关系。另外,一个Socket也会有不同的事件类型。所以操作系统将很难判断出将哪样的事件给那个进程。

链表/数组线性结构

Select与Poll都是采用了线性结构。Select允许用户传入三个集合,这三个集合分别对应了Socket的三种不同的状态的集合,读集合、写集合、异常集合。

Select

fd_set read_fd_set, write_fd_set, error_fd_set;
while(true) {
  // 进程读关心、写关心、异常时关心的Socket集合。
  select(..., &read_fd_set, &write_fd_set, &error_fd_set); 
  for (i = 0; i < FD_SETSIZE; ++i)
        if (FD_ISSET (i, &read_fd_set)){
          // Socket可以读取
        } else if(FD_ISSET(i, &write_fd_set)) {
          // Socket可以写入
        } else if(FD_ISSET(i, &error_fd_set)) {
          // Socket发生错误
        } 
}

因此呢,每次进行Select操作的时候呢,会阻塞当前的线程,在阻塞期间所有的操作系统产生的每个信息,都会通过遍历的方式去查找这个消息是否存在于这三个集合之中。其中 FD_SETSIZE代表最大并发数量,一般是1024个并发。

Select缺点

Poll

while(true) {
  events = poll(fds, ...)
  for(evt in events) {
    fd = evt.fd;
    type = evt.revents;
    if(type & POLLIN ) {
       // 有数据需要读,读取fd中的数据
    } else if(type & POLLOUT) {
       // 可以写入数据
    } 
    else ...
  }
}

Poll 在性能上呢和Select差别不大,还是需要进行遍历,只是说在编程模型上进行了优化处理。

总结Select/Poll

都是阻塞式IO,都需要进行Socket线性集合的遍历找到当前进程所需要的Socket。

EPoll

为了解决上述的问题,epoll 通过更好的方案实现了从操作系统订阅消息。epoll 将进程关注的文件描述符存入一棵二叉搜索树,通常是红黑树的实现。在这棵红黑树当中,Key 是 Socket 的编号,值是这个 Socket 关注的消息。因此,当内核发生了一个事件:比如 Socket 编号 1000 可以读取。这个时候,可以马上从红黑树中找到进程是否关注这个事件。另外当有关注的事件发生时,epoll 会先放到一个队列当中。当用户调用epoll_wait时候,就会从队列中返回一个消息。epoll 函数本身是一个构造函数,只用来创建红黑树和队列结构。epoll_wait调用后,如果队列中没有消息,也可以马上返回。因此epoll是一个非阻塞模型。
总结一下,select/poll 是阻塞模型,epoll 是非阻塞模型。当然,并不是说非阻塞模型性能就更好。在多数情况下,epoll 性能更好是因为内部有红黑树的实现。

优点

上一篇 下一篇

猜你喜欢

热点阅读