Android通信方式篇(三)-消息机制(Native层)

2019-01-03  本文已影响17人  Stan_Z

在前面介绍的Java层中,我们看到了MessageQueue有若干native方法,想必肯定与native层有关,但其实Native层本身就有一套完整的消息机制。另外,在整个消息机制中,MessageQueue它是连接Java层和Native层的纽带,两层都可以向其消息队列中添加消息。

那么还是接着MessageQueue初始化开始分析,之前了解到,MessageQueue初始化会执行nativeInit():

//android_os_MessageQueue.cpp
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if (!nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return 0;
    }
    nativeMessageQueue->incStrong(env);
    return reinterpret_cast<jlong>(nativeMessageQueue);
}

创建了NativeMessageQueue

//android_os_MessageQueue.cpp
NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);
    }
}

然后在NativeMessageQueue初始化过程中,又创建了native Looper

//Looper.cpp
Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",
                        strerror(errno));
    AutoMutex _l(mLock);
    rebuildEpollLocked();
}

这里创建了一个eventfd,代码来自7.0,这部分和5.0 pipe管道的mWakeReadPipeFd和mWakeWritePipeFd稍微有点不一样,前者是等待/响应,后者是读取/写入。

接着重点看下rebuildEpollLocked()方法:

void Looper::rebuildEpollLocked() {
...
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event));
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeEventFd;
    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
      //最后三行往mEpollFd代理中、注册、一个叫mWakeEventFd流、的数据流入事件(EPOLLIN)。
   ...
}

通过epoll_create创建了一个epoll专用的文件描述符,EPOLL_SIZE_HINT表示mEpollFd上能监控的最大文件描述符数。最后调用epoll_ctl监控mWakeEventFd文件描述符的Epoll事件,即当mWakeEventFd中有内容可读时,就唤醒当前正在等待的线程.。这里简单说就是Android native层用了Epoll模型。那什么是Epoll模型呢?

首先解释下为什么要引入这个Epoll模型,简单讲,它就是实现queue.next 在无消息时候的阻塞。Lopper肯定不会做啥工作都没干的死循环。这也就是为什么Android中主线程不会因为Looper.loop()里的死循环卡死。

Epoll简单介绍

传统的阻塞型I/O(一边写,一边读),一个线程只能处理一个一个IO流。如果一个线程想要处理多个流,可以采用了非阻塞、轮询I/O方式,但是传统的非阻塞处理多个流的时候,会遍历所有流,但是如果所有流都没数据,就会白白浪费CPU。于是出现了select、poll、epoll三种常见的代理方式。

select,poll,epoll都是IO多路复用的机制。I/O多路复用就通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的。

接着我们来分析下Native层消息的发送和获取。其实native层更多的是负责消息的调度,比如说何时阻塞、何时唤醒线程,避免CPU浪费。

发送消息

发送消息在native比较简单,handler发送消息后,会到MessageQueue的enqueueMessage,此时在线程阻塞的情况下,会调用nativeWake来唤起线程。

//android_os_MessageQueue.cpp
void NativeMessageQueue::wake() {
   mLooper->wake();
}

对应wake方法:

//Looper.cpp
void Looper::wake() {
   ...
   uint64_t inc = 1;
   ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
   if (nWrite != sizeof(uint64_t)) {
 ...
   }
}

这里TEMP_FAILURE_RETRY是一个宏定义,顾名思义,就是不断地尝试往mWakeEventFd流里面写一个无用数据直到成功,以此来唤醒queue.next。

提取消息

也就是Java层Looper 死循环中的queue.next

//MessageQueue.java
Message next() {
       ...
       for (;;) {
          ...
           nativePollOnce(ptr, nextPollTimeoutMillis);
          ...
       }
   }

对于Native:

//android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

继续往下看

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
   ...
   mLooper->pollOnce(timeoutMillis);
   ...
}
//Looper.cpp
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {
        ...
        result = pollInner(timeoutMillis);
    }
}

最后到了:

int Looper::pollInner(int timeoutMillis) {
   ...
   int result = POLL_WAKE;
   mPolling = true;
   struct epoll_event eventItems[EPOLL_MAX_EVENTS];
   int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
   mPolling = false;
   mLock.lock();
   for (int i = 0; i < eventCount; i++) {
       int fd = eventItems[i].data.fd;
       uint32_t epollEvents = eventItems[i].events;
       if (fd == mWakeEventFd) {
           if (epollEvents & EPOLLIN) {
               awoken();
           } else {
               ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
           }
       }
       ...
   }
   ...
   mLock.unlock();
   ...
   return result;
}

在这里,我们注意到epoll_wait方法,这里会得到一段时间内(结合消息计算得来的)收到的事件个数,这里对于queue来说就是空闲(阻塞)状态。过了这个时间后,看看事件数,如果为0,则意味着超时。否则,遍历所有的事件,看看有没有mWakeEventFd,且是EPOLLIN事件的,有的话就真正唤醒线程、解除空闲状态。

最后是两张关系图:

1.两层类对应关系图:

  1. 两层功能模型:

参考:
https://www.cnblogs.com/qcloud1001/p/7993756.html
http://gityuan.com/2015/12/27/handler-message-native/

上一篇下一篇

猜你喜欢

热点阅读