Android消息处理机制工作原理

2019-06-03  本文已影响0人  未见哥哥

Looper 的工作机制

Looper 的创建与使用

Looper 的使用流程

在子线程中使用 Looper

//在子线程中使用 Looper
new Thread() {
    @Override
    public void run() {
        super.run();
        //在子线程中创建 Looper 对象
        Looper.prepare();
        final Handler handler = new Handler(Looper.myLooper());
        //发送消息到消息队列中
        handler.post(new Runnable() {
            @Override
            public void run() {
                System.out.println("post message in " + Thread.currentThread().getName() + " thread");
                handler.postDelayed(this, 1000);
              
            }
        });
        //开始轮训
        Looper.loop();
    }
}.start();

Looper 是如何创建的,它与 ThreadLocal,Thread,MessageQueue 的是什么关系?

对于 Looper 来说,每一个线程会保存一份实例,它是通过 ThreadLocal 来保证的,并且在 prepare 方法内部会在 Looper 创建前进行判断当前线程本地变量 ThreadLocal 是否存有 Looper,如果没有才会去创建一个 Looper,这就保证了一个线程只会有一个 Looper ,而对应的 mThread,mQueue 是在 Looper 的构造函数中实例化的,因此它也保证了一个 Looper 对应于一个线程 mThread和一个消息队列 mQueue。

//SDK28 Looper
public static void prepare() {
    prepare(true);
}
private static void prepare(boolean quitAllowed) {
  
    //从线程本地变量取,如果取到说明当前线程已经创建过了Looper了,不要重复创建,直接报错
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    //将创建的 Looper 保存到 ThreadLocal 中
    sThreadLocal.set(new Looper(quitAllowed));
}



private Looper(boolean quitAllowed) {
    mQueue = new MessageQueue(quitAllowed);
    mThread = Thread.currentThread();
}

Handler 通讯原理

在 Handler 中不管是通过 sendXxx还是 postXxx 发送 Message 最终都会走到enqueueMessage 方法,也就是将 Message 放入到 MessageQueue 队列中。

多个线程是通过共享内存的 Message 的方式来实现线程间数据通讯。

//SDK 28 Handler#enqueueMessage
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
    //指定 msg 的 target 为当前 Handler 对象,之后在消息分发时会用到
    msg.target = this;
    if (mAsynchronous) {
        msg.setAsynchronous(true);
    }
    return queue.enqueueMessage(msg, uptimeMillis);
}

线程间通讯是通过共享 Message 的方式,而 Message 则是通过 MessageQueue 来管理的 ,因此在 Message 入队出队就要考虑在多线程操作的情况可能出现线程安全问题,因此在消息入队 enqueueMessage() 和消息出队 next() 内部使用了 synchronized 锁确保线程安全。这里可以将其看成是一种生产者消费者模式

//MessageQueue#enqueueMessage
//===============================消息入队=====================================
boolean enqueueMessage(Message msg, long when) {
    ...
    //加锁
    synchronized (this) {
        //...
        // msg 添加到队列中
    }
    return true;
}


//===============================消息出队=====================================

Message next() {
        //...
    for (;;) {
        
        //加锁
        synchronized (this) {
            //...
            return msg;
        }
      //...
    }
}

MessageQueue的工作原理

Message出队和入队的实现

MessageQueue 是用来管理 Message 的,这里就设计到 Message 入队和出队两个操作

入队:根据时间排序,时间优先级存储,那么就可以做到消息延迟执行的。

  1. 插入到队头
  2. 插入到队列中间
//MessageQueue#enqueueMessage
Message p = mMessages;//队头
//当前消息 when 是比对头还早,那么就插入到队头
if (p == null || when == 0 || when < p.when) {
    // New head, wake up the event queue if blocked.
    msg.next = p;
    //重新更新队头的指针指向为当前入队的消息
    mMessages = msg;
    needWake = mBlocked;
} else {
    // Inserted within the middle of the queue.  Usually we don't have to wake
    // up the event queue unless there is a barrier at the head of the queue
    // and the message is the earliest asynchronous message in the queue.
    needWake = mBlocked && p.target == null && msg.isAsynchronous();
    Message prev;
    //从队头开始找,找到一个 when < p.when,也就是找到第一个 Message 比当前插入的消息还要晚执行的 
    for (;;) {//这里查找,使用链表的效率会比较低
        prev = p;
        p = p.next;
        if (p == null || when < p.when) {
            break;
        }
        if (needWake && p.isAsynchronous()) {
            needWake = false;
        }
    }
    //将该 msg 插入到队列中,这是一个优先级队列,存储的时候就会排序,它不是链表
    msg.next = p; // invariant: p == prev.next
    prev.next = msg;
}

