Handler消息机制

2020-09-06  本文已影响0人  hewenyu
// 涉及的Java层代码
frameworks\base\core\java\android\os\
        - Handler.java
        - Looper.java
        - Message.java
        - MessageQueue.java
        
// 涉及的 Native 层代码
frameworks\base\core\jni\android_os_MessageQueue.cpp
system\core\libutils\Looper.cpp

1.消息队列的创建

1.1 Looper 的创建

frameworks\base\core\java\android\os\Looper.java

public final class Looper {
    ...
    // 主线程的 Looper 对象
    private static Looper sMainLooper;  // guarded by Looper.class
    // 消息队列
    final MessageQueue mQueue;
    
    public static void prepare() {
        prepare(true);
    }

    // quitAllowed 表示消息队列是否可以退出
    private static void prepare(boolean quitAllowed) {
        // sThreadLocal 可以理解为线程局部变量,用来保存该线程关联的 Looper 对象
        // 一个线程多次调用 prepare() 方法会抛异常
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        // 创建 Looper 对象
        sThreadLocal.set(new Looper(quitAllowed));
    }

    // prepareMainLooper 函数创建的是主线程的 Looper 对象,在 ActivityThread 的 main 函数中调用
    public static void prepareMainLooper() {
        // 调用 prepare函数,入参是 false,表示主线程的消息队列不允许退出
        prepare(false);
        synchronized (Looper.class) {
            if (sMainLooper != null) {
                throw new IllegalStateException("The main Looper has already been prepared.");
            }
            // 保存主线程的 Looper 对象,方便其它线程获取
            sMainLooper = myLooper();
        }
    }

    // 构造函数私有化,不能被外部创建
    private Looper(boolean quitAllowed) {
        // 创建一个消息队列
        mQueue = new MessageQueue(quitAllowed);
        mThread = Thread.currentThread();
    }

    ...
}

可以看到 Looper 类主要做了如下事情

1.2 MessageQueue 消息队列的创建

frameworks\base\core\java\android\os\MessageQueue.java

public final class MessageQueue {
    ...
 
    private native static long nativeInit();
 
    MessageQueue(boolean quitAllowed) {
        // 记录消息队列是否允许退出
        mQuitAllowed = quitAllowed;
        // 调用 native 层的 init 函数并返回一个指针地址
        mPtr = nativeInit();
    }
 
    ...
}

Java 层中的消息队列 MessageQueue 在初始化的时候比较简,主要记录了两个字段,一个是消息队列是否允许退出,第二个则是调用了native层的 init 函数获取了一个指针地址,该函数位于 android_os_MessageQueue.cpp 文件内

frameworks\base\core\jni\android_os_MessageQueue.cpp


static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    // 创建Navice层的 NativeMessageQueue 对象
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if (!nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return 0;
    }

    nativeMessageQueue->incStrong(env);
    return reinterpret_cast<jlong>(nativeMessageQueue);
}

NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    // 获取 Native 层的 Looper 对象
    // 通过 pthread_getspecific() 获取 Native 层的 Looper 对象,类似 Java 层的 mThreadLocal.get()
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {  
        // 创建Native层的 Looper 对象
        mLooper = new Looper(false);
        // 通过 pthread_setspecific() 保存 Native 层的 Looper 对象,类似 Java 层的 mThreadLocal.set()
        Looper::setForThread(mLooper);
    }
}

Native 层的 init 函数的主要功能为

system\core\libutils\Looper.cpp


// Native 层 Looper 对象构造函数
Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    // 创建一个用来通知事件的文件描述符 mWakeEventFd
    // EFD_NONBLOCK 表示文件执行 read/write 操作时,不会阻塞
    mWakeEventFd = eventfd(0, EFD_NONBLOCK);

    AutoMutex _l(mLock);
    // 重建 epoll 事件
    rebuildEpollLocked();
}

