Handler相关问题

2022-06-26  本文已影响0人  小O机

一、在子线程中创建Handler,“Can't create handler inside thread XXX that has not called Looper.prepare()”

从Handler构造方法开始,直接上源码,

// Handler.java
public Handler(@Nullable Callback callback, boolean async) {
    ...

    mLooper = Looper.myLooper();
    if (mLooper == null) {
        // 这里就是异常抛出的地方
        throw new RuntimeException(
            "Can't create handler inside thread " + Thread.currentThread()
                    + " that has not called Looper.prepare()");
    }

    ...
}

接着看Looper.myLooper()做了什么,

// Looper.java
public static @Nullable Looper myLooper() {
    return sThreadLocal.get();
}

这段代码很简单,就是从sThreadLocal拿东西,从返回值知道,拿的是一个Looper对象,并且可能为null。接下来就需要好好了解一下这个ThreadLocal是个什么东西,简单讲,ThreadLocal就是一个普通工具类,用于管理线程的本地变量,使用自定义数据结构ThreadLocalMap(ThreadLocal的静态内部类,本质就是一个hashMap)来存储这些变量。
ThreadLocal既然有get(),那就肯定有set(),我们先不看get做了什么,先看set做了什么,

// ThreadLocal.java
public void set(T value) {
    // 获取当前线程
    Thread t = Thread.currentThread();
    // 获取当前线程的ThreadLocalMap变量
    ThreadLocalMap map = getMap(t);
    if (map != null)
        // 以当前ThreadLocal对象为key,存入传进来的value
        map.set(this, value);
    else
        // 为当前线程的ThreadLocalMap变量赋值
        createMap(t, value);
}

// ThreadLocal#getMap(Thread t)
ThreadLocalMap getMap(Thread t) {
    // threadLocals是Thread的一个类行为ThreadLocalMap的成员变量,用于保存该线程的本地变量
    return t.threadLocals;
}

// ThreadLocal#createMap(Thread t, T firstValue)
void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}

现在我们知道ThreadLocal.set()的作用就是给当前线程的成员变量threadLocals赋值,以Looper为例,

// Looper.java
private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    sThreadLocal.set(new Looper(quitAllowed));
}

ThreadLocalMap的key就是Looper#sThreadLocal,value就是新建的Looper对象。接下来我们看看ThreadLocal.get()做了什么,

// ThreadLocal.java
public T get() {
    Thread t = Thread.currentThread();
    // 从当前线程获取ThreadLocalMap,从前文可知,如果没有调用set,这个值为null
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        // 以Looper#sThreadLocal为key,获取entry
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    return setInitialValue();
}

/**
 * Variant of set() to establish initialValue. Used instead
 * of set() in case user has overridden the set() method.
 *
 * @return the initial value
 */
private T setInitialValue() {
    // 默认返回null
    T value = initialValue();
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        // 初始化ThreadLocalMap,但存入一个null值
        createMap(t, value);
    return value;
}

get()方法很简单,直接从当前线程的ThreadLocalMap中以Looper#sThreadLocal为key获取Looper对象,如果当前线程ThreadLocalMap为null,则该方法也会返回null。

现在来回答最开始的问题,子线程中创建Handler报异常,是因为没有调用Looper.prepare(),向当前线程(即子线程)的ThreadLocalMap类型的成员变量threadLocals中存入Looper对象,所以在构造Handler的时候无法获取当前线程的Looper对象,故而抛出异常。

二、Handler机制中怎么保证一个Thread只有一个Looper?

上文我们知道,在一个线程中,如果要用到Handler机制,必须先调用Looper.prepare()才能使用Handler的sendXX和postXX接口,先看看prepare()方法里做了什么:

// Looper.java
private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    sThreadLocal.set(new Looper(quitAllowed));
}

很明显,上面代码中异常就表明一个线程中只能有一个Looper,又回到了sThreadLocal上。这里值得注意的一点是Looper#sThreadLocal的初始化,

// Looper.java
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();

static final说明sThreadLocal全局只会初始化一次,且不可变更。

// ThreadLocal.java
public void set(T value) {
    // 获取当前线程
    Thread t = Thread.currentThread();
    // 获取当前线程的ThreadLocalMap变量
    ThreadLocalMap map = getMap(t);
    if (map != null)
        // 以当前ThreadLocal对象为key,存入传进来的value
        map.set(this, value);
    else
        // 为当前线程的ThreadLocalMap变量赋值
        createMap(t, value);
}