消息出队的过程,因为消息在入队时已经是根据 Message.when 排好序了,因此会从队头 mMessages 开始取消息。

队列中有消息:

​ 1.如果当前队头 mMessages 指向的消息还没有执行的时间,那么就开始等待

​ 2.如果当前队头 mMessages 指向的消息到了执行时间,那么就取出 mMessages 指向的 msg,并且将其从队列中移除,并标记 markInUse()。

队列没有消息:

​ nextPollTimeoutMillis = -1;这里会进行阻塞,可以参考生产者消费者模式,当消费端没有数据可以消费时,那么就开始等待生成者生产数据。

出队:由Looper.loop(),启动轮询器,对queue进行轮询。当消息达到执行时间就取出来。当 message queue为空的时候,队列阻塞,等消息队列调用enqueueMessage的时候,通知队列,可以取出消息,停止阻塞。

//MessageQueue#next()
synchronized (this) {
    // Try to retrieve the next message.  Return if found.
    final long now = SystemClock.uptimeMillis();
    Message prevMsg = null;
    Message msg = mMessages;//队头
    if (msg != null && msg.target == null) {
        // Stalled by a barrier.  Find the next asynchronous message in the queue.
        do {
            prevMsg = msg;
            msg = msg.next;
        } while (msg != null && !msg.isAsynchronous());
    }
    if (msg != null) {//队列中有消息
        if (now < msg.when) {//如果队头的消息还没有到执行时间,那么就等待...
            // Next message is not ready.  Set a timeout to wake up when it is ready
            nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE
        } else {
            // 获取一个消息
            // Got a message.
            mBlocked = false;
            if (prevMsg != null) {
                prevMsg.next = msg.next;
            } else {
                mMessages = msg.next;//更新队头
            }
            //将该消息从队列中移除
            msg.next = null;
            if (DEBUG) Log.v(TAG, "Returning message: " + msg);
            //标记该消息已经使用过了
            msg.markInUse();
            return msg;
        }
    } else {//队列没有消息
        // No more messages.
        nextPollTimeoutMillis = -1;
    }

MessageQueue的阻塞机制

要了解 MessageQueue 等待唤醒机制之前要看看这个 nextPollTimeoutMillis 的作用,它表示阻塞等待的时间,它分别有以下3种赋值情况:

消息出队的逻辑 next()

Message next() {
  ...
  int pendingIdleHandlerCount = -1; // -1 only during first iteration
    int nextPollTimeoutMillis = 0; 
  for (;;) {
    //最终调用 Linux 层实现等待
    /*
    * 1.nextPollTimeoutMillis==0 表示不需要等待
    * 2.nextPollTimeoutMillis==-1,表示无限期等待,一般是队列为空
    * 3.nextPollTimeoutMillis==某一个等待值(不为0、-1)表示当前消息还没有到时间执行,需要等待
    */
    nativePollOnce(ptr, nextPollTimeoutMillis);
    
    if(消息还没到时间执行){
        //等待时间
        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
    }else{//队列没有消息,队列为空
        nextPollTimeoutMillis = -1;
    }
    ...
    if (pendingIdleHandlerCount <= 0) {
            // No idle handlers to run.  Loop and wait some more.
        //标记为阻塞状态,这个值会在 enqueueMessage 中使用
            mBlocked = true;
            continue;
        }
    ...
  }
}

消息入队的逻辑 enqueueMessage

boolean enqueueMessage(Message msg, long when) {
  
    if(队列为空||当前插入消息执行时间早于队头的时间){
        //mBlocked 就是在 next 中设置
        //在队列为空,消息还没有达到执行的时间点这个值 mBlocked 就为 true
        needWake = mBlocked;
    } else{
        //插入到队列中间,这时表示队列不为空,如果当前处于阻塞等待状态,系统会自动唤醒,因为此时肯定是队头的消息还没有到执行的时间点。nextPollTimeoutMillis==某一个等待值(不为0、-1)表示当前消息还没有到时间执行,需要等待
        ...
    }
        if (needWake) {//表示需要唤醒
            nativeWake(mPtr);
        } 
}

Handler 设计亮点

消息如何分发呢?

//1.取出消息
Message msg = queue.next(); // might block
//2.分发消息
msg.target.dispatchMessage(msg);
//3.回收消息
msg.recycleUnchecked();

消息分发

public void dispatchMessage(Message msg) {
    //1.消息需要自己处理,通过msg.callback进行回调
    if (msg.callback != null) {
        handleCallback(msg);
    } else {
        //2.Handler 设置了 Callback
        if (mCallback != null) {
            if (mCallback.handleMessage(msg)) {
                return;
            }
        }
        //回调 Handler#handleMessage 处理消息
        handleMessage(msg);
    }
}

private static void handleCallback(Message message) {
    message.callback.run();
}
  1. 第一种一般是 Handler.postXxx 方法这种
public final boolean post(Runnable r)
{
   return  sendMessageDelayed(getPostMessage(r), 0);
}

private static Message getPostMessage(Runnable r) {
    Message m = Message.obtain();
    m.callback = r;//给 message 设置一个 callback
    return m;
}
  1. 第二种是在创建 Handler 时指定的 Callback 接口
public Handler(Callback callback, boolean async) {
    if (FIND_POTENTIAL_LEAKS) {
        final Class<? extends Handler> klass = getClass();
        if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
                (klass.getModifiers() & Modifier.STATIC) == 0) {
            Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
                klass.getCanonicalName());
        }
    }
    mLooper = Looper.myLooper();
    if (mLooper == null) {
        throw new RuntimeException(
            "Can't create handler inside thread " + Thread.currentThread()
                    + " that has not called Looper.prepare()");
    }
    mQueue = mLooper.mQueue;
    mCallback = callback;
    mAsynchronous = async;
}
  1. 自定义类继承Handler 实现 handleMessage 方法。