void Looper::rebuildEpollLocked() {
    // 关闭原有的 epoll 实例
    if (mEpollFd >= 0) {
        close(mEpollFd);
    }

    // 创建新的 epoll 实例并注册管道
    // EPOLL_SIZE_HINT = 8 表示 mEpollFd 最多可以关注 8 个fd
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    // 设置监听的事件
    struct epoll_event eventItem;
    // 将未使用的数据区域置0
    memset(& eventItem, 0, sizeof(epoll_event));
    // 对应的文件描述符上有可读数据事件
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeEventFd;
    // 将唤醒事件 mWakeEventFd 注册到 epoll 实例
    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);

    for (size_t i = 0; i < mRequests.size(); i++) {
        const Request& request = mRequests.valueAt(i);
        struct epoll_event eventItem;
        request.initEventItem(&eventItem);
        // 将 request 队列的事件添加到 epoll 实例
        int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
    }
}

Native 层的 Looper 对象创建时做了如下操作

小结

消息队列的创建过程涉及到 Java 层和 Native 层,可以看到 Java 层的 LooperMessageQueue 对象的创建顺序和 Native 层刚好是相反的

2.消息循环过程

frameworks\base\core\java\android\os\Looper.java

public static void loop() {
        // 获取当前线程的 Looper 对象
        final Looper me = myLooper();
        if (me == null) {   // 执行 loop 函数之前必须先调用 prepare() 函数创建 looper 对象
            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
        }
        // 获取 looper 对象中的消息队列
        final MessageQueue queue = me.mQueue;

        for (;;) {
            // 获取消息队列中的消息,没有消息处理时阻塞线程
            Message msg = queue.next(); // might block
        
             try {
                // msg.target 即为 Handler,调用 Handler 的 dispatchMessage() 分发消息
                msg.target.dispatchMessage(msg);
               ...
            } finally {
               ...
            }
        
        }
}

消息队列创建完成之后,调用 Looper.loop() 函数启动消息循环执行如下操作

frameworks\base\core\java\android\os\MessageQueue.java


// 消息队列,要处理的消息列表
Message mMessages;

Message next() {
        // 注册到消息队列中的空闲消息处理器 (IdelHandler) 的数量
        int pendingIdleHandlerCount = -1;
        // 消息队列处理下一个消息需要进入睡眠状态等待的时间
        // 0: 即使当前消息队列没有新的消息处理,当前线程也不要进入睡眠等待状态
        // -1: 消息队列没有新的消息需要处理时,当前线程需要进入睡眠状态,直到它被其他线程唤醒
        int nextPollTimeoutMillis = 0;
        for (;;) {
            if (nextPollTimeoutMillis != 0) {
                Binder.flushPendingCommands();
            }
            // 检查当前线程是否有新的消息需要处理
            nativePollOnce(ptr, nextPollTimeoutMillis);

            synchronized (this) {
                // Try to retrieve the next message.  Return if found.
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;
                // msg.target == null 表示 msg 为同步消息屏障
                if (msg != null && msg.target == null) {
                    // msg 为同步屏障消息,通过do while 循环获取链表里的第一条异步消息
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }
                if (msg != null) {  // 消息不为空
                    if (now < msg.when) {
                        // 还未到达消息处理的时间,计算线程需要睡眠的时间
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {// 新消息需要处理,将消息从队列中取出并返回
                        // 标记当前线程已经从睡眠中唤醒
                        mBlocked = false;
                        if (prevMsg != null) {  // msg 为异步消息,更新消息队列
                            prevMsg.next = msg.next;
                        } else {    // msg 为同步消息,更新列表的头部
                            mMessages = msg.next;
                        }
                        msg.next = null;
                        msg.markInUse();
                        return msg; // 返回需要处理的 msg
                    }
                } else {    // 消息队列没有消息,当前线程进入睡眠状态
                    nextPollTimeoutMillis = -1;
                }

                // 用于退出消息
                if (mQuitting) {
                    dispose();
                    return null;
                }

                // 判断是否有空闲消息处理器
                if (pendingIdleHandlerCount < 0
                        && (mMessages == null || now < mMessages.when)) {
                    pendingIdleHandlerCount = mIdleHandlers.size();
                }
                if (pendingIdleHandlerCount <= 0) { // 如果没有空闲消息需要处理,continue 进入睡眠
                    mBlocked = true;
                    continue;
                }

                if (mPendingIdleHandlers == null) {
                    mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
                }
                mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
            }
            // 循环处理空闲消息
            for (int i = 0; i < pendingIdleHandlerCount; i++) {
                final IdleHandler idler = mPendingIdleHandlers[i];
                mPendingIdleHandlers[i] = null; // release the reference to the handler

                boolean keep = false;
                try {
                    keep = idler.queueIdle();
                } catch (Throwable t) {
                    Log.wtf(TAG, "IdleHandler threw exception", t);
                }

                if (!keep) {
                    synchronized (this) {
                        mIdleHandlers.remove(idler);
                    }
                }
            }
            // 空闲消息处理完成,重置为0
            pendingIdleHandlerCount = 0;
    
            nextPollTimeoutMillis = 0;
        }
    }

queue.next() 函数内部同样维护了一个 for 循环来获取需要处理的消息,主要功能为

frameworks\base\core\jni\android_os_MessageQueue.cpp

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    // prt 即为 NativeMessageQueue 的指针对象
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    // pollObj 对象即为 MessageQueue 对象
    mPollObj = pollObj;
    // 调用 Native 层的 Looper 的 pollOnce 函数
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;

    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}