从set源码可看到,往threadLocals中存入数据的key始终是sThreadLocal,这就说明在调用Looper.prepare()的时候始终只会更新当前线程的Looper,当然这个会抛异常,不会存在更新Looper的情况。

现在回答问题,给线程设置Looper是通过调用Looper.prepare()实现,在该方法中有做限制,一个线程仅能绑定一个Looper。

三、MessageQueue是什么数据结构,为什么采用这样的数据结构?

先看看Message的源码:

public final class Message implements Parcelable {
    ...
    /*package*/ Message next;
    ...
}

很明显是链表结构,采用这种结构有以下几种好处:

1. 解耦:在项目启动之初来预测未来项目会遇到什么需求是很困难的。但消息队列用于数据存取,通过定义基于数据的借口层,存取两边都实现这一接口。这允许你独立扩展或修改两边的处理过程,只要确保他们遵循同样的借口约束。
2. 冗余:有些情况下,处理数据的过程会失败。除非数据持久化,否则将造成数据丢失。消息队列将数据持久化直到他们被完全处理,通过这一方式避免数据丢失的风险。在消息队列所采用的“插入-获取-删除”范式中,在把一个消息删除之前,需要在处理过程中明确指出该消息已经被处理完毕,以确保数据被安全的保存到使用完毕。
3. 扩展性:因为消息队列解耦了处理过程,所以增大消息入列和处理的频率就变得很容易:只要另外增加处理逻辑就行,不需要更改代码。
4. 缓冲:消息队列本身就是一个很好的缓冲结构,写入端的速度可以足够快,不用受读取端效率影响。该缓冲有助于控制和优化数据流经过系统的速度。
5. 顺序保证:消息队列本来就是“先进先出”的数据结构,能够保证数据能够按照特定的顺序来处理。
6. 异步通信:消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。即写入端和读取端是异步的。
7. 内存友好:链表数据结构在内存占用上不连续,能提高内存申请效率。

四、Handler中的Looper是怎么运作的?

Looper作为Handler机制中中承上启下的一个组件,其作用是启动一个死循环,不断从MessageQueue中获取message并处理,如果message为null,则退出循环;否则通过message.target.dispatchMessage()将消息分发给handler处理。

// Looper.java
public static void loop() {
    final Looper me = myLooper();
    if (me == null) {
        throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
    }
    // 从当前Thread绑定的Looper中获取MessageQueue
    final MessageQueue queue = me.mQueue;

    ...
    
    // 启动死循环,轮询Message
    for (;;) {
        Message msg = queue.next(); // might block
        if (msg == null) {
            // No message indicates that the message queue is quitting.
            return;
        }

        ...

        try {
            // 分发Message到Handler
            msg.target.dispatchMessage(msg);
        } finally {
            ...
        }

        ...
        
        // 回收Message
        msg.recycleUnchecked();
    }
}

这里的死循环保证能源源不断的从MessageQueue中通过next()获取下一个需要执行的Message,只有在无Message要处理的时候才会退出循环。下面看看next()做了什么。