Message 享元模式的应用

Message 是实现线程间通讯的共享资源,而 Message 是存储在 messagequeue 中的,当消息通过 Handler 进行分发之后,那么这个消息就会被回收 msg.recycleUnchecked,具体看如下代码。之前也写过一篇 Message 复用的博客 Android消息处理机制之Message是如何重复利用的?

//SDK 28 Looper.java
//1.取出消息
Message msg = queue.next(); // might block
//2.分发消息
msg.target.dispatchMessage(msg);
//3.回收消息
msg.recycleUnchecked();

将消息的属性都恢复为默认值状态,并且将其插入到回收 sPool 队列中,那么下次再通过 Message.obtain() 获取 Message 时,可以从缓存的 sPool 队列中获取,节省了内存的开销。

这里的 recycleUnChecked() 是在 Looper 所在的线程中执行,而 obtain()有可能是在其他线程中执行,那么这里需要使用 synchronized 保证线程安全 。

通过 recycleUnChecked() 回收,obtain() 获取,这样就减少了频繁创建 Message 的开销,这里推荐使用 Message.obtain() 来获取一个缓存中的 Message 对象,而不是直接去 new Message() 这里也是一个内存优化的方向之一。

//SDK 28 Message.java
void recycleUnchecked() {
    // Mark the message as in use while it remains in the recycled object pool.
    // Clear out all other details.
    flags = FLAG_IN_USE;
    what = 0;
    arg1 = 0;
    arg2 = 0;
    obj = null;
    replyTo = null;
    sendingUid = -1;
    when = 0;
    target = null;
    callback = null;
    data = null;
    synchronized (sPoolSync) {//保证取和存是线程安全的
        //这里做了一个队列大小的限制
        if (sPoolSize < MAX_POOL_SIZE) {
            //将 msg 插入到队列中
            next = sPool;
            //sPool 指向当前 msg
            sPool = this;
            sPoolSize++;
        }
    }
}

从 sPool 中取出一个 Message

public static Message obtain() {
    synchronized (sPoolSync) {//保证取和存是线程安全的
        if (sPool != null) {
            Message m = sPool;
            sPool = m.next;//更新 sPool 的引用指向下一个 Message 节点
            m.next = null;
            m.flags = 0; // clear in-use flag
            sPoolSize--;
            return m;
        }
    }
    return new Message();
}

Looper什么时候退出?

子线程的 Looper 没有退出导致内存泄露,因为 Looper 一直在运行着,那么对应的 Message 处理时可能其它对象的引用,那么就可能出现内存泄露。

首先要确定,主线程的 Looper 在整个程序运行期间是不会被退出的,如果主线程 Looper 退出,那么表示程序已经退出了,所以这里只需要考虑子线程的 Looper 是如何退出的?

//SDK 28 Looper.java
Looper.myLooper().quit();
//Looper.myLooper().quitSafely();//注意做版本兼容

