Handler相关问题
一、在子线程中创建Handler,“Can't create handler inside thread XXX that has not called Looper.prepare()”
从Handler构造方法开始,直接上源码,
// Handler.java
public Handler(@Nullable Callback callback, boolean async) {
...
mLooper = Looper.myLooper();
if (mLooper == null) {
// 这里就是异常抛出的地方
throw new RuntimeException(
"Can't create handler inside thread " + Thread.currentThread()
+ " that has not called Looper.prepare()");
}
...
}
接着看Looper.myLooper()
做了什么,
// Looper.java
public static @Nullable Looper myLooper() {
return sThreadLocal.get();
}
这段代码很简单,就是从sThreadLocal
拿东西,从返回值知道,拿的是一个Looper对象,并且可能为null。接下来就需要好好了解一下这个ThreadLocal是个什么东西,简单讲,ThreadLocal就是一个普通工具类,用于管理线程的本地变量,使用自定义数据结构ThreadLocalMap(ThreadLocal的静态内部类,本质就是一个hashMap)来存储这些变量。
ThreadLocal既然有get(),那就肯定有set(),我们先不看get做了什么,先看set做了什么,
// ThreadLocal.java
public void set(T value) {
// 获取当前线程
Thread t = Thread.currentThread();
// 获取当前线程的ThreadLocalMap变量
ThreadLocalMap map = getMap(t);
if (map != null)
// 以当前ThreadLocal对象为key,存入传进来的value
map.set(this, value);
else
// 为当前线程的ThreadLocalMap变量赋值
createMap(t, value);
}
// ThreadLocal#getMap(Thread t)
ThreadLocalMap getMap(Thread t) {
// threadLocals是Thread的一个类行为ThreadLocalMap的成员变量,用于保存该线程的本地变量
return t.threadLocals;
}
// ThreadLocal#createMap(Thread t, T firstValue)
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
现在我们知道ThreadLocal.set()
的作用就是给当前线程的成员变量threadLocals
赋值,以Looper为例,
// Looper.java
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
ThreadLocalMap的key就是Looper#sThreadLocal
,value就是新建的Looper
对象。接下来我们看看ThreadLocal.get()
做了什么,
// ThreadLocal.java
public T get() {
Thread t = Thread.currentThread();
// 从当前线程获取ThreadLocalMap,从前文可知,如果没有调用set,这个值为null
ThreadLocalMap map = getMap(t);
if (map != null) {
// 以Looper#sThreadLocal为key,获取entry
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
/**
* Variant of set() to establish initialValue. Used instead
* of set() in case user has overridden the set() method.
*
* @return the initial value
*/
private T setInitialValue() {
// 默认返回null
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
// 初始化ThreadLocalMap,但存入一个null值
createMap(t, value);
return value;
}
get()方法很简单,直接从当前线程的ThreadLocalMap中以Looper#sThreadLocal
为key获取Looper对象,如果当前线程ThreadLocalMap为null,则该方法也会返回null。
现在来回答最开始的问题,子线程中创建Handler报异常,是因为没有调用
Looper.prepare()
,向当前线程(即子线程)的ThreadLocalMap类型的成员变量threadLocals
中存入Looper对象,所以在构造Handler的时候无法获取当前线程的Looper对象,故而抛出异常。
二、Handler机制中怎么保证一个Thread只有一个Looper?
上文我们知道,在一个线程中,如果要用到Handler机制,必须先调用Looper.prepare()
才能使用Handler的sendXX和postXX接口,先看看prepare()方法里做了什么:
// Looper.java
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
很明显,上面代码中异常就表明一个线程中只能有一个Looper,又回到了sThreadLocal
上。这里值得注意的一点是Looper#sThreadLocal
的初始化,
// Looper.java
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
static final
说明sThreadLocal
全局只会初始化一次,且不可变更。
// ThreadLocal.java
public void set(T value) {
// 获取当前线程
Thread t = Thread.currentThread();
// 获取当前线程的ThreadLocalMap变量
ThreadLocalMap map = getMap(t);
if (map != null)
// 以当前ThreadLocal对象为key,存入传进来的value
map.set(this, value);
else
// 为当前线程的ThreadLocalMap变量赋值
createMap(t, value);
}
从set源码可看到,往threadLocals
中存入数据的key始终是sThreadLocal
,这就说明在调用Looper.prepare()
的时候始终只会更新当前线程的Looper,当然这个会抛异常,不会存在更新Looper的情况。
现在回答问题,给线程设置Looper是通过调用
Looper.prepare()
实现,在该方法中有做限制,一个线程仅能绑定一个Looper。
三、MessageQueue是什么数据结构,为什么采用这样的数据结构?
先看看Message的源码:
public final class Message implements Parcelable {
...
/*package*/ Message next;
...
}
很明显是链表结构,采用这种结构有以下几种好处:
1. 解耦:在项目启动之初来预测未来项目会遇到什么需求是很困难的。但消息队列用于数据存取,通过定义基于数据的借口层,存取两边都实现这一接口。这允许你独立扩展或修改两边的处理过程,只要确保他们遵循同样的借口约束。
2. 冗余:有些情况下,处理数据的过程会失败。除非数据持久化,否则将造成数据丢失。消息队列将数据持久化直到他们被完全处理,通过这一方式避免数据丢失的风险。在消息队列所采用的“插入-获取-删除”范式中,在把一个消息删除之前,需要在处理过程中明确指出该消息已经被处理完毕,以确保数据被安全的保存到使用完毕。
3. 扩展性:因为消息队列解耦了处理过程,所以增大消息入列和处理的频率就变得很容易:只要另外增加处理逻辑就行,不需要更改代码。
4. 缓冲:消息队列本身就是一个很好的缓冲结构,写入端的速度可以足够快,不用受读取端效率影响。该缓冲有助于控制和优化数据流经过系统的速度。
5. 顺序保证:消息队列本来就是“先进先出”的数据结构,能够保证数据能够按照特定的顺序来处理。
6. 异步通信:消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。即写入端和读取端是异步的。
7. 内存友好:链表数据结构在内存占用上不连续,能提高内存申请效率。
四、Handler中的Looper是怎么运作的?
Looper作为Handler机制中中承上启下的一个组件,其作用是启动一个死循环,不断从MessageQueue
中获取message并处理,如果message为null,则退出循环;否则通过message.target.dispatchMessage()
将消息分发给handler处理。
// Looper.java
public static void loop() {
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
// 从当前Thread绑定的Looper中获取MessageQueue
final MessageQueue queue = me.mQueue;
...
// 启动死循环,轮询Message
for (;;) {
Message msg = queue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}
...
try {
// 分发Message到Handler
msg.target.dispatchMessage(msg);
} finally {
...
}
...
// 回收Message
msg.recycleUnchecked();
}
}
这里的死循环保证能源源不断的从MessageQueue中通过next()
获取下一个需要执行的Message,只有在无Message要处理的时候才会退出循环。下面看看next()
做了什么。
Message next() {
// 1. 如果 native消息队列指针映射已经为0,即虚引用,说明消息队列已经退出,没有消息了。
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
// 2. 死循环,当未获取到需要 `分发处理` 的消息时,保持空转
for (;;) {
…
// 3. 调用native层方法,poll message,注意,消息还存在于native层
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
// 如果头节点消息是一个同步屏障,则找到消息队列中第一个异步消息
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
// 指针后移,直到指向第一个异步消息
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
// 还没到执行时间,计算需要等待的时间
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
// 将mMessages设置为下一个要执行的消息
mMessages = msg.next;
}
// 释放指针对下一个消息的引用
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// No more messages.
// 没有消息,消息队列一直阻塞
nextPollTimeoutMillis = -1;
}
// Process the quit message now that all pending messages have been handled.
if (mQuitting) {
dispose();
return null;
}
// 下面是处理IdleHandler的逻辑
// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the first message
// in the queue (possibly a barrier) is due to be handled in the future.
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// Run the idle handlers.
// We only ever reach this code block during the first iteration.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[I];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;
// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}
}
处理逻辑也比较清晰,如果头节点是同步屏障,先处理异步消息;由于消息队列是按照处理时间排好序的,所以直接取第一个消息,根据需要处理的时间设置好等待时间,等待时间为0则直接返回msg;如果没有取到消息,则一直等待下去;最后处理IdleHandler。
另外提个问题:为什么在Looper和MQ中都需要死循环?
- Looper中的循环负责消息队列中消息的分发,死循环保障消息分发一直处于运行中,不循环就停止分发;
- MQ的
next()
中的死循环负责获取消息,保障Looper可以获取有效的消息,使Looper可以一直运行下去,next()
只要发现有效消息就返回,即跳出死循环。
五、Handler插入延时消息是怎么处理的?
Handler插入消息最终都是走到sendMessageAtTime(msg, long)
,然后通过enqueueMessage (queue, msg, long)
放入消息队列:
// Handler.java
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
// Handler.java
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
// 持有当前Handler
msg.target = this;
// 是否为异步消息
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
上面有两点值得注意:
sendMessageAtTime(msg, long)
第二个参数使用的是android.os.SystemClock#uptimeMillis
,这个时间从开机开始计算。msg.target = this;
:msg持有Handler引用,而Handler如果作为匿名内部类,会持有外部类的引用,这里会出现引用的持有与释放问题,有内存泄漏隐患。
下面看看MQ的enqueueMessage做了什么:
boolean enqueueMessage(Message msg, long when) {
…
synchronized (this) {
…
msg.markInUse();
msg.when = when;
Message p = mMessages; // 当前头节点
boolean needWake;
// 如果消息队列为空,或者头节点消息处理时间小于当前消息,则将当前消息设置为头节点,并设置唤醒消息队列
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
// 此处意思是,如果当前队列头部有一个同步屏障,并且入列的消息msg是一个马上需要执行的异步消息,此时就需要唤醒队列
// 默认消息入列的时候不需要唤醒队列,即needWake = false。假如下面判断needWake = true,表明p是一个同步屏障
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
// 指针后移(使用临时变量prev保存上一个节点信息,使p == prev.next恒成立),直到移到消息队列尾,或者当前消息处理时间小于p的执行时间
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
// 如果needWake = true,说明p是同步屏障,此时已经将p后移了,则没有必要唤醒消息队列
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
// 将当前消息插入到p消息和prev消息之间
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
// 唤醒消息队列
nativeWake(mPtr);
}
}
return true;
}
Handler插入的延时消息是根据msg的when,选择合适的位置插入消息队列。因此,消息队列是一个按照时间排序的有序队列。
六、消息队列阻塞和唤醒的原理?
当消息队列中的第一个消息执行时间还没到,这个时候调用nativePollOnce(ptr, nextPollTimeoutMillis)
来阻塞当前线程并进入休眠,避免空转,当等待时间nextPollTimeoutMillis
到了的时候又会自动唤醒线程,同时也可以使用nativeWake(mPtr)
来唤醒线程,下面就来看看这两个本地方法是怎么运作的。
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
Java层通过JNI调到Native层的android_os_MessageQueue_nativePollOnce
方法,下面看看Looper#pollOnce
做了什么:
//Looper.h
int pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData);
inline int pollOnce(int timeoutMillis) {
return pollOnce(timeoutMillis, NULL, NULL, NULL);
}
//实现
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
"fd=%d, events=0x%x, data=%p",
this, ident, fd, events, data);
#endif
if (outFd != NULL) *outFd = fd;
if (outEvents != NULL) *outEvents = events;
if (outData != NULL) *outData = data;
return ident;
}
}
...
result = pollInner(timeoutMillis);
}
}
先处理Native层滞留的Response,然后调用pollInner,继续往下看:
int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif
// Adjust the timeout based on when the next message is due.
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - next message in %" PRId64 "ns, adjusted timeout: timeoutMillis=%d",
this, mNextMessageUptime - now, timeoutMillis);
#endif
}
// Poll.
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
// We are about to idle.
// 即将处于idle状态
mPolling = true;
// fd最大的个数是16
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
// 等待时间发生或者超时,在nativeWake()方法,向管道写端写入字符,则方法会返回。
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// No longer idling.
// 不再处于idle状态
mPolling = false;
// 请求锁 ,因为在Native Message的处理和添加逻辑上需要同步
// Acquire lock.
mLock.lock();
// Rebuild epoll set if needed.
// 如果需要,重建epoll
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
// epoll重建,直接跳转到Done
rebuildEpollLocked();
goto Done;
}
// Check for poll error.
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error, errno=%d", errno);
// epoll事件个数小于0,发生错误,直接跳转Done
result = POLL_ERROR;
goto Done;
}
// Check for poll timeout.
// 如果需要,重建epoll
if (eventCount == 0) {
//epoll事件个数等于0,发生超时,直接跳转Done
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - timeout", this);
#endif
result = POLL_TIMEOUT;
goto Done;
}
// Handle all events.
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif
// 循环处理所有的事件
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
//首先处理mWakeEventFd
if (fd == mWakeEventFd) {
//如果是唤醒mWakeEventFd有反应
if (epollEvents & EPOLLIN) {
/**重点代码*/
// 已经唤醒了,则读取并清空管道数据
awoken(); // 该函数内部就是read,从而使FD可读状态被清除
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
// 其他input fd处理,其实就是将活动放入response队列,等待处理
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
// 处理request,生成对应的response对象,push到响应数组
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
Done: ;
// Invoke pending message callbacks.
// 再处理Native的Message,调用相应回调方法
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
// Remove the envelope from the list.
// We keep a strong reference to the handler until the call to handleMessage
// finishes. Then we drop it so that the handler can be deleted *before*
// we reacquire our lock.
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
// 释放锁
mLock.unlock();
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
this, handler.get(), message.what);
#endif
// 处理消息事件
handler->handleMessage(message);
} // release handler
// 请求锁
mLock.lock();
mSendingMessage = false;
// 发生回调
result = POLL_CALLBACK;
} else {
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
// Release lock.
// 释放锁
mLock.unlock();
// Invoke all response callbacks.
// 处理带有Callback()方法的response事件,执行Response相应的回调方法
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
this, response.request.callback.get(), fd, events, data);
#endif
// Invoke the callback. Note that the file descriptor may be closed by
// the callback (and potentially even reused) before the function returns so
// we need to be a little careful when removing the file descriptor afterwards.
// 处理请求的回调方法
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
// 移除fd
removeFd(fd, response.request.seq);
}
// Clear the callback reference in the response structure promptly because we
// will not clear the response vector itself until the next poll.
// 清除response引用的回调方法
response.request.callback.clear();
// 发生回调
result = POLL_CALLBACK;
}
}
return result;
}
pollInner()
方法的处理流程:
- 先调用epoll_wait(),这是阻塞方法,用于等待事件发生或者超时。
- 对于epoll_wait()返回,当且仅当以下3种情况出现:
- POLL_ERROR:发生错误,直接跳转Done
- POLL_TIMEOUT:发生超时,直接跳转到Done
- 检测到管道有事情发生,则再根据情况做相应处理:
a. 如果检测到管道产生事件,则直接读取管道的数据
b. 如果是其他事件,则处理request,生成对应的response对象,push到response数组
- 进入Done标记位的代码:
a. 先处理Native的Message,调用Native的Handler来处理该Message
b. 再处理Resposne数组,POLL_CALLBACK类型的事件
参考https://www.tinymind.net.cn/articles/479757df993a94
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
// 将Java层传递下来的mPtr转换为nativeMessageQueue
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
//调用wake函数
nativeMessageQueue->wake();
}
void NativeMessageQueue::wake() {
mLooper->wake();
}
// Looper.h
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif
uint64_t inc = 1;
// 向管道mWakeEventFd写入字符1
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
ALOGW("Could not write wake signal, errno=%d", errno);
}
}
}
Looper类的
wake()
函数只是往mWakeEventfd中写了一些内容,这个fd只是通知而已,类似于pipe,最后会把epoll_wait()
唤醒,线程就不阻塞了继续发送 Native层的消息,然后处理之前的addFd事件,然后处理Java层的消息。
这里面涉及pipe/epoll机制。
pipe(管道),是指用于连接一个读进程和一个写进程的共享文件,又称pipe文件。
向管道(共享文件)提供输入的发送进程(即写进程),以字符流的形式将大量数据送入管道(写入过程);而接受管道输出的接收进程(即读进程),可从管道接收数据,标准的生产者消费者模式。
为了协调双方的通信,管道通信机制必须提供以下3 方面的协调能力:
- 互斥。当一个进程正在对 pipe 进行读/写操作时,另一个进程必须等待。
- 同步。当写(输入)进程把一定数量(如4KB)数据写入 pipe 后,便去睡眠等待,直到读(输出)进程取走数据后,再把它唤醒。当读进程读到一空 pipe 时,也应睡眠等待,直至写进程将数据写入管道后,才将它唤醒。
-
对方是否存在。只有确定对方已存在时,才能进行通信。
image.png
epoll是Linux I/O多路复用的一种机制,可以监视多个描述符fd,一旦某个描述符就绪,能够通知程序进行相应的操作。