system\core\libutils\Looper.cpp


int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
    ...
        if (result != 0) {  // result != 0 表示有新的消息需要处理,跳出循环
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }
        // result == 0 表示没有消息需要处理
        // 由于位于 for 循环内部,没有需要处理的消息时,pollInner() 会被不断地调用
        result = pollInner(timeoutMillis);
    }
}

int Looper::pollInner(int timeoutMillis) {
    ...
    // Poll.
    int result = POLL_WAKE;

    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    // 监听 mEpollFd 实例中的文件描述符的 IO 读写事件
    // 如果这些文件描述符(fd)都没有发生 IO 读写事件,则当前线程会在 epoll_wait 中进入睡眠等待,睡眠的时间为 timeoutMillis
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    ...
    // 检查是哪一个文件描述符发生了 IO 读写事件
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd) {   // mWakeEventFd 即为 Looper 对象创建的用于监听消息的文件描述符
            if (epollEvents & EPOLLIN) {
            // 如果是 mWakeEventFd 文件描述符 && mWakeEventFd 发生的 IO 读写事件类型为 EPOLLIN
            // 则唤醒当前线程处理消息
                awoken();
            } else {
        ...
        }

        } else {
           ...
        }
    }
    ...

    return result;
}

// 唤醒当前线程,读取文件描述符中的数据
void Looper::awoken() {
    uint64_t counter;
    // 循环读取文件描述符中的数据
    TEMP_FAILURE_RETRY(read(mWakeEventFd, &counter, sizeof(uint64_t)));
}

nativePollOnce() 最终调用的是 Looper(native层) 对象的 pollOnce(),通过 for 循环不断的调用 pollInner() 函数来判断是否有新的消息需要处理

pollInner() 调用 epoll_wait 监听前面创建的的 epoll 实例中的文件描述符的 IO 读写事件

3.消息的发送和处理

3.1 消息发送

Android 系统提供了一个 Handler 类用来向一个线程消息队列发送消息

frameworks\base\core\java\android\os\Handler.java

public class Handler {
    ...

    public final boolean sendMessage(Message msg)
    {
        return sendMessageDelayed(msg, 0);
    }
    
    public final boolean post(Runnable r)
    {
       return  sendMessageDelayed(getPostMessage(r), 0);
    }

    public final boolean sendMessageDelayed(Message msg, long delayMillis)
    {
        if (delayMillis < 0) {
            delayMillis = 0;
        }
        return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
    }

    public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
        MessageQueue queue = mQueue;
        if (queue == null) {
            return false;
        }
        return enqueueMessage(queue, msg, uptimeMillis);
    }
    
    private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
        // 将 Handler 设置成 msg 的 target
        msg.target = this;
        // mAsynchronous == true 表示发送的是异步消息
        if (mAsynchronous) {
            msg.setAsynchronous(true);
        }
        // 将 msg 加入消息队列
        return queue.enqueueMessage(msg, uptimeMillis);
    }
    
    ...
}

