Handler

2017-05-01  本文已影响142人  冉桓彬

1、有几个问题:

1. Handler与线程的关系;
2. Handler、Looper、MessageQueue的关系;
3. Android中为什么主线程不会因为Looper.loop()里的死循环卡死? 
4. 没看见哪里有相关代码为这个死循环准备了一个新线程去运转? 
5. Activity的生命周期这些方法这些都是在主线程里执行的吧,那这些生命周期方法是怎么实现在死循环体外能够执行起来的?

一、关于Looper :

1.1 ActivityThread.main:
public final class ActivityThread {
    static Handler sMainThreadHandler;
    /**
     * ActivityThread.main是主程序的入口, 在这里进行Looper与Message的初始化操作;
     */
    public static void main(String[] args) {
        Looper.prepareMainLooper();      模块<1.2>
        if (sMainThreadHandler == null) {
            sMainThreadHandler = thread.getHandler();
        }
        Looper.loop();
    }
}

1.2 Looper.prepareMainLooper() :

public final class Looper {
    public static void prepareMainLooper() {
        /**
         * 对Looper进行初始化;
         */
        prepare(false);
        synchronized (Looper.class) {
            /**
             * 1. 如果Looper为null, 则对Looper进行初始化, 如果不为空, 则抛出异常;
             * 2. 换句话说也就是一个线程只允许一个Looper存在;
             */
            if (sMainLooper != null) {
                throw new IllegalStateException("The main Looper has already been prepared.");
            }
            sMainLooper = myLooper();        模块<1.3>
        }
    }

    private static void prepare(boolean quitAllowed) {
        /**
         * 一个线程对应一个Looper, Thread与Looper是一对一关系;
         */
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        /**
         * 如果Looper不存在, 则对Looper进行初始化, 并且利用ThreadLocal将Looper与Thread进行绑定;
         */
        sThreadLocal.set(new Looper(quitAllowed));
    }

    public static void prepare() {
        prepare(true);
    }
    final MessageQueue mQueue;
    /**
     * 一个线程只能有一个Looper, 而在初始化Looper时才会进行MessageQueue的初始化, 
     * 所以线程, Looper, MessageQueue的关系1:1:1;
     */
    private Looper(boolean quitAllowed) {
        mQueue = new MessageQueue(quitAllowed);
        mThread = Thread.currentThread();
    }
}

二、关于MessageQueue:

2.1 MessageQueue构造函数 :
public final class MessageQueue {
    private native static long nativeInit();
    private long mPtr;

    MessageQueue(boolean quitAllowed) {
        mQuitAllowed = quitAllowed;
        /**
         * 触发native层方法, 获取native层的NativeMessageQueue的引用, 使其指向mPtr;
         */
        mPtr = nativeInit();   模块<2.2>
    }
}

http://androidxref.com/7.1.1_r6/xref/frameworks/base/core/jni/android_os_MessageQueue.cpp;

2.2 MessageQueue.nativeInit:

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();    模块<2.3>
    nativeMessageQueue->incStrong(env);
    return reinterpret_cast<jlong>(nativeMessageQueue);
}

2.3 NativeMessageQueue构造函数 :

NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        mLooper = new Looper(false);   模块<2.4>
        Looper::setForThread(mLooper);
    }
}

2.4 Looper.looper() :

在读下面代码之前, 先复制一段IO多路复用_epoll的文章, 文章最后复制了一段epoll的概念

Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    AutoMutex _l(mLock);
    rebuildEpollLocked();
}

void Looper::rebuildEpollLocked() {
    // Close old epoll instance if we have one.
    /**
     * 由最下面的多路复用_epoll的介绍可知, 当我们在使用完一个句柄之后需要close它, 这里mEpollFd为该句柄的引用, 
     * 如果地址值 > 0, 说明前面已经创建过mEpollFd;
     */
    if (mEpollFd >= 0) {
        close(mEpollFd);
    }
    /**
     * static const int EPOLL_SIZE_HINT = 8; 创建一个epoll句柄, 并且该句柄监听的size为32个字节;
     */
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);

    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeEventFd;
    /**
     * 1. 注册新的mWakeEventFd到mEpollFd中去;
     * 2. 对应的文件描述符可以读;
     */
    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
    for (size_t i = 0; i < mRequests.size(); i++) {...}
}

三、Looper与MessageQueue关联:

