技术干货Android技术知识Android知识

ALooper简析

2017-08-09  本文已影响0人  TankWitch

接着ALooper、AMessage、AHandler的简述,现在来分析下ALooper的声明以及定义。

声明

这篇重在细节,废话不多说,各位看官搬好小板凳,且请看代码(已加注释)。

#include <media/stagefright/foundation/ABase.h>
#include <media/stagefright/foundation/AString.h>
#include <utils/Errors.h>
#include <utils/KeyedVector.h>
#include <utils/List.h>
#include <utils/RefBase.h>
#include <utils/threads.h>

namespace android {
//前置声明,在ALooper中只用到了这三个结构体的指针或者引用。至于为什么要使用前置声明,我在C/C++专栏会写
struct AHandler;
struct AMessage;
struct AReplyToken;

struct ALooper : public RefBase {
    typedef int32_t event_id;
    typedef int32_t handler_id;
    //构造函数
    ALooper();

    // Takes effect in a subsequent call to start().
    //给这个Looper起名字
    void setName(const char *name);
    //将一个Handler注册其中
    handler_id registerHandler(const sp<AHandler> &handler);
    //注销对应ID的AHandler
    void unregisterHandler(handler_id handlerID);
    //启动该Looper的独立线程
    status_t start(
            bool runOnCallingThread = false,
            bool canCallJava = false,
            int32_t priority = PRIORITY_DEFAULT
            );
    //停止该Looper的独立线程
    status_t stop();
    //获得当前的系统时间
    static int64_t GetNowUs();
    //获取当前Looper的名字
    const char *getName() const {
        return mName.c_str();
    }

protected:
    virtual ~ALooper();

private:
    friend struct AMessage;       // post()
    //包装Message
    struct Event {
        int64_t mWhenUs;
        sp<AMessage> mMessage;
    };

    Mutex mLock;
    Condition mQueueChangedCondition;

    AString mName;

    List<Event> mEventQueue;
    //继承于Thread
    struct LooperThread;
    sp<LooperThread> mThread;
    bool mRunningLocally;

    // use a separate lock for reply handling, as it is always on another thread
    // use a central lock, however, to avoid creating a mutex for each reply
    Mutex mRepliesLock;
    Condition mRepliesCondition;

    // START --- methods used only by AMessage

    // posts a message on this looper with the given timeout
    void post(const sp<AMessage> &msg, int64_t delayUs);

    // creates a reply token to be used with this looper
    sp<AReplyToken> createReplyToken();
    // waits for a response for the reply token.  If status is OK, the response
    // is stored into the supplied variable.  Otherwise, it is unchanged.
    status_t awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response);
    // posts a reply for a reply token.  If the reply could be successfully posted,
    // it returns OK. Otherwise, it returns an error value.
    status_t postReply(const sp<AReplyToken> &replyToken, const sp<AMessage> &msg);

    // END --- methods used only by AMessage
    //如果有队列中有消息,且未超过等待时间,则将队列头的消息交付给对应的handler处理
    bool loop();

    DISALLOW_EVIL_CONSTRUCTORS(ALooper);
};

下面,将就代码中和结构相关的部分简要分析一下。


struct Event {
    int64_t mWhenUs;
    sp<AMessage> mMessage;
    };

List<Event> mEventQueue;

可以看到,Event结构体对Message以及该Message所对应的入队时间进行的组装。而后,又将这些进入该Looper的消息进行的一个排队(用一个List包起来)。如果大家还记得之前《ALooper、AMessage、AHandler的简述》一篇的图1的话,就能很好的理解,为什么称ALooper为消息队列了。

定义


下面,将就代码中和重要功能相关的部分简要分析一下。


1. void ALooper::post(const sp<AMessage> &msg, int64_t delayUs)