Message next() {
    // 1. 如果 native消息队列指针映射已经为0,即虚引用,说明消息队列已经退出,没有消息了。
    final long ptr = mPtr;
    if (ptr == 0) {
        return null;
    }

    int pendingIdleHandlerCount = -1; // -1 only during first iteration
    int nextPollTimeoutMillis = 0;
    // 2. 死循环,当未获取到需要 `分发处理` 的消息时,保持空转
    for (;;) {
        …
     
        // 3. 调用native层方法,poll message,注意,消息还存在于native层
        nativePollOnce(ptr, nextPollTimeoutMillis);

        synchronized (this) {
            // Try to retrieve the next message.  Return if found.
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages;
            // 如果头节点消息是一个同步屏障,则找到消息队列中第一个异步消息
            if (msg != null && msg.target == null) {
                // Stalled by a barrier.  Find the next asynchronous message in the queue.
                do {
                        // 指针后移,直到指向第一个异步消息
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }
            if (msg != null) {
                if (now < msg.when) {
                    // Next message is not ready.  Set a timeout to wake up when it is ready.
                        // 还没到执行时间,计算需要等待的时间
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {
                    // Got a message.
                    mBlocked = false;
                    if (prevMsg != null) {
                        prevMsg.next = msg.next;
                    } else {
                             // 将mMessages设置为下一个要执行的消息
                        mMessages = msg.next;
                    }
                        // 释放指针对下一个消息的引用
                    msg.next = null;
                    if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                    msg.markInUse();
                    return msg;
                }
            } else {
                // No more messages.
                   // 没有消息,消息队列一直阻塞
                nextPollTimeoutMillis = -1;
            }

            // Process the quit message now that all pending messages have been handled.
            if (mQuitting) {
                dispose();
                return null;
            }

              // 下面是处理IdleHandler的逻辑

            // If first time idle, then get the number of idlers to run.
            // Idle handles only run if the queue is empty or if the first message
            // in the queue (possibly a barrier) is due to be handled in the future.
            if (pendingIdleHandlerCount < 0
                    && (mMessages == null || now < mMessages.when)) {
                pendingIdleHandlerCount = mIdleHandlers.size();
            }
            if (pendingIdleHandlerCount <= 0) {
                // No idle handlers to run.  Loop and wait some more.
                mBlocked = true;
                continue;
            }

            if (mPendingIdleHandlers == null) {
                mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
            }
            mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
        }

        // Run the idle handlers.
        // We only ever reach this code block during the first iteration.
        for (int i = 0; i < pendingIdleHandlerCount; i++) {
            final IdleHandler idler = mPendingIdleHandlers[I];
            mPendingIdleHandlers[i] = null; // release the reference to the handler

            boolean keep = false;
            try {
                keep = idler.queueIdle();
            } catch (Throwable t) {
                Log.wtf(TAG, "IdleHandler threw exception", t);
            }

            if (!keep) {
                synchronized (this) {
                    mIdleHandlers.remove(idler);
                }
            }
        }

        // Reset the idle handler count to 0 so we do not run them again.
        pendingIdleHandlerCount = 0;

        // While calling an idle handler, a new message could have been delivered
        // so go back and look again for a pending message without waiting.
        nextPollTimeoutMillis = 0;
    }
}

处理逻辑也比较清晰,如果头节点是同步屏障,先处理异步消息;由于消息队列是按照处理时间排好序的,所以直接取第一个消息,根据需要处理的时间设置好等待时间,等待时间为0则直接返回msg;如果没有取到消息,则一直等待下去;最后处理IdleHandler。

另外提个问题:为什么在Looper和MQ中都需要死循环?

五、Handler插入延时消息是怎么处理的?

Handler插入消息最终都是走到sendMessageAtTime(msg, long),然后通过enqueueMessage (queue, msg, long)放入消息队列:

// Handler.java
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
    MessageQueue queue = mQueue;
    if (queue == null) {
        RuntimeException e = new RuntimeException(
                this + " sendMessageAtTime() called with no mQueue");
        Log.w("Looper", e.getMessage(), e);
        return false;
    }
    return enqueueMessage(queue, msg, uptimeMillis);
}

// Handler.java
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
    // 持有当前Handler
    msg.target = this;
    // 是否为异步消息
    if (mAsynchronous) {
        msg.setAsynchronous(true);
    }
    return queue.enqueueMessage(msg, uptimeMillis);
}

上面有两点值得注意:

  1. sendMessageAtTime(msg, long)第二个参数使用的是android.os.SystemClock#uptimeMillis,这个时间从开机开始计算。
  2. msg.target = this;:msg持有Handler引用,而Handler如果作为匿名内部类,会持有外部类的引用,这里会出现引用的持有与释放问题,有内存泄漏隐患。

下面看看MQ的enqueueMessage做了什么:

boolean enqueueMessage(Message msg, long when) {
    …

    synchronized (this) {
        …

        msg.markInUse();
        msg.when = when;
        Message p = mMessages;  // 当前头节点
        boolean needWake;
        // 如果消息队列为空,或者头节点消息处理时间小于当前消息,则将当前消息设置为头节点,并设置唤醒消息队列
        if (p == null || when == 0 || when < p.when) {
            // New head, wake up the event queue if blocked.
            msg.next = p;
            mMessages = msg;
            needWake = mBlocked;
        } else {
            // Inserted within the middle of the queue.  Usually we don't have to wake
            // up the event queue unless there is a barrier at the head of the queue
            // and the message is the earliest asynchronous message in the queue.
            // 此处意思是,如果当前队列头部有一个同步屏障,并且入列的消息msg是一个马上需要执行的异步消息,此时就需要唤醒队列
            // 默认消息入列的时候不需要唤醒队列,即needWake = false。假如下面判断needWake = true,表明p是一个同步屏障
            needWake = mBlocked && p.target == null && msg.isAsynchronous();
            Message prev;
            for (;;) {
                // 指针后移(使用临时变量prev保存上一个节点信息,使p == prev.next恒成立),直到移到消息队列尾,或者当前消息处理时间小于p的执行时间
                prev = p;
                p = p.next;
                if (p == null || when < p.when) {
                    break;
                }
                // 如果needWake = true,说明p是同步屏障,此时已经将p后移了,则没有必要唤醒消息队列
                if (needWake && p.isAsynchronous()) {
                    needWake = false;
                }
            }
            // 将当前消息插入到p消息和prev消息之间
            msg.next = p; // invariant: p == prev.next
            prev.next = msg;
        }

        // We can assume mPtr != 0 because mQuitting is false.
        if (needWake) {
            // 唤醒消息队列
            nativeWake(mPtr);
        }
    }
    return true;
}

Handler插入的延时消息是根据msg的when,选择合适的位置插入消息队列。因此,消息队列是一个按照时间排序的有序队列

六、消息队列阻塞和唤醒的原理?

当消息队列中的第一个消息执行时间还没到,这个时候调用nativePollOnce(ptr, nextPollTimeoutMillis)来阻塞当前线程并进入休眠,避免空转,当等待时间nextPollTimeoutMillis到了的时候又会自动唤醒线程,同时也可以使用nativeWake(mPtr)来唤醒线程,下面就来看看这两个本地方法是怎么运作的。

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;

    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}

Java层通过JNI调到Native层的android_os_MessageQueue_nativePollOnce方法,下面看看Looper#pollOnce做了什么:

//Looper.h
int pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData);
inline int pollOnce(int timeoutMillis) {
    return pollOnce(timeoutMillis, NULL, NULL, NULL);
}

//实现
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
                ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
                        "fd=%d, events=0x%x, data=%p",
                        this, ident, fd, events, data);
#endif
                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
            }
        }

        ...

        result = pollInner(timeoutMillis);
    }
}

先处理Native层滞留的Response,然后调用pollInner,继续往下看:

int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif

    // Adjust the timeout based on when the next message is due.
    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
        if (messageTimeoutMillis >= 0
                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
            timeoutMillis = messageTimeoutMillis;
        }
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - next message in %" PRId64 "ns, adjusted timeout: timeoutMillis=%d",
                this, mNextMessageUptime - now, timeoutMillis);