3.1 Looper.loop() :
public final class Looper {
    public static void loop() {
        final Looper me = myLooper();
        /**
         * 1. 在使用Looper之前一定要通过prepare()对Looper进行初始化;
         * 2. 而且一个线程只能初始化一次Looper;
         */
       if (me == null) {
            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
        }
        final MessageQueue queue = me.mQueue;
        Binder.clearCallingIdentity();
        final long ident = Binder.clearCallingIdentity();
        for (;;) {
            /**
             * 如何获取Message?
             */
            Message msg = queue.next();      模块<3.2>
            /**
             * 如果msg == null, 直接退出该loop?  msg什么情况为null?
             */
            if (msg == null) {
                return;
            }
            /**
             * 1. 结合模块<四>部分可知target指向Handler;\
             * 2. 此时dispatchMessage的调用已经处于主线程中;
             */
            msg.target.dispatchMessage(msg);    模块<4.7>
            final long newIdent = Binder.clearCallingIdentity();
            msg.recycleUnchecked();
        }
    }
    /**
     * 由下文Handler源码分析可知, 当我们以Handler.post形式向MessageQueue中添加消息时, 创建的Message会持有Runnable的引用, 
     * 如果以Handler.sendMessage的形式向MessageQueue中添加消息时, mCallback = null和msg.callback = null, 
     * 并且handleMessage(...)默认是空实现, 所以如果是sendMessage形式, 需要我们自己重写handleMessage方法;
     */
    public void dispatchMessage(Message msg) {
        if (msg.callback != null) {
            handleCallback(msg);
        } else {
            if (mCallback != null) {
                if (mCallback.handleMessage(msg)) {
                    return;
                }
            }
            handleMessage(msg);
        }
    }
    private static void handleCallback(Message message) {
        message.callback.run();
    }
    public void handleMessage(Message msg) {
    }
}

  调用loop()之后, Looper轮训器进入循环状态, 当msg = null时跳出该循环;

3.2 MessageQueue.next :
public final class MessageQueue {
    private native void nativePollOnce(long ptr, int timeoutMillis);
    Message next() {
        int pendingIdleHandlerCount = -1;
        int nextPollTimeoutMillis = 0;
        for (;;) {
            /**
             *  1. nextPollTimeoutMillis默认为0;
             */
            nativePollOnce(ptr, nextPollTimeoutMillis);     模块<3.3>
            synchronized (this) {
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;
                /**
                 *  Message内部是一个链表结构, 此时先考虑在主线程中创建Handler的情况, (由下文Handler.post()相关源码可知)
                 *  当调用handler.enqueueMessage(...)时, 会创建一个Message, 并且该Message会持有当前Handler的引用, 所以
                 *  直接跳过该if语句;
                 */
                if (msg != null && msg.target == null) {
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }
                /**  
                 * msg != null的条件是mMessages != null, 而mMessages != null的条件又是我们调用enqueueMessage()
                 * 方法对mMessages进行赋值, 我们先假设我们在调用Handler.enqueueMessage时都是按时间线性的顺序进行消息入队;
                 * 则会进入到else{...}语句中;
                 */
                if (msg != null) {
                    if (now < msg.when) {
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        /**
                         * 由此可知当有消息到来时, mBlocked = false;
                         */
                        mBlocked = false;
                        /**
                         * 下面几行代码的意思就是获取Message链表的第一个元素Message, 然后删除掉该元素(FIFO);
                         */
                        if (prevMsg != null) {
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        msg.next = null;
                        msg.markInUse();
                        return msg;
                    }
                } else {
                    /**
                     * 当msg = null时, nextPollTimeoutMillis = -1, 继续向下;
                     */
                    nextPollTimeoutMillis = -1;
                }
                if (mQuitting) {...}
                /**
                 * 当mMessages = null时, 进入第一个if语句pendingIdleHandlerCount = 0;
                 * 然后进入第二个if语句执行continue语句继续for循环, 并且调用nativePollOnce(), 此时nextPollTimeoutMillis = -1
                 */
                if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) {
                    pendingIdleHandlerCount = mIdleHandlers.size();
                }
                if (pendingIdleHandlerCount <= 0) {
                    mBlocked = true;
                    continue;
                }
            }
            for (int i = 0; i < pendingIdleHandlerCount; i++) {...}
            pendingIdleHandlerCount = 0;
            nextPollTimeoutMillis = 0;
        }
    }
}

上述next()代码很长, 大致逻辑如下:

3.3 MessageQueue.nativePollOnce:
MessageQueue->nativePollOnce:
static void android_os_MessageQueue_nativePollOnce(...jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    /**
     * 结合模块<2.2>可知ptr指向NativeMessageQueue;
     */
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

NativeMessageQueue->pollOnce():
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    ...
    mLooper->pollOnce(timeoutMillis);   模块<3.4>
    ...
}
3.4 Looper.pollOnce:
Looper->pollOnce(...):
/**
 * timeoutMillis为我们在java中传入的nextPollTimeoutMillis变量;
 */
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {...}
        if (result != 0) {...}
        result = pollInner(timeoutMillis);
    }
}

int Looper::pollInner(int timeoutMillis) {
    /**
     * 只观察timeoutMillis引用的地方;
     * 由上文Native层的Looper初始化知道, mNextMessageUptime = LLONG_MAX;
     */
    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {...}
    int result = POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;
    mPolling = true;
    /**
     * 1. 当timeoutMillis = -1时, 由epoll_wait定义可知该方法会一直阻塞;
     * 2. 如果timeoutMillis = 0继续向下执行, 并获取事件个数;
     */
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);   模块<3.5>

    // No longer idling.
    mPolling = false;

    // Acquire lock.
    mLock.lock();

    /**
     * 默认FALSE, 继续向下执行;
     */
    if (mEpollRebuildRequired) {...}
    /**
     * 当事件个数 ≤ 0 时执行Done: 部分的代码, 即异常情况, 此时先不考虑该情况;
     */
    if (eventCount < 0) {
        ...
        goto Done;
    }
    if (eventCount == 0) {
        ...
        goto Done;
    }
    /**
     * 当eventCount > 0 时, 遍历该eventItem, 并判断当前文件描述符是否为
     * mWakeEventFd, 如果是mWakeEventFd则调用awoken()唤醒, 否则则将该文件描述符
     * 添加到mResponses中去;
     */
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd) {
            if (epollEvents & EPOLLIN) {
                awoken();
            } 
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } 
            else {...}
        }
    }
Done: ;
    ...
    return result;
}

void Looper::pushResponse(int events, const Request& request) {
    Response response;
    response.events = events;
    response.request = request;
    mResponses.push(response);
}
3.5 epoll_wait:
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout):

四、关于Handler:

4.1 Handler构造函数:
public class Handler {
    public Handler() {
        this(null, false);
    }
    public Handler(Callback callback, boolean async) {
        /**
         * 1. 结合前文其实可知, Looper的获取使用了ThreadLocal, 也就是说一个线程只能有一个Looper;
         * 2. 但是Handler初始化时并没有做相关限制, 所以Handler与Looper的关系是多:1的关系;
         */
        mLooper = Looper.myLooper();
        if (mLooper == null) {
            throw new RuntimeException(
                "Can't create handler inside thread that has not called Looper.prepare()");
        }
        mQueue = mLooper.mQueue;
    }
}
4.2 Handler.post :
public class Handler {
    public final boolean post(Runnable r) {
        /**
         * 将Message发送至MessageQueue;
         */
        return  sendMessageDelayed(getPostMessage(r), 0);   模块<4.3>
    }
    private static Message getPostMessage(Runnable r) {
        /**
         *  初始化Message, 将r指向Message.callback;
         */
        Message m = Message.obtain();
        m.callback = r;
        return m;
    }
}

