Day19-Handler和线程间通信
Handler 的使用
创建对象 -> 获取消息对象 -> 发送消息 -> 处理消息
- 子线程的 run 当主线程执行
Handler handler = new Handler();//主线程创建Handler
...
handler.post(Runnable...);//子线程用 Handler 来处理 Runnable
- 子线程发送消息到主线程
private Handler mHandler = new Handler(){
@Override
public void handlerMessage(Message msg){
switch(msg.what){
case 1:
}
}
}
....
//子线程
Message msg = new Message();
//Message msg = mHandler.obtain(1);
//Message msg = mHandler.obtainMessage();
msg.what = 1;//识别码
msg.arg1 = 123;
msg.arg2 = 321;
msg.obj = "222";//msg可以加入obj
Bundle data = new Bundle();
msg.setData(data);//msg也可以加入bundle
mHandler.sendMessage(msg);
Handler的创建
Handler.java
public Handler(Looper looper, Callback callback, boolean async) {
mLooper = looper;
mQueue = looper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
looper
- 构造方法looper绑定了当前线程
looper.java
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
- Lopper的真正创建是通过prepare(), 并且一个线程只能创建一个looper
looper.java
public static void prepare() {
prepare(true);
}
// 目前quitAllowed应该仅在主线程中被设置成false,自定义线程中均为true
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
PS: 主线程的 Looper 是在 ActivityThread的main()里创建的
public static void main(String[] args) {
//···
Looper.prepareMainLooper();
//···
Looper.loop();
//···
}
MessageQueue
- 构造函数
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
private native static long nativeInit();
具体实现在C++
nativeInit
就是在native层创建了一个NativeMessageQueue
及一个Looper
对象,而native层的Looper
对象利用管道机制来监控文件,从而可以利用epoll机制实现MessageQueue
的等待和唤醒,这个在MessageQueue#next()
会进一步分析。至此,MessageQueue
对象就被创建出来了(由于MessageQueue
的构建函数仅有包访问权限,因此正常情况下我们无需关心MessageQueue
的创建)
作者:SparkInLee
链接:http://www.jianshu.com/p/8a344dbd17f0
來源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
Looper.loop()
loop()主要是一个无限for循环
looper.java
public static void loop() {
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("Looper.prepare() wasn't called on this thread.");
}
final MessageQueue queue = me.mQueue;
//···省略无关代码
for (;;) {
Message msg = queue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}
//···省略无关代码
msg.target.dispatchMessage(msg);
//···省略无关代码
msg.recycleUnchecked();
}
}
循环着从MessageQueue.next里拿到消息
MessageQueue.java
Message next() {
// 在Looper#loop()中我们知道返回空消息会退出loop()中的无限循环
// 当调用MessageQueue#quit(boolean)时会调用nativeDestory()销毁MessageQueue,将ptr置为0
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
int pendingIdleHandlerCount = -1; // 仅在第一次调用时为-1
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
/**
* 这个调用跟上面提到native层中Looper的epoll机制相关,用于等待可处理的消息
* nextPollTimeoutMillis < 0 : 进入无限空闲等待,直到有新消息唤醒
* nextPollTimeoutMillis = 0 : 不等待
* nextPollTimeoutMillis > 0 : 进入空闲等待,直到有新消息唤醒或者nextPollTimeoutMillis超时
**/
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
// 注:MessageQueue管理的消息是一个消息链表,后续Message中会详细分析
if (msg != null && msg.target == null) {
/**
* msg.target为空是一类特殊消息(栅栏消息),用于阻塞所有同步消息,但是对异步消息没有影响,
* 后续会详细分析。在这个前提下,当头部是特殊消息时需要往后找是否有异步消息
*/
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
// 找到消息
if (now < msg.when) {
// 消息的触发时间在当前时间之后,于是计算出需要等待的时间,准备进入有限空闲等待
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// 找到可处理的消息,更新消息链表数据,返回可处理消息
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (false) Log.v("MessageQueue", "Returning message: " + msg);
return msg;
}
} else {
// 没找到消息,准备进入无限空闲等待
nextPollTimeoutMillis = -1;
}
// 没有可处理的消息,并且消息队列已经退出,则返回空消息让loop退出
if (mQuitting) {
dispose();
return null;
}
// 当mMessages为空或者mMessages的处理时间在当前时间之后(注意栅栏消息的特殊情况)时,
// 并且pendingIdleHandlerCount没有在此处初始化过,
// 则设置pendingIdleHandlerCount为IdleHandler的数量,IdleHandler后续详细说明。
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// 无空闲处理器,阻塞队列,进入空闲等待
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// 调用空闲处理器逻辑,此处代码仅调用一次
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf("MessageQueue", "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
// 设置为0保证空闲处理器代码仅调用一次
pendingIdleHandlerCount = 0;
// 在处理空闲处理器的时候可能已经有可处理的消息,因此无需等待
nextPollTimeoutMillis = 0;
}
}
Message的创建
- mHandler.obtainMessage()
Handler.javapublic final Message obtainMessage(){ return Message.obtain(this); }
- Message.obtain()
Message.java
obtain()方法, 回收池sPool就是一个静态Message, obtain 就是取出一个next是null, what是0 的 Messagepublic static Message obtain(Handler h) { Message m = obtain(); m.target = h; return m; }
public static Message obtain() { synchronized (sPoolSync) { if (sPool != null) { Message m = sPool; sPool = m.next; m.next = null; m.flags = 0; // clear in-use flag sPoolSize--; return m; } } return new Message(); }
- 消息的结构
public int what; // 消息码,带Handler命名空间,因此不同Handler中相同消息码不冲突 public int arg1; // 整数数据 public int arg2; // 整数数据 public Object obj; // 任意对象,当利用Messenger进行跨进程传递时需要继承自Parcelable public Messenger replyTo; // Messenger对象实现跨进程消息传递 public int sendingUid = -1; // 跨进程是标记消息来源的Uid /** * flags可设置消息是否在使用以及是否异步 * FLAG_IN_USE = 1 << 0,该标记只有在创建或obtain时才会清除,此时方可修改消息的相关数据及进行发送 * FLAG_ASYNCHRONOUS = 1 << 1,标记该消息为异步消息,不受栅栏消息的影响 **/ /*package*/ int flags; /*package*/ long when; // 消息执行时间,采用SystemClock#uptimeMillis()时间base /*package*/ Bundle data; // 消息的数据 /*package*/ Handler target; // 消息对应的Handler /*package*/ Runnable callback; // 消息对应的回调,具体参看下文中消息处理一节 /*package*/ Message next; // 形成消息链表,以在MessageQueue以及消息回收池中使用
Message 放入 MessageQueue
Handler 除了 sendMessageAtFrontOfQueue(Message) 外其余均会调用 sendMessageAtTime(Message), 而这两个接口最终调用了 enqueueMessage(MessageQueue, Message, long)
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
Handler 在构造时可以通过 boolean async 给 Handler 设置是否异步
最后调用 enqueueMessage
MessageQueue.java
// 插入成功返回true,否则返回false
boolean enqueueMessage(Message msg, long when) {
// Handler中不允许发送target为空的消息,空消息为特殊消息(栅栏消息)
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
// 不允许发送状态为使用中的消息
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
synchronized (this) {
if (mQuitting) {
// 不允许发送消息给已退出的消息队列
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w("MessageQueue", e.getMessage(), e);
msg.recycle();
return false;
}
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
// 消息触发时间最早,直接插在链表头部,如果当前队列阻塞则唤醒消息队列的等待,见MessageQueue#next
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// 将消息插入到链表中间,如果链表头是栅栏消息并且该消息是触发时间最早的异步消息则需要进行唤醒
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p;
prev.next = msg;
}
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
Message 处理和回收
msg.target.dispatchMessage(msg);
msg.recycleUnchecked();
public void dispatchMessage(Message msg) {
if (msg.callback != null) {
handleCallback(msg);
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);
}
}
消息先由消息自身的callback处理, 如果自身回调为空, 就用Handler的回调mCallback处理, 否则用Handler 的 handlerMessage()处理
栅栏消息
target = null; 用于延迟指定时间之后所有的同步消息, 异步照常进行. 发送和移除栅栏消息必须成对, 否则容易导致MessageQueue被挂起
发送和移除接口在 Looper.java 中
public int postSyncBarrier() {
return mQueue.enqueueSyncBarrier(SystemClock.uptimeMillis());//返回的int 用于移除栅栏信息
}
public void removeSyncBarrier(int token) {
mQueue.removeSyncBarrier(token);
}
调用了 MessageQueue 的 enqueueSyncBarrier(long)
int enqueueSyncBarrier(long when) {
// 创建一个target为空的特殊消息,并根据when插入MessageQueue中合适的位置
// 无需唤醒因为栅栏消息的目的在于阻塞消息的执行
synchronized (this) {
final int token = mNextBarrierToken++;
final Message msg = Message.obtain();
msg.markInUse();
msg.when = when;
msg.arg1 = token;
Message prev = null;
Message p = mMessages;
if (when != 0) {
while (p != null && p.when <= when) {
prev = p;
p = p.next;
}
}
if (prev != null) {
msg.next = p;
prev.next = msg;
} else {
msg.next = p;
mMessages = msg;
}
return token;
}
}
返回的 int 用于移除栅栏信息
void removeSyncBarrier(int token) {
// 移除token对应的栅栏消息,并在必要的时候进行唤醒
synchronized (this) {
Message prev = null;
Message p = mMessages;
while (p != null && (p.target != null || p.arg1 != token)) {
prev = p;
p = p.next;
}
if (p == null) {
throw new IllegalStateException("The specified message queue synchronization "
+ " barrier token has not been posted or has already been removed.");
}
final boolean needWake;
if (prev != null) {
prev.next = p.next;
needWake = false;
} else {
mMessages = p.next;
needWake = mMessages == null || mMessages.target != null;
}
p.recycleUnchecked();
if (needWake && !mQuitting) {
nativeWake(mPtr);
}
}
}