Handler 类提供了多个重载函数用于发送各种消息,这些函数最终都会调用 enqueueMessage() 其功能如下

frameworks\base\core\java\android\os\MessageQueue.java

boolean enqueueMessage(Message msg, long when) {

    synchronized (this) {
        if (mQuitting) {    // 如果消息队列正在退出,则回收消息并退出
            msg.recycle();
            return false;
        }

        msg.markInUse();    //  标记 msg 为正在被使用
        msg.when = when;    // 设置消息触发的时间
        // 获取链表的第一条消息
        // mMessages 根据 msg.when 从小到大排列
        Message p = mMessages;
        boolean needWake;
        if (p == null || when == 0 || when < p.when) {
            // 如果链表为空 || 当前 msg 的优先级最大 || 当前 msg 触发时间 < 系统时间
            // 则设置 msg 为链表的第一条消息,优先执行
            msg.next = p;
            mMessages = msg;
            // mBlocked: 记录目标线程是否处于睡眠状态, true 表示当前线程正在睡眠
            // needWake: 记录本次消息插入后,是否需要执行唤醒当前线程的操作
            needWake = mBlocked;
        } else {
            // 当前线程正在睡眠 && p 为同步消息屏障 && msg 为异步消息
            // 需要唤醒当前线程执行异步消息
            needWake = mBlocked && p.target == null && msg.isAsynchronous();
            Message prev;
            for (;;) {
                prev = p;
                p = p.next;
                // 根据 msg.when 找到 msg 插入的位置为 prev 后面
                if (p == null || when < p.when) {
                    break;
                }
                if (needWake && p.isAsynchronous()) {
                    needWake = false;
                }
            }
        // 插入 msg
            msg.next = p;
            prev.next = msg;
        }
        // 当前线程正在睡眠,需要执行唤醒操作
        if (needWake) {
            nativeWake(mPtr);
        }
    }
    return true;
}

可以看到 MessageQueue 内部的的消息链表mMessages 是根据 msg.when 消息发送的时间来排序的,enqueueMessage() 在将 msg 插入消息队列之后会判断是否需要执行 nativeWake() 唤醒当前线程;

frameworks\base\core\jni\android_os_MessageQueue.cpp

void NativeMessageQueue::wake() {
    mLooper->wake();
}

system\core\libutils\Looper.cpp

void Looper::wake() {
    uint64_t inc = 1;
    // 向 mWakeEventFd 文件描述符中写入一个数据
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    ...
}

nativeWake() 函数用于唤醒当前线程,通过 wake() 会向 mWakeEventFd 文件描述符写入数据,此时会触发 epoll 的监听事件,从第二节 pollInner() 中的 epoll_wait 函数中唤醒当前线程,最终回到 MessageQueuenext() 中返回 msg;

3.2 消息处理

frameworks\base\core\java\android\os\Looper.java

public static void loop() {
    ...
    for (;;) {
        // 从消息队列中拿到了需要处理的消息
        Message msg = queue.next();
        if (msg == null) {
            return; // msg == null 表示退出消息循环
        }
        ...
        try {
            msg.target.dispatchMessage(msg);
           
        } finally {
           
        }
        ...
        // 回收 msg 防止重复创建
        msg.recycleUnchecked();
    }
}

frameworks\base\core\java\android\os\Handler.java

public void dispatchMessage(Message msg) {
    // 1. 发送的 msg 设置了 callback ,触发 msg 的 callback (handler.post())
    if (msg.callback != null) {
        handleCallback(msg);
    } else {
        // 2. 创建 Handler 时设置了 callback,触发 handler 的 callback
        if (mCallback != null) {
            if (mCallback.handleMessage(msg)) {
                return;
            }
        }
        // 3. 正常情况下会走该分支,覆写 Handler.handleMessage() 处理消息
        handleMessage(msg);
    }
}

queue.next() 回来之后会触发 handler(msg.target) 的 dispatchMessage() 分发 msg

上一篇下一篇

猜你喜欢

热点阅读