WebRTCWebRTC源码分析

WebRTC源码分析-线程基础之MessageQueue

2019-11-15  本文已影响0人  ice_ly000

前言

MessageQueue提供了两方面的功能,消息循环中的消息队列功能以及通过持有SocketServer对象带来的IO多路复用功能。在MessageQueue内部这两部分功能不是完全孤立的,而是相互配合在一起使用。尤其是在MessageQueue的核心方法Get()中体现得淋漓尽致。

MessageQueue的实现位于rtc_base/message_queue.h以及rtc_base/message_queue.cc中,其声明如下:

class MessageQueue {
 public:
  static const int kForever = -1;

  MessageQueue(SocketServer* ss, bool init_queue);
  MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue);
  virtual ~MessageQueue();

  SocketServer* socketserver();

  virtual void Quit();
  virtual bool IsQuitting();
  virtual void Restart();
  virtual bool IsProcessingMessagesForTesting();

  virtual bool Get(Message* pmsg, int cmsWait = kForever, bool process_io = true);
  virtual bool Peek(Message* pmsg, int cmsWait = 0);
  virtual void Post(const Location& posted_from, MessageHandler* phandler,
                    uint32_t id = 0, MessageData* pdata = nullptr, bool time_sensitive = false);
  virtual void PostDelayed(const Location& posted_from, int cmsDelay,
                           MessageHandler* phandler, uint32_t id = 0, MessageData* pdata = nullptr);
  virtual void PostAt(const Location& posted_from, int64_t tstamp,
                      MessageHandler* phandler, uint32_t id = 0, MessageData* pdata = nullptr);
  // TODO(honghaiz): Remove this when all the dependencies are removed.
  virtual void PostAt(const Location& posted_from, uint32_t tstamp,
                      MessageHandler* phandler, uint32_t id = 0, MessageData* pdata = nullptr);
  virtual void Clear(MessageHandler* phandler, uint32_t id = MQID_ANY,
                              MessageList* removed = nullptr);
  virtual void Dispatch(Message* pmsg);
  virtual void ReceiveSends();

  // Amount of time until the next message can be retrieved
  virtual int GetDelay();

  bool empty() const { return size() == 0u; }
  size_t size() const {
    CritScope cs(&crit_);  // msgq_.size() is not thread safe.
    return msgq_.size() + dmsgq_.size() + (fPeekKeep_ ? 1u : 0u);
  }

  // Internally posts a message which causes the doomed object to be deleted
  template <class T>
  void Dispose(T* doomed) {
    if (doomed) {
      Post(RTC_FROM_HERE, nullptr, MQID_DISPOSE, new DisposeData<T>(doomed));
    }
  }

  sigslot::signal0<> SignalQueueDestroyed;

 protected:
  class PriorityQueue : public std::priority_queue<DelayedMessage> {
   public:
    container_type& container() { return c; }
    void reheap() { make_heap(c.begin(), c.end(), comp); }
  };

  void DoDelayPost(const Location& posted_from, int64_t cmsDelay, int64_t tstamp, 
                   MessageHandler* phandler, uint32_t id, MessageData* pdata);

  void DoInit();

  void ClearInternal(MessageHandler* phandler, uint32_t id,
                     MessageList* removed) RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_);

  void DoDestroy() RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_);

  void WakeUpSocketServer();

  bool fPeekKeep_;
  Message msgPeek_;
  MessageList msgq_ RTC_GUARDED_BY(crit_);
  PriorityQueue dmsgq_ RTC_GUARDED_BY(crit_);
  uint32_t dmsgq_next_num_ RTC_GUARDED_BY(crit_);
  CriticalSection crit_;
  bool fInitialized_;
  bool fDestroyed_;

 private:
  volatile int stop_;
  SocketServer* const ss_;
  std::unique_ptr<SocketServer> own_ss_;

  RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(MessageQueue);
};

MQ的基本成员

根据功能划分,可以将MQ的基本成员分为三类
MQ状态指示:

消息循环相关:MQ中有3个地方存储了消息:msgPeek_,msgq_ ,dmsgq_ 。

IO多路复用相关:

MQ的构造及析构

构造 做了这么几件事:初始化所有的成员;断言ss不能传空指针,将MQ的指针传递给ss使得二者相互持有相互访问;将MQ加入到MQM的管理指针,fInitialized_标志置为true,指示该MQ已经初始化完成。

MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
    : fPeekKeep_(false), dmsgq_next_num_(0), fInitialized_(false),
      fDestroyed_(false), stop_(0), ss_(ss) {
  RTC_DCHECK(ss);
  ss_->SetMessageQueue(this);
  if (init_queue) {
    DoInit();
  }
}

MessageQueue::MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue)
    : MessageQueue(ss.get(), init_queue) {
  own_ss_ = std::move(ss);
}

void MessageQueue::DoInit() {
  if (fInitialized_) {
    return;
  }
  fInitialized_ = true;
  MessageQueueManager::Add(this);
}

析构 基本上是构造的逆操作:设置fDestroyed_为true,表示MQ被销毁;发送信号SignalQueueDestroyed()告知关注了MQ的对象不要再访问该MQ了;从MQM中移除自己;清理MQ中的所有消息;从ss中移除MQ的指针。

MessageQueue::~MessageQueue() {
  DoDestroy();
}

void MessageQueue::DoDestroy() {
  if (fDestroyed_) {
    return;
  }
  fDestroyed_ = true;
  // The signal is done from here to ensure
  // that it always gets called when the queue
  // is going away.
  SignalQueueDestroyed();
  MessageQueueManager::Remove(this);
  ClearInternal(nullptr, MQID_ANY, nullptr);

  if (ss_) {
    ss_->SetMessageQueue(nullptr);
  }
}

MQ的Size

由于MQ有3个地方存储了消息,一个是Peek消息msgPeek_,一个即时消息队列msgq_,一个是延迟消息队列dmsgq_。那么计算MQ中存储的消息个数时,这三个地方都得算上。

  bool empty() const { return size() == 0u; }
  size_t size() const {
    CritScope cs(&crit_);  // msgq_.size() is not thread safe.
    return msgq_.size() + dmsgq_.size() + (fPeekKeep_ ? 1u : 0u);
  }

MQ的运行状态

MQ的运行状态由成员volatile int stop_来指示,相关的函数有以下几个,如源码所示。方法都非常简单,无非就是对stop_变量进行原子性的置1和置0,本身就是线程安全的,所有并没有上锁。另外需要知道的几点如下:

void MessageQueue::Quit() {
  AtomicOps::ReleaseStore(&stop_, 1);
  WakeUpSocketServer();
}

bool MessageQueue::IsQuitting() {
  return AtomicOps::AcquireLoad(&stop_) != 0;
}

bool MessageQueue::IsProcessingMessagesForTesting() {
  return !IsQuitting();
}

void MessageQueue::Restart() {
  AtomicOps::ReleaseStore(&stop_, 0);
}

消息获取

MQ中消息获取相关的函数有两个,Peek()与Get(),其中Get()是核心内容,再看Get()方法之前,先看看Peek()方法干了啥~~

Peek(): 简而言之就是查看之前是否已经Peek过一个MSG到成员msgPeek_中,若已经Peek过一个则直接将该消息返回;若没有,则通过Get()方法从消息队列中取出一个消息,成功则将该消息交给msgPeek_成员,并将fPeekKeep_标志置为true。

bool MessageQueue::Peek(Message* pmsg, int cmsWait) {
  // fPeekKeep_为真,表示已经Peek过一个MSG到msgPeek_
  // 直接将该MSG返回
  if (fPeekKeep_) {
    *pmsg = msgPeek_;
    return true;
  }
  // 若没有之前没有Peek过一个MSG
  if (!Get(pmsg, cmsWait))
    return false;
  //将Get到的消息放在msgPeek_中保存,并设置标志位
  msgPeek_ = *pmsg;
  fPeekKeep_ = true;
  return true;
}

Get():方法的声明如下所示,注释说明了Get()方法的内部算法的流程,Get()方法会阻塞的处理IO,直到有消息可以处理 或者 cmsWait时间已经过去 或者 Stop()方法被调用。

  // Get() will process I/O until:
  //  1) A message is available (returns true)
  //  2) cmsWait seconds have elapsed (returns false)
  //  3) Stop() is called (returns false)
  virtual bool Get(Message* pmsg,
                   int cmsWait = kForever,
                   bool process_io = true);

源码如下:

bool MessageQueue::Get(Message* pmsg, int cmsWait, bool process_io) {
  // 是否存在一个Peek过的消息没有被处理?
  // 优先处理该消息                                                                     // 步骤1
  if (fPeekKeep_) {                                                                              
    *pmsg = msgPeek_;
    fPeekKeep_ = false;
    return true;
  }

  int64_t cmsTotal = cmsWait;
  int64_t cmsElapsed = 0;
  int64_t msStart = TimeMillis();
  int64_t msCurrent = msStart;
  while (true) {
    // 检查是否有send消息,若存在,先阻塞处理send消息                                     // 步骤2      
    ReceiveSends();                                                                              

    // 检查所有post消息(即时消息+延迟消息)
    int64_t cmsDelayNext = kForever;
    bool first_pass = true;
    while (true) {
       // 上锁进行消息队列的访问
      {
        CritScope cs(&crit_);
        // 内部第一次循环,先检查延迟消息队列                                           // 步骤3
        if (first_pass) {                                                                        
          first_pass = false;
          // 将延迟消息队列dmsgq_中已经超过触发时间的消息全部取出放入到即时消息队列msgq_中
          // 计算当前时间距离下一个最早将要到达触发时间的消息还有多长时间cmsDelayNext。
          while (!dmsgq_.empty()) {
            if (msCurrent < dmsgq_.top().msTrigger_) {
              cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
              break;
            }
            msgq_.push_back(dmsgq_.top().msg_);
            dmsgq_.pop();
          }
        }
        // 从即时消息队列msgq_队首取出第一个消息                                     // 步骤4
        if (msgq_.empty()) {                                                                  
          break;
        } else {
          *pmsg = msgq_.front();
          msgq_.pop_front();
        }
      }  // crit_ is released here.

      // 如果消息对时间敏感,那么如果超过了最大忍耐时间kMaxMsgLatency才被处理
      // 则打印警告日志
      if (pmsg->ts_sensitive) {
        int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
        if (delay > 0) {
          RTC_LOG_F(LS_WARNING)
              << "id: " << pmsg->message_id
              << "  delay: " << (delay + kMaxMsgLatency) << "ms";
        }
      }
      // 如果取出是需要销毁的消息,则销毁该消息,继续取下一个消息。
      if (MQID_DISPOSE == pmsg->message_id) {
        RTC_DCHECK(nullptr == pmsg->phandler);
        delete pmsg->pdata;
        *pmsg = Message();
        continue;
      }
      return true;
    }
    
    // 走到这,说明当前没有消息要处理,很可能是处于Quit状态了,先判断一下
    if (IsQuitting())
      break;
    
    // 计算留给IO处理的时间                                                       // 步骤5
    int64_t cmsNext;                                                                           
    if (cmsWait == kForever) {    //Get无限期,那么距离下个延迟消息的时间就作为本次IO处理时间
      cmsNext = cmsDelayNext;
    } else {      // Get有超时时间,计算本次IO处理时间
      cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);   // 总体来说还剩多少时间
      if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))  // 总体剩余时间和下一个延迟消息触发时间谁先到达?取其短者
        cmsNext = cmsDelayNext;
    }

    {
      // 阻塞处理IO多路复用                                                     // 步骤6
      if (!ss_->Wait(static_cast<int>(cmsNext), process_io))                  
        return false;
    }

    // 计算是否所有时间都已耗尽,是否进入下一个大循环                             // 步骤7
    msCurrent = TimeMillis();                                    
    cmsElapsed = TimeDiff(msCurrent, msStart);
    if (cmsWait != kForever) {
      if (cmsElapsed >= cmsWait)
        return false;
    }
  }
  return false;
}

