WebRTC源码分析-线程基础之Message && Messa
前言
本文将介绍消息循环中的消息(Message),消息中持有的数据(MessageData),处理消息的Handler(MessageHandler)的基本内容。
其中Message与MessageData相关的结构体位于rtc_base/message_queue.h中,MessageHandler相关的类位于rtc_base/message_handler.h中
消息Message
WebRTC中消息相关的类分为两种,一种是Message,表征的是即时消息,投放到消息循环中期待能被立马消费;另外一种是DelayedMessage,表征的是延迟消息,投放到消息循环中不会立马被消费,而是延迟一段时间才会被消费。
Message 源码如下所示
struct Message {
Message()
: phandler(nullptr), message_id(0), pdata(nullptr), ts_sensitive(0) {}
inline bool Match(MessageHandler* handler, uint32_t id) const {
return (handler == nullptr || handler == phandler) &&
(id == MQID_ANY || id == message_id);
}
Location posted_from; // 消息自哪儿产生
MessageHandler* phandler; // 消息如何处理
uint32_t message_id; // 消息id
MessageData* pdata; // 消息中携带的数据
int64_t ts_sensitive; // 消息产生的时间
};
Message结构体成员有如下几种:
- Location posted_from: 该成员描述了此消息结构是在哪个函数,哪个文件的哪一行产生的,即消息产生的源头。通常实际使用中使用宏RTC_FROM_HERE来产生Location对象,Location类的简单分析见WebRTC源码分析-定位之Location。
- MessageHandler* phandler:该成员持有消息如何被消费的方法,当消息从消息循环中被取出后,将使用 MessageHandler的OnMessage(Message* msg)来对消息进行处理。特别的PeerConnection对象也是MessageHandler,实现了OnMessage(Message* msg)方法。
- uint32_t message_id:32位无符号整型的消息id。通常有两类特别的消息,使用特殊的消息id来识别,分别是MQID_ANY (-1)表示所有的消息;MQID_DISPOSE(-2)表示需要丢弃的消息,该消息已经在MQM的分析文章中出现,详见WebRTC源码分析-线程基础之MessageQueueManager。当然,还有使用其他id值用以做特殊处理的,比如某个实现了MessageHandler.OnMessage方法的类,可能需要处理好几种不同的消息,那么就可以将不同消息id值当作区分消息类别的标志,从而在OnMessage方法中分门别类处理好几种消息了。
- int64_t ts_sensitive:64位时间戳,单位ms。当不关心消息是否处理过慢时,也即消息时间不敏感时,该值为0;若关心消息是否得到即时处理,一般会设置ts_sensitive为消息创建时的时间戳 + kMaxMsgLatency常量(150ms),当该消息从消息循环中取出被处理时,将会检测当前时间msCurrent与ts_sensitive的大小,若msCurrent>ts_sensitive,则表示该消息并没有得到即时的处理,会打印警告日志。超时时间计算为msCurrent-ts_sensitive+kMaxMsgLatency。
- MessageData* pdata:消息数据。有多个变种,后面详述。
Message结构体还提供了两个方法。
- 构造函数:没啥好说,Message没有提供析构函数,那么Message持有的消息处理器phandler以及消息数据pdata何时被销毁就非常值得注意了。
- 匹配函数:该函数在MQ清理消息时,用于评判哪些消息是满足条件的,即匹配上了。看源码可知,只要handler为空或者相等 并且 消息id为MQID_ANY或者相等,就被认为是找到了满足条件的消息。
DelayedMessage源码如下
// DelayedMessage goes into a priority queue, sorted by trigger time. Messages
// with the same trigger time are processed in num_ (FIFO) order.
class DelayedMessage {
public:
DelayedMessage(int64_t delay,
int64_t trigger,
uint32_t num,
const Message& msg)
: cmsDelay_(delay), msTrigger_(trigger), num_(num), msg_(msg) {}
bool operator<(const DelayedMessage& dmsg) const {
return (dmsg.msTrigger_ < msTrigger_) ||
((dmsg.msTrigger_ == msTrigger_) && (dmsg.num_ < num_));
}
int64_t cmsDelay_; // for debugging
int64_t msTrigger_;
uint32_t num_;
Message msg_;
};
DelayedMessage与Message并非是继承is-a关系,而是has-a的关系。提供了除Message msg_成员之外的另几个成员:
- int64_t cmsDelay_:延迟时间,消息延迟多长时间后需要被处理,单位ms
- int64_t msTrigger_:触发时间,消息需要被处理的时间戳,为消息创建时的时间戳+cmsDelay_,单位ms
- uint32_t num_:消息的序号,在某个MQ中延迟消息的num_单调递增。
由于DelayedMessage在MQ中会放至于专门的优先级队列中,优先级如何确定?DelayedMessage重载了小于"<"运算符以确定排序,先按消息需要被处理的触发时间戳msTrigger_排序;若触发时间相同,则按消息序号排序。当某个延迟消息要投放到延迟队列中时,该优先级队列会根据“<”运算符去比较这个延迟消息与队列中的消息以确定应在队列的哪个位置插入该消息。
MessageList被定义为std::list<Message>, MessageQueue中的即时消息就存储于MessageList类别的列表中,按照先入先出的方式挨个进行处理。
typedef std::list<Message> MessageList;
消息MessgeData
MessgeData 消息数据基类,其析构函数被定义为virtual类型,以防内存泄漏
class MessageData {
public:
MessageData() {}
virtual ~MessageData() {}
};
TypedMessageData 使用模板定义的MessageData 的一个子类,便于扩展。
template <class T>
class TypedMessageData : public MessageData {
public:
explicit TypedMessageData(const T& data) : data_(data) {}
const T& data() const { return data_; }
T& data() { return data_; }
private:
T data_;
};
ScopedMessageData 类似于 TypedMessageData,用于指针类型。在析构函数中,自动对该指针调用 delete。
// Like TypedMessageData, but for pointers that require a delete.
template <class T>
class ScopedMessageData : public MessageData {
public:
explicit ScopedMessageData(std::unique_ptr<T> data)
: data_(std::move(data)) {}
// Deprecated.
// TODO(deadbeef): Remove this once downstream applications stop using it.
explicit ScopedMessageData(T* data) : data_(data) {}
// Deprecated.
// TODO(deadbeef): Returning a reference to a unique ptr? Why. Get rid of
// this once downstream applications stop using it, then rename inner_data to
// just data.
const std::unique_ptr<T>& data() const { return data_; }
std::unique_ptr<T>& data() { return data_; }
const T& inner_data() const { return *data_; }
T& inner_data() { return *data_; }
private:
std::unique_ptr<T> data_;
};
ScopedRefMessageData 类似于ScopedMessageData,用于引用计数的指针类型。
// Like ScopedMessageData, but for reference counted pointers.
template <class T>
class ScopedRefMessageData : public MessageData {
public:
explicit ScopedRefMessageData(T* data) : data_(data) {}
const scoped_refptr<T>& data() const { return data_; }
scoped_refptr<T>& data() { return data_; }
private:
scoped_refptr<T> data_;
};
DisposeData 这个值得注意下,和前面几个MessageData子类不一样,该对象并没有提供其内部数据data_的方法,那么该对象正如其名,持有需要进行销毁的数据。在哪些场景下使用?
- 有些函数不便在当前函数范围内销毁对象,那么就可以将需要销毁的对象封装到DisposeData中,并进一步封装成消息并投递到消息队列中,等待线程的消息循环取出消息并进行销毁时将该对象销毁。见范例 HttpServer::Connection::~Connection;这里类似于QT中QObject的deletelater()方法的作用,起到延迟销毁的作用。
- 某对象属于某一线程,因此销毁操作应该交给所有者线程。这个可以通过调用对应线程对象Thread的MessageQueue的Dispose(T* doomed)来实现。在介绍MessageQueue的文章中会详述。
template <class T>
class DisposeData : public MessageData {
public:
explicit DisposeData(T* data) : data_(data) {}
virtual ~DisposeData() { delete data_; }
private:
T* data_;
};
WrapMessageData && UseMessageData 两个模板方法,分别用来将数据封装成MessageData以及取出对应的数据,封装拆箱操作。
template <class T>
inline MessageData* WrapMessageData(const T& data) {
return new TypedMessageData<T>(data);
}
template <class T>
inline const T& UseMessageData(MessageData* data) {
return static_cast<TypedMessageData<T>*>(data)->data();
}
消息处理器MessageHandler
MessageHandler 消息处理器的基类,子类在继承了该类之后要重载 OnMessage 函数,在其中实现消息响应的逻辑。
class MessageHandler {
public:
virtual ~MessageHandler();
virtual void OnMessage(Message* msg) = 0;
protected:
MessageHandler() {}
private:
RTC_DISALLOW_COPY_AND_ASSIGN(MessageHandler);
};
FunctorMessageHandler MessageHandler的模板子类,用于帮助实现阻塞地跨线程在指定线程上执行某个方法,并可获取执行结果。Thread的Invoke()方法使用该类。该类的构造函数中使用了C++11的右值引用,转发语义std::forward,以及转移语义std::move。详见博客C++11 std::move和std::forward
// Helper class to facilitate executing a functor on a thread.
template <class ReturnT, class FunctorT>
class FunctorMessageHandler : public MessageHandler {
public:
explicit FunctorMessageHandler(FunctorT&& functor)
: functor_(std::forward<FunctorT>(functor)) {}
virtual void OnMessage(Message* msg) { result_ = functor_(); }
const ReturnT& result() const { return result_; }
// Returns moved result. Should not call result() or MoveResult() again
// after this.
ReturnT MoveResult() { return std::move(result_); }
private:
FunctorT functor_;
ReturnT result_;
};
无返回值特例FunctorMessageHandler 返回值类型为 void 的函数的FunctorMessageHandler特化版本
// Specialization for ReturnT of void.
template <class FunctorT>
class FunctorMessageHandler<void, FunctorT> : public MessageHandler {
public:
explicit FunctorMessageHandler(FunctorT&& functor)
: functor_(std::forward<FunctorT>(functor)) {}
virtual void OnMessage(Message* msg) { functor_(); }
void result() const {}
void MoveResult() {}
private:
FunctorT functor_;
};
总结
至此,基本上将MQ相关的“边角料”介绍完毕了,重点的知识再回顾下:
- WebRTC中有两类消息需要在消息循环中得以处理,即时消息Message以及延迟消息DelayedMessage。他们被投递进入消息循环时,分别进入不同的队列,即时消息Message进入MessageLits类别的即时消息队列,该队列是先入先出的对列,这类消息期待得到立即的处理;延迟消息DelayedMessage进入PriorityQueue类别的延迟消息队列,该队列是优先级队列,根据延迟消息本身的触发时间以及消息序号进行排序,越早触发的消息将越早得以处理。如果再算上线程上同步发送消息,同步阻塞执行方法的话还有另外一个SendList,当然,这不是本文需要说明的内容了。
- 消息数据的类别有好多种,各自起到不同的作用,尤其要注意DisposeData用来利用消息循环处理消息的功能来自然而然地销毁某个类别的数据。
- 消息处理器最重要的就是其OnMessage方法,该方法是消息最终得以处理的地方。WebRTC中的很多重要的类就是MessageHandler的子类,比如PeerConnection;
- 消息处理器的子类FunctorMessageHandler为跨线程执行方法提供了便利,后续会在线程相关的文章中重点阐述。