Android线程调度-Handler
日常开发过程中我们会经常使用到以下代码:
final Handler handler = new Handler(Looper.getMainLooper());
new Thread(
new Runnable(){
public void run(){
...做一些操作...
handler.post(new Runnable(){
...修改UI等的操作...
});
}
}
).start();
通过使用Handler,我们就能很轻易的完成线程调度,例如,在子线程做些耗时的操作(加载本地文件,联网下载文件等),然后通过Handler来切换到主线程来修改UI,流程图如下:
线程调度流程图下面我们从源码的角度来分析下这波操作是怎么实现的。
初始化
ActivityThread.java
//了解Android启动流程的知道,一个进程的启动,在java层会先调用ActivityThread的main方法。
public static void main(String[] args) {
...
Looper.prepareMainLooper();
...
Looper.loop();
throw new RuntimeException("Main thread loop unexpectedly exited");
}
接着我们来看下Looper.prepareMainLooper()和Looper.loop()做了什么
Looper.java
public static void prepareMainLooper() {
prepare(false);
synchronized (Looper.class) {
if (sMainLooper != null) {
throw new IllegalStateException("The main Looper has already been prepared.");
}
sMainLooper = myLooper();
}
}
parepareMainLooper()方法中调用了prepare(),我们看下prepare干了什么
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
//ThreadLocal(线程私有,意思是,这个对象线程不共享,只对当前线程可见)
sThreadLocal.set(new Looper(quitAllowed));
}
接下来会调用Looper的构造方法
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
在Looper的构造方法中,我们会创建消息队列MessageQueue,接着我们看下消息队列的初始化做了什么。
MessageQueue.java
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
//这个nativeInit返回的是native层的NativeMessageQueue的地址
mPtr = nativeInit();
}
MessageQueue的构造方法中,会调用native方法nativeInit,接下来我们看下nativeInit做了什么。
android_os_MessageQueue.cpp
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
nativeMessageQueue->incStrong(env);
//返回nativeMessageQueue的地址
return reinterpret_cast<jlong>(nativeMessageQueue);
}
在nativeInit的方法中会创建native层的消息队列NativeMessageQueue(),我们看下NativeMessageQueue做了什么。
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}
NativeMessageQueue的构造函数中会调用native层的Looper的初始化。我们看下native层Looper的初始化
Looper::Looper(bool allowNonCallbacks)
: mAllowNonCallbacks(allowNonCallbacks),
mSendingMessage(false),
mPolling(false),
mEpollRebuildRequired(false),
mNextRequestSeq(0),
mResponseIndex(0),
mNextMessageUptime(LLONG_MAX) {
//这个fd主要用于唤醒线程,用于管道的数据传输
mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mWakeEventFd.get() < 0, "Could not make wake event fd: %s", strerror(errno));
AutoMutex _l(mLock);
rebuildEpollLocked();
}
native层的Looper会调用rebuildEpollLocked函数,接下来我们看看rebuildEpollLocked函数做了什么
void Looper::rebuildEpollLocked() {
...
//这段代码的意思主要是,通过epoll机制监听mWakeEventFd的in事件。这个fd主要用于唤醒线程
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd.get();
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem);
...
}
}
总结下初始化过程做了什么:
- 初始化java层的Looper对象,并且将其设置为线程私有,初始化java层的消息队列MessageQueue。
- 初始化native层的Looper对象,NativeMesasgeQueue对象。
- 并且在native层监听WakeEventFd的的IN事件。(Epoll机制请自行去了解 )
消息读取
Looper.java
public static void loop() {
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
final MessageQueue queue = me.mQueue;
...
for (;;) {
//这段代码可能会让线程进入wait状态
Message msg = queue.next();
...
try {
//这里就处理消息了
msg.target.dispatchMessage(msg);
dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
} finally {
if (traceTag != 0) {
Trace.traceEnd(traceTag);
}
}
...
}
接下来我们看下MessageQueue的next方法做了什么
MessageQueue.java
Message next() {
...
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
nativePollOnce(ptr, nextPollTimeoutMillis);
...
}
接下来我们看下nativePollOnce方法做了什么。
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
nativePollOnce函数中,我们会传入在初始化过程中获取到的NativeMessageQueue的地址,通过这个地址找到NativeMessageQueue,然后会调用pollOnce函数,接下来我们看下pollOnce做了什么
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);
...
}
Looper.cpp
inline int pollOnce(int timeoutMillis) {
return pollOnce(timeoutMillis, nullptr, nullptr, nullptr);
}
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
...
if (result != 0) {
...
return result;
}
result = pollInner(timeoutMillis);
}
}
接下来我们看下pollInner做了什么
int Looper::pollInner(int timeoutMillis) {
...
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
//这句代码可能会让线程处于空闲状态(让出CPU),如果没有事件发生的话,这个线程则会处于空闲状态
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
...
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd.get()) {
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
...
}
}
void Looper::awoken() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ awoken", this);
#endif
uint64_t counter;
//这一句是为了把管道中的数据清空
TEMP_FAILURE_RETRY(read(mWakeEventFd.get(), &counter, sizeof(uint64_t)));
}
总结下消息读取主要做了什么:
- ava端通过死循环来读取Message。
- native端则是通过epoll机制监听是否有消息需要处理的事件
- 这里有个问题,为什么java端有死循环,却不会让主线程进入ANR状态呢?因为在native端,pollInner函数中会调用epoll_wait函数,这个epoll_wait函数可能会让线程处于wait状态,处于wait状态的线程不会占用CPU。
消息发送
Handler.java
public final boolean sendMessage(Message msg)
{
return sendMessageDelayed(msg, 0);
}
public final boolean sendMessageDelayed(Message msg, long delayMillis)
{
if (delayMillis < 0) {
delayMillis = 0;
}
return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
sendMessageAtTime方法最后会调用enqueueMessage()接下来我们看下这个方法做了什么
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
接下来看下MessageQueue做了什么
MessageQueue.java
boolean enqueueMessage(Message msg, long when) {
...
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
//如果消息队列里面没有消息,就把当前的消息放在队列头,并且这是needWake为true,意思是需要唤醒waiting中的主线程
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
//否则,就把当前消息放在队列中适当的位置,然后needWake为false,因为此时的主线程正处于running状态了。
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
再enqueuMessage过程中,会把Message加入到消息队列中,然后调用nativeWake方法,接下来我们看下nativeWake方法做了什么
android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}
void NativeMessageQueue::wake() {
mLooper->wake();
}
nativeWake函数最后会调用native层的Looper的wake函数,接下来我们看下native层的Looper的wake函数做了什么。
Looper.cpp
void Looper::wake() {
...
uint64_t inc = 1;
//这一步主要是往fd中写入数据,只要wakeFd中有数据了,那么epoll_wait对应的线程就会被唤醒,继续执行
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
...
}
}
在wake方法过程中,会通过往mWakeEventFd中写入数据。然后会唤醒线程去处理消息
总结下消息发送做了什么
- 首先,在java层,发送Message的时候,会把这个message放进消息队列中去。
- 其次再通知native层,把对应的线程唤醒,以便处理消息