上述算法过程是整个消息循环的核心内容,如上注释,大概可以分为7个步骤:

  1. 检查Peek消息。先检查之前是否已经Peek过一个消息到msgPeek_还未被处理,若有,当前就处理该消息吧,函数返回~ 若无,继续处理其他消息。
  2. 通过执行 ReceiveSends() 方法来处理所有Send消息。在MQ中该方法为virtual方法,啥也不干,Thread类继承MQ后会实现该方法,在此方法中处理所有Send消息。因此,消息循环中其实优先,阻塞地先处理所有Send消息,实现跨线程的Send消息方法。
  3. 处理Post消息中的延迟消息。从延迟消息队列dmsgq_中取出所有已经到达触发时间点的延迟消息,并塞入即时消息队列msgq_的队尾。同时计算下一个延迟消息还过多久将被触发(如果延迟消息队列中还有未超时的消息),这个时间可能会被作为后续IO多路复用处理的超时时间。这点在redis,nginx上理念一致。
  4. 处理即时消息。取出即时消息队列msgq_的队首消息。若该消息是个要销毁的消息,那么销毁该消息,并取下一个即时消息;若取到一个非要销毁的即时消息,那么就先处理该即时消息吧,函数返回;若本步骤没有取到即时消息,表示当前没有消息要处理,那干点啥好呢~处理网络IO吧
  5. 计算留给网络IO的时间。消息处理才是迫切的,网络IO嘛,看我能给你分配多少时间吧~~ 分两种情形来对待:
    1)若外部Get方法无限期,那么下一个延迟消息触发时间到来之前我都可以用来处理IO;若是延迟队列中没有延迟消息呢?也就是消息循环队列中没有任何要处理的消息了,那当然我就可以无限期地,阻塞地将时间都用来处理IO了,直到有消息进入消息队列,将消息循环从IO处理中唤醒为止,继续处理消息。
    2)若外部Get方法是有超时时间的,那么我们有必要先计算下已经花费了多长时间,到此刻,我们总共最多还剩多长时间留给IO处理。将总剩余时间跟下一个延迟消息触发时间做个比较,哪个小取哪个作为IO处理的时间;若是延迟队列中没有延迟消息呢?那就将剩下的所有时间都交给IO处理咯,反正也没有消息要处理~
  6. IO多路复用处理。 阻塞地花费上述计算好的时间进行IO处理。过程中要是处理出错,则函数返回;若是处理时间耗完或者时间没有耗完,但是有新消息进入循环了使得阻塞式的IO处理被唤醒,那么进入下个步骤。
  7. 计算剩余时间。既然消息已经被处理完过一次,IO也处理完了,先计算下是不是所有时间都已经耗尽?耗尽时间了,我还没找到可用的即时消息,sorry~函数返回false;没有耗尽的话,那么我们计算下剩余的时间,并将剩余的时间把2-7过程再来一遍吧:处理Send消息,检查延迟消息,检查并返回即时消息,再次计算IO处理时间,IO处理,再次计算剩余时间。什么?为什么没有重复步骤1检查Peek消息?同一个线程中我既然在执行Get,怎么可能Peek嘛,怎么可能又蹦跶出一个Peek消息呢?

消息投递

MQ中消息投递相关的函数有这么几个:Post(),PostDelayed(),两个PostAt(),DoDelayPost()。其中Post()用于投递即时消息;PostDelayed(),两个PostAt()用于投递延迟消息,内部都是调用DoDelayPost()来实现。

Post(): 即时消息的投递,源码如下。主要就是将函数的入参封装成一个即时消息Message对象,然后放置到即时队列msgq_的队尾。需要注意的有四点:

void MessageQueue::Post(const Location& posted_from, MessageHandler* phandler,
                        uint32_t id,  MessageData* pdata,  bool time_sensitive) {
  if (IsQuitting()) {
    delete pdata;
    return;
  }
  {
    CritScope cs(&crit_);
    Message msg;
    msg.posted_from = posted_from;
    msg.phandler = phandler;
    msg.message_id = id;
    msg.pdata = pdata;
    if (time_sensitive) {
      msg.ts_sensitive = TimeMillis() + kMaxMsgLatency;
    }
    msgq_.push_back(msg);
  }
  WakeUpSocketServer();
}

void MessageQueue::WakeUpSocketServer() {
  ss_->WakeUp();
}

PostDelayed(),PostAt(),DoDelayPost(): 延迟消息的投递,源码如下。PostDelayed(),PostAt()均是将各自的入参稍作转换后,再调用DoDelayPost()方法,将入参封装成延迟消息DelayedMesssage,然后加入到延迟消息队列dmsgq_中,并从IO多路复用的阻塞中唤醒来处理消息。与Post()方法中的做法并无二致。需要额外注意的地方有这么几点:

void MessageQueue::PostDelayed(const Location& posted_from, int cmsDelay,
                               MessageHandler* phandler, uint32_t id, MessageData* pdata) {
  return DoDelayPost(posted_from, cmsDelay, TimeAfter(cmsDelay), phandler, id,
                     pdata);
}
void MessageQueue::PostAt(const Location& posted_from, uint32_t tstamp,
                          MessageHandler* phandler, uint32_t id, MessageData* pdata) {
  // This should work even if it is used (unexpectedly).
  int64_t delay = static_cast<uint32_t>(TimeMillis()) - tstamp;
  return DoDelayPost(posted_from, delay, tstamp, phandler, id, pdata);
}
void MessageQueue::PostAt(const Location& posted_from, int64_t tstamp,
                          MessageHandler* phandler, uint32_t id, MessageData* pdata) {
  return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id,
                     pdata);
}
void MessageQueue::DoDelayPost(const Location& posted_from, int64_t cmsDelay, int64_t tstamp,
                               MessageHandler* phandler, uint32_t id, MessageData* pdata) {
  if (IsQuitting()) {
    delete pdata;
    return;
  }

  {
    CritScope cs(&crit_);
    Message msg;
    msg.posted_from = posted_from;
    msg.phandler = phandler;
    msg.message_id = id;
    msg.pdata = pdata;
    DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
    dmsgq_.push(dmsg);
    // If this message queue processes 1 message every millisecond for 50 days,
    // we will wrap this number.  Even then, only messages with identical times
    // will be misordered, and then only briefly.  This is probably ok.
    ++dmsgq_next_num_;
    RTC_DCHECK_NE(0, dmsgq_next_num_);
  }
  WakeUpSocketServer();
}

消息处理

Dispatch():方法内容一目了然,先打印一条trace日志;然后记录消息处理的开始时间start_time;调用消息的MessageHandler的OnMessage方法进行消息处理;记录消息处理的结束时间end_time;计算消息处理花费了多长时间diff,如果消息花费时间过程,超过kSlowDispatchLoggingThreshold(50ms),则打印一条警告日志,告知从哪儿构建的消息花费了多长时间才消费完。

void MessageQueue::Dispatch(Message* pmsg) {
  TRACE_EVENT2("webrtc", "MessageQueue::Dispatch", "src_file_and_line",
               pmsg->posted_from.file_and_line(), "src_func",
               pmsg->posted_from.function_name());
  int64_t start_time = TimeMillis();
  pmsg->phandler->OnMessage(pmsg);
  int64_t end_time = TimeMillis();
  int64_t diff = TimeDiff(end_time, start_time);
  if (diff >= kSlowDispatchLoggingThreshold) {
    RTC_LOG(LS_INFO) << "Message took " << diff
                     << "ms to dispatch. Posted from: "
                     << pmsg->posted_from.ToString();
  }
}

消息清理

Clear(): 清理函数逻辑也相当简单,目标也相当明确,就是要讲满足条件的消息从MQ中删除。需要注意的点有以下几个:

void MessageQueue::Clear(MessageHandler* phandler, uint32_t id, MessageList* removed) {
  CritScope cs(&crit_);
  ClearInternal(phandler, id, removed);
}

void MessageQueue::ClearInternal(MessageHandler* phandler, uint32_t id, MessageList* removed) {
  // Remove messages with phandler
  if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
    if (removed) {
      removed->push_back(msgPeek_);
    } else {
      delete msgPeek_.pdata;
    }
    fPeekKeep_ = false;
  }

  // Remove from ordered message queue
  for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
    if (it->Match(phandler, id)) {
      if (removed) {
        removed->push_back(*it);
      } else {
        delete it->pdata;
      }
      it = msgq_.erase(it);
    } else {
      ++it;
    }
  }

  // Remove from priority queue. Not directly iterable, so use this approach
  PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
  for (PriorityQueue::container_type::iterator it = new_end;
       it != dmsgq_.container().end(); ++it) {
    if (it->msg_.Match(phandler, id)) {
      if (removed) {
        removed->push_back(it->msg_);
      } else {
        delete it->msg_.pdata;
      }
    } else {
      *new_end++ = *it;
    }
  }
  dmsgq_.container().erase(new_end, dmsgq_.container().end());
  dmsgq_.reheap();
}

销毁消息

Dispose(): 之前在WebRTC源码分析-线程基础之Message && MessageData && MessageHandler中对DisposeData消息数据专门进行过阐述。再配合之前Get()方法中对DisposeData消息数据的处理方式,我们很容易理解该函数的作用:如果想要销毁某个对象,而不方便立马销毁,那么就可以将调用消息循环的Dispose()方法让消息循环帮忙进行数据销毁。

  // Internally posts a message which causes the doomed object to be deleted
  template <class T>
  void Dispose(T* doomed) {
    if (doomed) {
      Post(RTC_FROM_HERE, nullptr, MQID_DISPOSE, new DisposeData<T>(doomed));
    }
  }
上一篇 下一篇

猜你喜欢

热点阅读