Java层 MessageQueue 消息队列分析
带着问题找答案:
本文主要解答,handler.sendMessageDelayed()延时消息是如何实现的?
1、Thread、Looper、MessageQueue、Handler 铁三角关系
1.1、Looper是依附在Thread之上的。一个Thread 对一个Looper
public final class Looper {
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
}
调用Looper.prepare()创建Looper后是保存在sThreadLocal中。
1.2、建立起消息循环,需要线程中调用Looper.loop()
[Thread]
public class HandlerThread extends Thread{
@Override
public void run() {
Looper.prepare();
Looper.loop();
}
}
1.3、Handler 持有run起来的Looper,是MessageQueue的入口和出口。
实例化Handler 需要传入一个Looper对象
HandlerThread handerThread = new HandlerThread("demo");
Handler handler = new Handler(handerThread.getLooper()){
@Override
public void handleMessage(@NonNull Message msg) {
super.handleMessage(msg);
}
};
1.3.1、Message 的核心属性
Message 的核心属性
public final class Message implements Parcelable {
public long when; //(1)消费该消息的预期时间点
Runnable callback; //(2)Handler.post()方法传入的Runnable对象
Handler target; // (3)发送该消息的handler,最终也是该Message消息的消费者;配合what执行具体的业务操作
public int what; // what 标识具体的业务
}
- when 标识消费该消息的预期时间点
- callback 记录Handler.post()方法传入的Runnable对象
- target 发送该消息的handler,最终也是该Message消息的消费者
- what 区分具体的业务
1.3.2、向Handler发送Message
- handler.sendMessageDelayed(Message.obtain(),1000)
- handler.sendMessage(Message.obtain())
- handler.post(Runnable { })
- handler.postDelayed(Runnable { },100)
向handler中添加消息 有多种形式:,无论是sendMessage() 还是post(),最终都会封装一个Message对象,然后调用sendMessageAtTime()方法。
public boolean sendMessageAtTime(@NonNull Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
sendMessageDelayed()
public final boolean sendMessageDelayed(@NonNull Message msg, long delayMillis) {
if (delayMillis < 0) {
delayMillis = 0;
}
return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}
postDelayed()
Runnable会封装成一个Message对象(Runnable保存Message.callBack属性上)
然后执行sendMessageDelayed()
private static Message getPostMessage(Runnable r) {
Message m = Message.obtain();
m.callback = r;
return m;
}
public final boolean postDelayed(@NonNull Runnable r, long delayMillis) {
return sendMessageDelayed(getPostMessage(r), delayMillis);
}
1.3.3、消费Message
Message从MessageQueue取出之后,会交给target Handler来处理,最终调用Handler.dispatchMessage()
Message msg = queue.next(); // might block
msg.target.dispatchMessage(msg);
dispatchMessage
public void dispatchMessage(@NonNull Message msg) {
//(1)Runnable != null 优先执行Runnable()
if (msg.callback != null) {
handleCallback(msg);
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);
}
}
- 通过handler.post()发送的消息,Message.callBack 不为空,优先执行Runnable.run()
private static void handleCallback(Message message) {
message.callback.run();
}
- Handler 构建时 若指定了mCallback 回调,则由mCallback 来处理消息。一般情况mCallback都为null,所以此种情况可以不考虑
- 然后Message 交由Handler.handleMessage()来处理
class Handler{
public void handleMessage(@NonNull Message msg) {}
}
通过handler.sendMessage()发送的消息,需要复写Handler.handleMessage()来具体处理消息
二、延时消息是如何实现的。
- handler.sendMessageDelayed(Message.obtain(),1000)
- handler.postDelayed(Runnable { },100)
这就要看MessageQueue对象了。
2.1、入队列
[Handler]
private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg,
long uptimeMillis) {
msg.target = this;
msg.workSourceUid = ThreadLocalWorkSource.getUid();
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
Handler.enqueueMessage()方法中
- 将handler本身 赋给了msg.target
- 调用MessageQueue.queueMessage()
MessageQueue中最重要的就是两个方法:
1.enqueueMessage向队列中插入消息
2.next 从队列中取出消息
我们先分析enqueueMessage:
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) { //排除msg.target == null 的case
throw new IllegalArgumentException("Message must have a target.");
}
if (msg.isInUse()) { //排除 正在使用的msg
throw new IllegalStateException(msg + " This message is already in use.");
}
synchronized (this) {
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}
msg.markInUse();
msg.when = when;
Message p = mMessages; //MesssageQueue中的消息队列 以单向链表的形式存在;mMessages 指向链表中的第一个幻速
boolean needWake;
//(1)新消息插入链表表头:a、当前队列没有待处理的消息 或者 b、新消息msg.when = 0(这种情况一般不存在)或者 c、新消息的触发时间 早于mMessages表头消息的触发时间
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else { //(2)将新消息按照msg.when时间的先后顺序,插入到mMessages链表的中间位置.使整个链表以时间排序。
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
//(3)若当前是阻塞状态,则唤醒next()操作
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
MessageQueue中的消息 是一个单向链表的形式保存的,mMessages 变量 指向链表的表头。Message链表中的元素是以触发时间(when)为基准,从小道大排列的,when小的排在链表前面,优先被处理;when大的排在链表的后面,延后处理。
2.1.1、 enqueueMessage 主要做了两件事情:
(1)将新来的Message消息 插入到链表中合适的位置。
满足以下条件,新message会被插入到链表头部
- a、当前队列没有待处理的消息
- b、新消息msg.when = 0(这种情况一般不存在)
- c、新消息的触发时间 早于mMessages表头消息的触发时间
其他情况,新message会被插入到链表中部合适位置。
(2)满足条件时,唤醒队列
if (needWake) {
nativeWake(mPtr);
}
什么时候需要唤醒队列:
- 新消息插入链表头部时,需要立即唤醒队列
- 新消息插入链表中部时,一般不需要立即唤醒;但是当链表表头是一个消息屏障,且先消息是一个异步消息时,才需要唤醒队列。
2.1.2、next() 从队列中消费Message,进行分发处理
Message next() {
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
//自旋
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
//(1)nativePollOnce 尝试阻塞
//如果nextPollTimeoutMillis=-1,一直阻塞不会超时。
//如果nextPollTimeoutMillis=0,不会阻塞,立即返回。
//如果nextPollTimeoutMillis>0,最长阻塞nextPollTimeoutMillis毫秒(超时),如果期间有程序唤醒会立即返回。
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
//(2)如果链表表头是一个同步屏障消息,则跳过众多同步消息,找到链表中第一个异步消息,进行分发处理
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
//(3) 返回取到的Message
if (msg != null) {
//msg尚未到达触发时间,则计算新的阻塞超时时间nextPollTimeoutMillis,下次循环触发队列阻塞
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
//从链表中移除该消息后,直接返回
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
msg.markInUse();
return msg;
}
} else { //没有找到异步消息,设置nextPollTimeoutMillis=-1,队列阻塞
// No more messages.
nextPollTimeoutMillis = -1;
}
// Process the quit message now that all pending messages have been handled.
//(4)MessageQueue执行了quit(),此处释放MessageQueue
if (mQuitting) {
dispose();
return null;
}
//(5) 对已注册的HandlerIdle回调的处理
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// Run the idle handlers.
// We only ever reach this code block during the first iteration.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;
// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}
}
next()方法大概可以描述为:从Message链表中取出一条Message返回;当链表中无消息,或者链表中第一个Message尚未到达触发时间时,则阻塞next()方法。
代码比较长,我们分几部分来讲:
(1) nativePollOnce()
通过Native层的epoll来阻塞住当前线程
nativePollOnce(ptr, nextPollTimeoutMillis);
private native void nativePollOnce(long ptr, int timeoutMillis)
nativePollOnce根据传入的参数nextPollTimeoutMillis 会有不同的阻塞行为
- 如果nextPollTimeoutMillis=-1,一直阻塞不会超时。
- 如果nextPollTimeoutMillis=0,不会阻塞,立即返回。
- 如果nextPollTimeoutMillis>0,最长阻塞nextPollTimeoutMillis毫秒(超时),如果期间有程序唤醒会立即返回。
(2)处理同步屏障消息
如果链表表头是一个同步屏障消息,则跳过众多同步消息,找到链表中第一个异步消息,进行分发处理
Message 分为同步消息和异步消息。
class Message{
public boolean isAsynchronous() {
return (flags & FLAG_ASYNCHRONOUS) != 0;
}
public void setAsynchronous(boolean async) {
if (async) {
flags |= FLAG_ASYNCHRONOUS;
} else {
flags &= ~FLAG_ASYNCHRONOUS;
}
}
}
通常我们使用Handler发消息都是同步消息,发出去之后就会在消息队列里面排队处理。我们都知道,Android系统16ms会刷新一次屏幕,如果主线程的消息过多,在16ms之内没有执行完,必然会造成卡顿或者掉帧。那怎么才能不排队,没有延时的处理呢?这个时候就需要异步消息。在处理异步消息的时候,我们就需要同步屏障,让异步消息不用排队等候处理。
可以理解为同步屏障是一堵墙,把同步消息队列拦住,先处理异步消息,等异步消息处理完了,这堵墙就会取消,然后继续处理同步消息。
MessageQueue里面有postSyncBarrier()可以发送同步屏障消息
public int postSyncBarrier() {
return postSyncBarrier(SystemClock.uptimeMillis());
}
private int postSyncBarrier(long when) {
// Enqueue a new sync barrier token.
// We don't need to wake the queue because the purpose of a barrier is to stall it.
synchronized (this) {
final int token = mNextBarrierToken++;
final Message msg = Message.obtain();
msg.markInUse();
msg.when = when;
msg.arg1 = token;
Message prev = null;
Message p = mMessages;
if (when != 0) {
while (p != null && p.when <= when) {
prev = p;
p = p.next;
}
}
if (prev != null) { // invariant: p == prev.next
msg.next = p;
prev.next = msg;
} else {
msg.next = p;
mMessages = msg;
}
return token;
}
}
值得注意的是,同步屏障消息没有target,普通的消息的必须有target的。
再回到MessageQueue.next() 方法
//(2)如果链表表头是一个同步屏障消息,则跳过众多同步消息,找到链表中第一个异步消息,进行分发处理
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
我们看到,如果链表表头是一个同步屏障消息,就会遍历链表,返回第一个待处理异步消息。这样就跳过了前面众多的同步消息。
(3) 取到的Message返回还是阻塞?
//(3) 返回取到的Message
if (msg != null) {
//msg尚未到达触发时间,则计算新的阻塞超时时间nextPollTimeoutMillis,下次循环触发队列阻塞
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
//从链表中移除该消息后,直接返回
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
msg.markInUse();
return msg;
}
} else { //没有找到异步消息,设置nextPollTimeoutMillis=-1,队列阻塞
// No more messages.
nextPollTimeoutMillis = -1;
}
回到next()中的代码块,取到待处理的message后做了如下处理
- 如果msg == null,说明链表中没有消息,则nextPollTimeoutMillis = -1,下次循环 会无限阻塞。
- msg !=null,并且msg.when 符合触发条件,则直接返回
- msg !=null,但是msg.when 尚未到达预期的触发时间点,则重新计算nextPollTimeoutMillis,下次循环进行固定时长的阻塞。
(4)mQuitting 的处理
如果调用了MessageQueue.quit() ,mQuitting = true,队列中所有的消息都会处理后,会调用dispose 释放MessageQueue
if (mQuitting) {
dispose();
return null;
}
(5)对已注册的HandlerIdle回调的处理
MessageQueue可以注册HandlerIdle监听,此处对注册的HandlerIdle做回调处理。
//(5) 对已注册的HandlerIdle回调的处理
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
三、参考文章
https://zhuanlan.zhihu.com/p/265859150