通知 MessageQueue 退出,MessageQueue 底层实现一套 Linux 等待通知机制,它需要通知 Linux 要退出消息轮训了。

//SDK 28 Looper.java
public void quit() {
    mQueue.quit(false);
}
//SDK 28 MessageQueue.java
void quit(boolean safe) {
    if (!mQuitAllowed) {
        throw new IllegalStateException("Main thread not allowed to quit.");
    }
    synchronized (this) {
        //避免重复退出
        if (mQuitting) {
            return;
        }
        //标记当前正在退出
        mQuitting = true;
        //消息的回收
        if (safe) {
            removeAllFutureMessagesLocked();
        } else {
            removeAllMessagesLocked();
        }
        // 唤起对应的线程
        // We can assume mPtr != 0 because mQuitting was previously false.
        nativeWake(mPtr);
    }
}

内部的入队和出队是怎么停止的?

Message next() {
  
    final long ptr = mPtr;
        if (ptr == 0) {
            return null;
        }
    ...
    for(;;){
        ...
                // Process the quit message now that all pending messages have been handled.
                if (mQuitting) {
                    dispose();
                    return null;
                }
      ...
    }
      ...
}


private void dispose() {
    if (mPtr != 0) {
        nativeDestroy(mPtr);
        mPtr = 0;//标识为0
    }
}
public static void loop() {
  
        for (;;) {
            //quit() 会返回 null,就退出 for 循环
                Message msg = queue.next(); // might block
                if (msg == null) {
                    // No message indicates that the message queue is quitting.
                    return;
                }
      }
}       
boolean enqueueMessage(Message msg, long when) {
    ....
    if (mQuitting) {//quit
            IllegalStateException e = new IllegalStateException(
            msg.target + " sending message to a Handler on a dead thread");
            Log.w(TAG, e.getMessage(), e);
            msg.recycle();
            return false;
        }
      ...
}

Looper.loop() 为什么不会导致主线程不会阻塞出现 ANR?

Looper 的 loop() 函数设计如下,它是不断地在 for 循环去获取消息,分发消息,回收消息,而这个过程就是在主线程中进行的。

public static void loop() {
        for(;;){
            Message msg = queue.next(); // might block
            msg.target.dispatchMessage(msg);
            msg.recycleUnchecked();
        }
}

而主线程的 Looper 是在 ActivityThread 中的 main 函数创建,具体代码如下:

public static void main(String[] args) {
  
        Looper.prepareMainLooper();//创建主线程 Looper
  
        //...
  
        Looper.loop();
        //从源码可以看出,主线程 Looper 是不能退出,一旦退出,那么就出现
            throw new RuntimeException("Main thread loop unexpectedly exited");
}  

主线程的所有操作都会在这里执行,从这里也可以看出,正常情况下,主线程是不会被退出的。

如果手贱调用了 Looper.getMainLooper().quit(); 那么程序就会出现以下异常。

06-03 15:39:46.274 5061-5079/com.example.async E/AndroidRuntime: FATAL EXCEPTION: Thread-297
    Process: com.example.async, PID: 5061
    java.lang.IllegalStateException: Main thread not allowed to quit.
        at android.os.MessageQueue.quit(MessageQueue.java:234)
        at android.os.Looper.quit(Looper.java:216)
        at com.example.async.MainActivity$1$2.run(MainActivity.java:44)
        at android.os.Handler.handleCallback(Handler.java:739)
        at android.os.Handler.dispatchMessage(Handler.java:95)
        at android.os.Looper.loop(Looper.java:135)
        at com.example.async.MainActivity$1.run(MainActivity.java:50)

追踪这个异常是在哪里抛出的?

//SDk 28 MessageQueue.quit()
void quit(boolean safe) {
    if (!mQuitAllowed) {//这个值是在构造函数中指定,表明是否可以退出
        throw new IllegalStateException("Main thread not allowed to quit.");
    }
}
//SDK 28 MessageQueue构造 
MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
    mPtr = nativeInit();
}
//SDk Looper.prepareMainLooper
public static void prepareMainLooper() {
    //这里传入 false 指定当前对应的 MessageQueue 是不可以退出的。
    prepare(false);
    synchronized (Looper.class) {
        if (sMainLooper != null) {
            throw new IllegalStateException("The main Looper has already been prepared.");
        }
        sMainLooper = myLooper();
    }
}

本文是笔者学习之后的总结,方便日后查看学习,有任何不对的地方请指正。

记录于 2019年6月3号

上一篇下一篇

猜你喜欢

热点阅读