public final class Message {
    public static Message obtain() {
        ...
        return new Message();
    }
}
4.3 Handler.sendMessageDelayed :
public class Handler {
    public final boolean sendMessageDelayed(Message msg, long delayMillis) {
        return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
    }
    public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
        MessageQueue queue = mQueue;
        return enqueueMessage(queue, msg, uptimeMillis);
    }
    private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
        msg.target = this;
        /**
         * msg进行入队操作;
         */
        return queue.enqueueMessage(msg, uptimeMillis);  模块<4.4>
    }
}
4.4 MessageQueue.enqueueMessage :
public final class MessageQueue {
    boolean enqueueMessage(Message msg, long when) {
        ...
        synchronized (this) {
            msg.markInUse();
            msg.when = when;
            Message p = mMessages;
            boolean needWake;
            /**
             * 1. 当第一次调用Handler.post时, 此时p = mMessage = null; 
             * 2. 搜索上文可知loop.loop()时, 主线程会一直阻塞在nativePollOnce(...)处并且mBlocked = true;
             * 3. 然后直接进入if (needWake) {...}内部;
             */
            if (p == null || when == 0 || when < p.when) {
                // New head, wake up the event queue if blocked.
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;
            } else {
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
            }
            /**
             * 1. 能进入if语句内部的前提条件是needWake = true, 而needWake唯一被赋值就是needWake = mBlocked;
             * 2. mBlocked只在MessageQueue.next()中被赋值.而结合上文又知道当没有消息被添加到MessageQueue队列中时, 
             *    也就是Message链表为空时,  MessageQueue.next()会被一直阻塞在nativePollOnce(...), 而且mBlocked = true, 
             *    当有消息被添加到消息队列时, 是一定会执行下面if(...)语句;
             */
            if (needWake) {
                /**
                 * 1. 通过模块<3.3>可知, 当没有消息时, 当前线程会在native层通过触发epoll_wait方法并传入
                 * -1将当前线程处于阻塞状态;
                 * 2. 然后在有消息到来时, 通过nativeWake方法欢喜线程;
                 */
                nativeWake(mPtr);     模块<4.5>
            }
        }
        return true;
    }
    private native static void nativeWake(long ptr);
}
4.5 android_os_MessageQueue.nativeWake:
MessageQueue --->
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->wake();
}

void NativeMessageQueue::wake() {
    mLooper->wake();      模块<4.6>
}
4.6 Looper.wake:
Looper --->
void Looper::wake() {
    uint64_t inc = 1;
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
}

  重点在这里的write(...)部分, 注意前文的epoll_wait(...)传入-1时会阻塞当前线程, 这里调用write(...)通过唤醒当前线程, pollOnce(...)会继续向下执行, 最终走到awoken方法;

Looper.cpp --->
void Looper::awoken() {
    uint64_t counter;
    TEMP_FAILURE_RETRY(read(mWakeEventFd, &counter, sizeof(uint64_t)));
}

  从管道内读入数据, 即native层不会再阻塞在pollOnce(...), 所以java层的MessageQueue.next()也就不会再阻塞在nativePollOnce(ptr, nextPollTimeoutMillis)处, MessageQueue.next()继续向下执行由执行到了return msg处, 然后根据我们Handler.post还是Handler.sendMsg而选择是调用Handler.handlerCallback(...)还是Handler.handleMessage(...);

4.7 Handler.dispatchMessage:
/**
 * 1. post方式时callback == Runnable != null, 触发handlerCallback();
 * 2. sendMessage方式时callback == null, 触发handleMessage;
 */
public void dispatchMessage(Message msg) {
    if (msg.callback != null) {
        handleCallback(msg);
    } else {
        if (mCallback != null) {
            if (mCallback.handleMessage(msg)) {
                return;
            }
        }
        handleMessage(msg);
    }
}
总结一下:

  1、关于IO多路复用, epoll以及pipe并不是很了解, 导致分析经常卡壳, 到现在也不能说清楚;

多路复用_epoll:

1、概念:
  epoll使用一个文件描述符管理多个描述符, 将用户关系的文件描述符的事件存放到内核的一个事件表中, 这样在用户空间和内核空间的copy只需要一次;
2、epoll接口:

#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

(1) int epoll_create(int size):
  创建一个epoll句柄, size用来告诉内核这个监听的数目一共有多大. 当创建好epoll句柄后, 它就是会占用一个fd值, 在使用完epoll之后, 必须调用close()关闭, 否则就可能导致fd被耗尽;

(2) int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event):

  epoll的事件注册函数; 第一个参数是epoll_create()的返回值, 第二个参数表示动作, 用三个宏来表示:

EPOLL_CTL_ADD: 注册新的fd到epfd中;
EPOLL_CTL_MOD: 修改已经注册的fd的监听事件;
EPOLL_CTL_DEL: 从epfd中删除一个fd;

第三个参数是需要监听的fd, 第四个参数是告诉内核需要监听什么事件;

struct epoll_event {
    __uint32_t events;  /* Epoll events */
    epoll_data_t data;  /* User data variable */
};

events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET:将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

(3) int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout):

  等待事件的产生, 参数events用来从内核得到事件的集合, maxevents告诉内核这个events有多大, 参数timeout是超时时间(毫秒, 0会立即返回, -1将会阻塞). 该函数返回需要处理的事件数目, 如返回0表示已超时;

上一篇下一篇

猜你喜欢

热点阅读