#endif
    }

    // Poll.
    int result = POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;

    // We are about to idle.
     // 即将处于idle状态
    mPolling = true;
    // fd最大的个数是16
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    // 等待时间发生或者超时,在nativeWake()方法,向管道写端写入字符,则方法会返回。
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // No longer idling.
    // 不再处于idle状态
    mPolling = false;
     // 请求锁 ,因为在Native Message的处理和添加逻辑上需要同步
    // Acquire lock.
    mLock.lock();

    // Rebuild epoll set if needed.
    // 如果需要,重建epoll
    if (mEpollRebuildRequired) {
        mEpollRebuildRequired = false;
        // epoll重建,直接跳转到Done
        rebuildEpollLocked();
        goto Done;
    }

    // Check for poll error.
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        ALOGW("Poll failed with an unexpected error, errno=%d", errno);
        // epoll事件个数小于0,发生错误,直接跳转Done
        result = POLL_ERROR;
        goto Done;
    }

    // Check for poll timeout.
    // 如果需要,重建epoll
    if (eventCount == 0) {
    //epoll事件个数等于0,发生超时,直接跳转Done
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - timeout", this);
#endif
        result = POLL_TIMEOUT;
        goto Done;
    }

    // Handle all events.
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif
   // 循环处理所有的事件
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        //首先处理mWakeEventFd
        if (fd == mWakeEventFd) {
            //如果是唤醒mWakeEventFd有反应
            if (epollEvents & EPOLLIN) {
                /**重点代码*/
                // 已经唤醒了,则读取并清空管道数据
                awoken();  // 该函数内部就是read,从而使FD可读状态被清除
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {
            // 其他input fd处理,其实就是将活动放入response队列,等待处理
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                 // 处理request,生成对应的response对象,push到响应数组
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
Done: ;
    // Invoke pending message callbacks.
    // 再处理Native的Message,调用相应回调方法
    mNextMessageUptime = LLONG_MAX;
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            // Remove the envelope from the list.
            // We keep a strong reference to the handler until the call to handleMessage
            // finishes.  Then we drop it so that the handler can be deleted *before*
            // we reacquire our lock.
            { // obtain handler
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                 // 释放锁
                mLock.unlock();

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
                ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
                        this, handler.get(), message.what);
#endif
                // 处理消息事件
                handler->handleMessage(message);
            } // release handler
            // 请求锁
            mLock.lock();
            mSendingMessage = false;
             // 发生回调
            result = POLL_CALLBACK;
        } else {
            // The last message left at the head of the queue determines the next wakeup time.
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }

    // Release lock.
    // 释放锁
    mLock.unlock();

    // Invoke all response callbacks.
    // 处理带有Callback()方法的response事件,执行Response相应的回调方法
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
            ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
                    this, response.request.callback.get(), fd, events, data);
#endif
            // Invoke the callback.  Note that the file descriptor may be closed by
            // the callback (and potentially even reused) before the function returns so
            // we need to be a little careful when removing the file descriptor afterwards.
            // 处理请求的回调方法
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                // 移除fd
                removeFd(fd, response.request.seq);
            }

            // Clear the callback reference in the response structure promptly because we
            // will not clear the response vector itself until the next poll.
             // 清除response引用的回调方法
            response.request.callback.clear();
             // 发生回调
            result = POLL_CALLBACK;
        }
    }
    return result;
}

pollInner()方法的处理流程:

  1. 先调用epoll_wait(),这是阻塞方法,用于等待事件发生或者超时。
  2. 对于epoll_wait()返回,当且仅当以下3种情况出现:
  • POLL_ERROR:发生错误,直接跳转Done
  • POLL_TIMEOUT:发生超时,直接跳转到Done
  • 检测到管道有事情发生,则再根据情况做相应处理:
    a. 如果检测到管道产生事件,则直接读取管道的数据
    b. 如果是其他事件,则处理request,生成对应的response对象,push到response数组
  1. 进入Done标记位的代码:
    a. 先处理Native的Message,调用Native的Handler来处理该Message
    b. 再处理Resposne数组,POLL_CALLBACK类型的事件
    参考https://www.tinymind.net.cn/articles/479757df993a94
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    // 将Java层传递下来的mPtr转换为nativeMessageQueue
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    //调用wake函数
    nativeMessageQueue->wake();
}

void NativeMessageQueue::wake() {
    mLooper->wake();
}

// Looper.h
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ wake", this);
#endif

    uint64_t inc = 1;
    // 向管道mWakeEventFd写入字符1
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    if (nWrite != sizeof(uint64_t)) {
        if (errno != EAGAIN) {
            ALOGW("Could not write wake signal, errno=%d", errno);
        }
    }
}

Looper类的wake()函数只是往mWakeEventfd中写了一些内容,这个fd只是通知而已,类似于pipe,最后会把epoll_wait()唤醒,线程就不阻塞了继续发送 Native层的消息,然后处理之前的addFd事件,然后处理Java层的消息。

这里面涉及pipe/epoll机制。

pipe(管道),是指用于连接一个读进程和一个写进程的共享文件,又称pipe文件。

向管道(共享文件)提供输入的发送进程(即写进程),以字符流的形式将大量数据送入管道(写入过程);而接受管道输出的接收进程(即读进程),可从管道接收数据,标准的生产者消费者模式。

为了协调双方的通信,管道通信机制必须提供以下3 方面的协调能力:

epoll是Linux I/O多路复用的一种机制,可以监视多个描述符fd,一旦某个描述符就绪,能够通知程序进行相应的操作。

上一篇下一篇

猜你喜欢

热点阅读