168    void ALooper::post(const sp<AMessage> &msg, int64_t delayUs) {
169    Mutex::Autolock autoLock(mLock);
170
171    int64_t whenUs;
172    if (delayUs > 0) {
173        whenUs = GetNowUs() + delayUs;
174    } else {
175        whenUs = GetNowUs();
176    }
177
178    List<Event>::iterator it = mEventQueue.begin();
179    while (it != mEventQueue.end() && (*it).mWhenUs <= whenUs) {
180        ++it;
181    }
182
183    Event event;
184    event.mWhenUs = whenUs;
185    event.mMessage = msg;
186
187    if (it == mEventQueue.begin()) {
188        mQueueChangedCondition.signal();
189    }
190
191    mEventQueue.insert(it, event);
192}

这个函数很简单。

  1. 根据设置的delayUS(延时交付时间)和NowUS(系统时间)来计算真正这条Message需要交付的时间。
  2. 之后再根据这个时间计算当前Message在消息队列中的位置。
  3. 如果该消息是队列头消息,就通知一下(等待中的线程中只有一个会被唤醒执行)。
  4. 如果不是就加入到队列的相应位置中。

2. status_t postReply(const sp<AReplyToken> &replyToken, const sp<AMessage> &msg);

252    status_t ALooper::postReply(const sp<AReplyToken> &replyToken, const sp<AMessage> &reply) {
253    Mutex::Autolock autoLock(mRepliesLock);
254    status_t err = replyToken->setReply(reply);
255    if (err == OK) {
256        mRepliesCondition.broadcast();
257    }
258    return err;
259}

先看看replyToken是个什么鬼(他住在AMessage的家里)

33    struct AReplyToken : public RefBase {
34    AReplyToken(const sp<ALooper> &looper)
35        : mLooper(looper),
36          mReplied(false) {
37    }

再看看setReply做了什么事情。

40   status_t AReplyToken::setReply(const sp<AMessage> &reply) {
41    if (mReplied) {
42        ALOGE("trying to post a duplicate reply");
43        return -EBUSY;
44    }
45    CHECK(mReply == NULL);
46    mReply = reply;
47    mReplied = true;
48    return OK;
49}

现在就很清楚了

  1. 创建一个和当前Looper相关联的AReplyToken ,并且初始化reply状态为false,就是还没有回复。
  2. 之后将reply的消息和reply的状态(变为true,意为这个时候已经回复了)给这个ReplyToken
  3. 如果成功返回OK,就广播一下(等待的线程都会被唤醒)

3. status_t ALooper::start(bool runOnCallingThread, bool canCallJava, int32_t priority)

96    status_t ALooper::start(
97        bool runOnCallingThread, bool canCallJava, int32_t priority) {
98    if (runOnCallingThread) {
99        {
100            Mutex::Autolock autoLock(mLock);
101
102            if (mThread != NULL || mRunningLocally) {
103                return INVALID_OPERATION;
104            }
105
106            mRunningLocally = true;
107        }
108
109        do {
110        } while (loop());
111
112        return OK;
113    }
114
115    Mutex::Autolock autoLock(mLock);
116
117    if (mThread != NULL || mRunningLocally) {
118        return INVALID_OPERATION;
119    }
120
121    mThread = new LooperThread(this, canCallJava);
122
123    status_t err = mThread->run(
124            mName.empty() ? "ALooper" : mName.c_str(), priority);
125    if (err != OK) {
126        mThread.clear();
127    }
128
129    return err;
130}

首先,我们可以知道这是该Looper线程启动的函数. 当设置了runOnCallingThread为true的时候, 对应逻辑里面有一个重要的代码段:

    do {
    } while (loop());

那这个loop() 是干嘛用的? 我们来看一看.

194    bool ALooper::loop() {
195    Event event;
196
197    {
198        Mutex::Autolock autoLock(mLock);
199        if (mThread == NULL && !mRunningLocally) {
200            return false;
201        }//如果没有分到线程且没有在本地运行,返回错误
202        if (mEventQueue.empty()) {
203            mQueueChangedCondition.wait(mLock);
204            return true;
205        }
206        int64_t whenUs = (*mEventQueue.begin()).mWhenUs;
207        int64_t nowUs = GetNowUs();
208
209        if (whenUs > nowUs) {
210            int64_t delayUs = whenUs - nowUs;
211            mQueueChangedCondition.waitRelative(mLock, delayUs * 1000ll);
212
213            return true;
214        }
215
216        event = *mEventQueue.begin();
217        mEventQueue.erase(mEventQueue.begin());
218    }
219
220    event.mMessage->deliver();
221
222    // NOTE: It's important to note that at this point our "ALooper" object
223    // may no longer exist (its final reference may have gone away while
224    // delivering the message). We have made sure, however, that loop()
225    // won't be called again.
226
227    return true;
228}

重要的有以下几点

    if (mEventQueue.empty()) {
        mQueueChangedCondition.wait(mLock);
        return true;
     }

不知道大家是否还记得,之前讲异步消息机制简述那一篇中, 有提到过说, ALooper会检查消息队列中消息是否为空, 如果为空, 则等待事件, 降低CPU资源的消耗. 那么这段代码就是上面所说的具体实现了.

之后, 会判断whenUS和nowUS, 根据这两个值的比较来判断是否还需要等待一段时间处理Event.

紧接着, 用上面定义了的Event event, 拿到消息队列的列头, 并清除原队列的列头.

最后, 交付这个消息. (这个交付的实现, 会在AMessage中讲解)

好回到start()代码的接下来的流程中. 接下来就很简单了:

  1. 创建一个LooperThread的实例;
  2. 把这个实例线程跑起来.

接下来,讲两个和AMessage::postAndAwaitResponse方法紧密相连的两个方法. 一个是ALooper::createReplyToken(), 而另外一个是ALooper::awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response)

