Android消息处理机制工作原理
Looper 的工作机制
Looper 的创建与使用
Looper 的使用流程
- 创建 Looper 对象 Looper.prepare()
- 开始循环 Looper.loop()
- 在循环中获取 MessageQueue.next() 获取消息处理
- Looper.quit() 结束
在子线程中使用 Looper
//在子线程中使用 Looper
new Thread() {
@Override
public void run() {
super.run();
//在子线程中创建 Looper 对象
Looper.prepare();
final Handler handler = new Handler(Looper.myLooper());
//发送消息到消息队列中
handler.post(new Runnable() {
@Override
public void run() {
System.out.println("post message in " + Thread.currentThread().getName() + " thread");
handler.postDelayed(this, 1000);
}
});
//开始轮训
Looper.loop();
}
}.start();
Looper 是如何创建的,它与 ThreadLocal,Thread,MessageQueue 的是什么关系?
对于 Looper 来说,每一个线程会保存一份实例,它是通过 ThreadLocal 来保证的,并且在 prepare 方法内部会在 Looper 创建前进行判断当前线程本地变量 ThreadLocal 是否存有 Looper,如果没有才会去创建一个 Looper,这就保证了一个线程只会有一个 Looper ,而对应的 mThread,mQueue 是在 Looper 的构造函数中实例化的,因此它也保证了一个 Looper 对应于一个线程 mThread和一个消息队列 mQueue。
//SDK28 Looper
public static void prepare() {
prepare(true);
}
private static void prepare(boolean quitAllowed) {
//从线程本地变量取,如果取到说明当前线程已经创建过了Looper了,不要重复创建,直接报错
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
//将创建的 Looper 保存到 ThreadLocal 中
sThreadLocal.set(new Looper(quitAllowed));
}
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
Handler 通讯原理
在 Handler 中不管是通过 sendXxx还是 postXxx 发送 Message 最终都会走到enqueueMessage
方法,也就是将 Message 放入到 MessageQueue 队列中。
多个线程是通过共享内存的 Message
的方式来实现线程间数据通讯。
//SDK 28 Handler#enqueueMessage
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
//指定 msg 的 target 为当前 Handler 对象,之后在消息分发时会用到
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
线程间通讯是通过共享 Message 的方式,而 Message 则是通过 MessageQueue
来管理的 ,因此在 Message 入队
和出队
就要考虑在多线程
操作的情况可能出现线程安全问题,因此在消息入队
enqueueMessage() 和消息出队
next() 内部使用了 synchronized
锁确保线程安全。这里可以将其看成是一种生产者消费者
模式
//MessageQueue#enqueueMessage
//===============================消息入队=====================================
boolean enqueueMessage(Message msg, long when) {
...
//加锁
synchronized (this) {
//...
// msg 添加到队列中
}
return true;
}
//===============================消息出队=====================================
Message next() {
//...
for (;;) {
//加锁
synchronized (this) {
//...
return msg;
}
//...
}
}
MessageQueue的工作原理
Message出队和入队的实现
MessageQueue 是用来管理 Message 的,这里就设计到 Message 入队和出队两个操作
入队:根据时间排序,时间优先级存储,那么就可以做到消息延迟执行的。
- 插入到队头
- 插入到队列中间
//MessageQueue#enqueueMessage
Message p = mMessages;//队头
//当前消息 when 是比对头还早,那么就插入到队头
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
//重新更新队头的指针指向为当前入队的消息
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
//从队头开始找,找到一个 when < p.when,也就是找到第一个 Message 比当前插入的消息还要晚执行的
for (;;) {//这里查找,使用链表的效率会比较低
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
//将该 msg 插入到队列中,这是一个优先级队列,存储的时候就会排序,它不是链表
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
消息出队的过程,因为消息在入队时已经是根据 Message.when 排好序了,因此会从队头 mMessages 开始取消息。
队列中有消息:
1.如果当前队头 mMessages 指向的消息还没有执行的时间,那么就开始等待
2.如果当前队头 mMessages 指向的消息到了执行时间,那么就取出 mMessages 指向的 msg,并且将其从队列中移除,并标记 markInUse()。
队列没有消息:
nextPollTimeoutMillis = -1;这里会进行阻塞,可以参考生产者消费者模式,当消费端没有数据可以消费时,那么就开始等待生成者生产数据。
出队:由Looper.loop(),启动轮询器,对queue进行轮询。当消息达到执行时间就取出来。当 message queue为空的时候,队列阻塞,等消息队列调用enqueueMessage的时候,通知队列,可以取出消息,停止阻塞。
//MessageQueue#next()
synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;//队头
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {//队列中有消息
if (now < msg.when) {//如果队头的消息还没有到执行时间,那么就等待...
// Next message is not ready. Set a timeout to wake up when it is ready
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE
} else {
// 获取一个消息
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;//更新队头
}
//将该消息从队列中移除
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
//标记该消息已经使用过了
msg.markInUse();
return msg;
}
} else {//队列没有消息
// No more messages.
nextPollTimeoutMillis = -1;
}
MessageQueue的阻塞机制
要了解 MessageQueue 等待唤醒机制之前要看看这个 nextPollTimeoutMillis 的作用,它表示阻塞等待的时间,它分别有以下3种赋值情况:
-
nextPollTimeoutMillis如果该值为0,表示不等待
-
nextPollTimeoutMillis如果该值为-1,表示一直等待,表示当前队列为空,所以需要一直等待,直接有消息入队才会被唤醒。
-
nextPollTimeoutMillis如果该值为指定要等待的时间(非0/-1),一般是当前消息还没有到执行的时间点,所以需要等待。
消息出队的逻辑 next()
Message next() {
...
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
//最终调用 Linux 层实现等待
/*
* 1.nextPollTimeoutMillis==0 表示不需要等待
* 2.nextPollTimeoutMillis==-1,表示无限期等待,一般是队列为空
* 3.nextPollTimeoutMillis==某一个等待值(不为0、-1)表示当前消息还没有到时间执行,需要等待
*/
nativePollOnce(ptr, nextPollTimeoutMillis);
if(消息还没到时间执行){
//等待时间
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
}else{//队列没有消息,队列为空
nextPollTimeoutMillis = -1;
}
...
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
//标记为阻塞状态,这个值会在 enqueueMessage 中使用
mBlocked = true;
continue;
}
...
}
}
消息入队的逻辑 enqueueMessage
boolean enqueueMessage(Message msg, long when) {
if(队列为空||当前插入消息执行时间早于队头的时间){
//mBlocked 就是在 next 中设置
//在队列为空,消息还没有达到执行的时间点这个值 mBlocked 就为 true
needWake = mBlocked;
} else{
//插入到队列中间,这时表示队列不为空,如果当前处于阻塞等待状态,系统会自动唤醒,因为此时肯定是队头的消息还没有到执行的时间点。nextPollTimeoutMillis==某一个等待值(不为0、-1)表示当前消息还没有到时间执行,需要等待
...
}
if (needWake) {//表示需要唤醒
nativeWake(mPtr);
}
}
Handler 设计亮点
消息如何分发呢?
//1.取出消息
Message msg = queue.next(); // might block
//2.分发消息
msg.target.dispatchMessage(msg);
//3.回收消息
msg.recycleUnchecked();
消息分发
public void dispatchMessage(Message msg) {
//1.消息需要自己处理,通过msg.callback进行回调
if (msg.callback != null) {
handleCallback(msg);
} else {
//2.Handler 设置了 Callback
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
//回调 Handler#handleMessage 处理消息
handleMessage(msg);
}
}
private static void handleCallback(Message message) {
message.callback.run();
}
- 第一种一般是 Handler.postXxx 方法这种
public final boolean post(Runnable r)
{
return sendMessageDelayed(getPostMessage(r), 0);
}
private static Message getPostMessage(Runnable r) {
Message m = Message.obtain();
m.callback = r;//给 message 设置一个 callback
return m;
}
- 第二种是在创建 Handler 时指定的 Callback 接口
public Handler(Callback callback, boolean async) {
if (FIND_POTENTIAL_LEAKS) {
final Class<? extends Handler> klass = getClass();
if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
(klass.getModifiers() & Modifier.STATIC) == 0) {
Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
klass.getCanonicalName());
}
}
mLooper = Looper.myLooper();
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread " + Thread.currentThread()
+ " that has not called Looper.prepare()");
}
mQueue = mLooper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
- 自定义类继承Handler 实现 handleMessage 方法。
Message 享元模式的应用
Message 是实现线程间通讯的共享资源,而 Message 是存储在 messagequeue 中的,当消息通过 Handler 进行分发之后,那么这个消息就会被回收 msg.recycleUnchecked,具体看如下代码。之前也写过一篇 Message 复用的博客 Android消息处理机制之Message是如何重复利用的?
//SDK 28 Looper.java
//1.取出消息
Message msg = queue.next(); // might block
//2.分发消息
msg.target.dispatchMessage(msg);
//3.回收消息
msg.recycleUnchecked();
将消息的属性都恢复为默认值状态,并且将其插入到回收 sPool 队列中,那么下次再通过 Message.obtain() 获取 Message 时,可以从缓存的 sPool 队列中获取,节省了内存的开销。
这里的 recycleUnChecked() 是在 Looper 所在的线程中执行,而 obtain()有可能是在其他线程中执行,那么这里需要使用 synchronized 保证线程安全 。
通过 recycleUnChecked() 回收,obtain() 获取,这样就减少了频繁创建 Message 的开销,这里推荐使用 Message.obtain() 来获取一个缓存中的 Message 对象,而不是直接去 new Message() 这里也是一个内存优化的方向之一。
//SDK 28 Message.java
void recycleUnchecked() {
// Mark the message as in use while it remains in the recycled object pool.
// Clear out all other details.
flags = FLAG_IN_USE;
what = 0;
arg1 = 0;
arg2 = 0;
obj = null;
replyTo = null;
sendingUid = -1;
when = 0;
target = null;
callback = null;
data = null;
synchronized (sPoolSync) {//保证取和存是线程安全的
//这里做了一个队列大小的限制
if (sPoolSize < MAX_POOL_SIZE) {
//将 msg 插入到队列中
next = sPool;
//sPool 指向当前 msg
sPool = this;
sPoolSize++;
}
}
}
从 sPool 中取出一个 Message
public static Message obtain() {
synchronized (sPoolSync) {//保证取和存是线程安全的
if (sPool != null) {
Message m = sPool;
sPool = m.next;//更新 sPool 的引用指向下一个 Message 节点
m.next = null;
m.flags = 0; // clear in-use flag
sPoolSize--;
return m;
}
}
return new Message();
}
Looper什么时候退出?
子线程的 Looper 没有退出导致内存泄露,因为 Looper 一直在运行着,那么对应的 Message 处理时可能其它对象的引用,那么就可能出现内存泄露。
首先要确定,主线程的 Looper 在整个程序运行期间是不会被退出的,如果主线程 Looper 退出,那么表示程序已经退出了,所以这里只需要考虑子线程的 Looper 是如何退出的?
//SDK 28 Looper.java
Looper.myLooper().quit();
//Looper.myLooper().quitSafely();//注意做版本兼容
通知 MessageQueue 退出,MessageQueue 底层实现一套 Linux 等待通知机制,它需要通知 Linux 要退出消息轮训了。
//SDK 28 Looper.java
public void quit() {
mQueue.quit(false);
}
//SDK 28 MessageQueue.java
void quit(boolean safe) {
if (!mQuitAllowed) {
throw new IllegalStateException("Main thread not allowed to quit.");
}
synchronized (this) {
//避免重复退出
if (mQuitting) {
return;
}
//标记当前正在退出
mQuitting = true;
//消息的回收
if (safe) {
removeAllFutureMessagesLocked();
} else {
removeAllMessagesLocked();
}
// 唤起对应的线程
// We can assume mPtr != 0 because mQuitting was previously false.
nativeWake(mPtr);
}
}
内部的入队和出队是怎么停止的?
- 出队的处理
Message next() {
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
...
for(;;){
...
// Process the quit message now that all pending messages have been handled.
if (mQuitting) {
dispose();
return null;
}
...
}
...
}
private void dispose() {
if (mPtr != 0) {
nativeDestroy(mPtr);
mPtr = 0;//标识为0
}
}
public static void loop() {
for (;;) {
//quit() 会返回 null,就退出 for 循环
Message msg = queue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}
}
}
- 入队的处理
boolean enqueueMessage(Message msg, long when) {
....
if (mQuitting) {//quit
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}
...
}
Looper.loop() 为什么不会导致主线程不会阻塞出现 ANR?
Looper 的 loop() 函数设计如下,它是不断地在 for 循环去获取消息,分发消息,回收消息,而这个过程就是在主线程中进行的。
public static void loop() {
for(;;){
Message msg = queue.next(); // might block
msg.target.dispatchMessage(msg);
msg.recycleUnchecked();
}
}
而主线程的 Looper 是在 ActivityThread 中的 main 函数创建,具体代码如下:
public static void main(String[] args) {
Looper.prepareMainLooper();//创建主线程 Looper
//...
Looper.loop();
//从源码可以看出,主线程 Looper 是不能退出,一旦退出,那么就出现
throw new RuntimeException("Main thread loop unexpectedly exited");
}
- 创建主线程 的 Looper Looper.prepareMainLooper();
主线程的所有操作都会在这里执行,从这里也可以看出,正常情况下,主线程是不会被退出的。
- 开始消息轮训 Looper.loop();
如果手贱调用了 Looper.getMainLooper().quit(); 那么程序就会出现以下异常。
06-03 15:39:46.274 5061-5079/com.example.async E/AndroidRuntime: FATAL EXCEPTION: Thread-297
Process: com.example.async, PID: 5061
java.lang.IllegalStateException: Main thread not allowed to quit.
at android.os.MessageQueue.quit(MessageQueue.java:234)
at android.os.Looper.quit(Looper.java:216)
at com.example.async.MainActivity$1$2.run(MainActivity.java:44)
at android.os.Handler.handleCallback(Handler.java:739)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:135)
at com.example.async.MainActivity$1.run(MainActivity.java:50)
追踪这个异常是在哪里抛出的?
//SDk 28 MessageQueue.quit()
void quit(boolean safe) {
if (!mQuitAllowed) {//这个值是在构造函数中指定,表明是否可以退出
throw new IllegalStateException("Main thread not allowed to quit.");
}
}
//SDK 28 MessageQueue构造
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
//SDk Looper.prepareMainLooper
public static void prepareMainLooper() {
//这里传入 false 指定当前对应的 MessageQueue 是不可以退出的。
prepare(false);
synchronized (Looper.class) {
if (sMainLooper != null) {
throw new IllegalStateException("The main Looper has already been prepared.");
}
sMainLooper = myLooper();
}
}
本文是笔者学习之后的总结,方便日后查看学习,有任何不对的地方请指正。
记录于 2019年6月3号