首先, 来了解下

4. ALooper::createReplyToken()

这个玩意儿的代码如下:

230    // to be called by AMessage::postAndAwaitResponse only
231    sp<AReplyToken> ALooper::createReplyToken() {
232    return new AReplyToken(this);
233    }

没错, 没干什么事情, 只是用当前的Looper, 实例化了一个AReplyToken(在第2点钟有提到这个东东)并返回.

接着,来看这个

5. ALooper::awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response)

它长这个样子:

236    status_t ALooper::awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response) {
237    // return status in case we want to handle an interrupted wait
238    Mutex::Autolock autoLock(mRepliesLock);
239    CHECK(replyToken != NULL);
240    while (!replyToken->retrieveReply(response)) {
241        {
242            Mutex::Autolock autoLock(mLock);
243            if (mThread == NULL) {
244                return -ENOENT;
245            }
246        }
247        mRepliesCondition.wait(mRepliesLock);
248    }
249    return OK;
250}

我们遇到了陌生的retrieveReply, 他长这个样子:

49    // if reply is not set, returns false; otherwise, it retrieves the reply and returns true
50    bool retrieveReply(sp<AMessage> *reply) {
51        if (mReplied) {
52            *reply = mReply;
53            mReply.clear();
54        }
55        return mReplied;
56    }

原生注释已经说得很清楚了. 如果replay没有设置,就返回false; 否则, 就获得这个replay的Message并返回true.

回到awaitResponse中. 当retrieveReply返回false, 也就是说没有拿到需要replay的Message的时候, 等待着, 一直等待到拿到这个消息为止(各位看官, 这里就是为了实现postAndAwaitResponse的同步的目的了哦)


总结

这篇讲述了一个叫ALooper的履带。它将AMessage根据交付时间(whenUs)进行排队(List<mEvent>)。然后简单介绍了它是怎么将消息(AMessage交付的(有同步的, 也有异步的))。关于交付这一块是不完整的,大部分的逻辑在AMessage中,今天先介绍到这里,有空再写最复杂的AMessage和相对简单的AHandler。

上一篇下一篇

猜你喜欢

热点阅读