WebRTC

WebRTC Module和ProcessThread

2020-05-16  本文已影响0人  JeffreyLau

Module定义

#modules/include/module.h
// Interface for a component that wants to be called back periodically on a
// worker thread.
class Module {
 public:
  // Returns the number of milliseconds until the module wants a worker
  // thread to call Process() again.
  virtual int64_t TimeUntilNextProcess() = 0;

  // Processes any pending tasks such as timeouts.
  // Called on a worker thread.
  virtual void Process() = 0;

  // Notifies the module that a ProcessThread has been attached to (or
  // detached from) it.  The default implementation does nothing.
  // Per the article author's note, in practice only the pacing module
  // (PacedSender) makes use of this hook: when it is paused or resumed it
  // calls ProcessThread::WakeUp() to get itself scheduled again.
  virtual void ProcessThreadAttached(ProcessThread* process_thread) {}

 protected:
  // Protected: modules are not meant to be destroyed through a Module*.
  virtual ~Module() = default;
};

ProcessThread定义

#modules/utility/process_thread.h
// Interface for a worker thread that services registered Module instances
// and runs tasks posted to it.
class ProcessThread {
 public:
  virtual ~ProcessThread();

  // Factory for ProcessThread instances; |thread_name| is the name given to
  // the worker thread.
  static std::unique_ptr<ProcessThread> Create(const char* thread_name);

  // Starts the worker thread.  Must be called from the construction thread.
  virtual void Start() = 0;

  // Stops the worker thread.  Must be called from the construction thread.
  virtual void Stop() = 0;

  // Wakes the thread up on behalf of |module|.
  // Can be called on any thread.
  virtual void WakeUp(Module* module) = 0;

  // Queues |task| to be run on the worker thread.  Ownership of the task is
  // transferred to the ProcessThread (it is passed as a std::unique_ptr).
  // NOTE(review): the original comment here was truncated; presumably a task
  // that never gets to run (e.g. because the thread never runs) is still
  // deleted by the ProcessThread - confirm against the upstream header.
  virtual void PostTask(std::unique_ptr<QueuedTask> task) = 0;

  // Adds a module that will start to receive callbacks on the worker thread.
  // Can be called from any thread.
  // NOTE(review): ProcessThreadImpl::RegisterModule DCHECKs
  // thread_checker_.IsCurrent(), which looks stricter than "any thread" -
  // confirm which of the two is intended.
  virtual void RegisterModule(Module* module, const rtc::Location& from) = 0;

  // Removes a previously registered module.
  // Can be called from any thread.
  virtual void DeRegisterModule(Module* module) = 0;
};

ProcessThread的实现

#modules/utility/source/process_thread_impl.h
class ProcessThreadImpl : public ProcessThread {
 public:
  explicit ProcessThreadImpl(const char* thread_name);
  ~ProcessThreadImpl() override;

  void Start() override;
  void Stop() override;

  void WakeUp(Module* module) override;
  void PostTask(std::unique_ptr<QueuedTask> task) override;

  void RegisterModule(Module* module, const rtc::Location& from) override;
  void DeRegisterModule(Module* module) override;

 protected:
  static void Run(void* obj);
  bool Process();

 private:
  struct ModuleCallback {
    ....
    ModuleCallback(Module* module, const rtc::Location& location)
        : module(module), location(location) {}
    bool operator==(const ModuleCallback& cb) const {
      return cb.module == module;
    }

    Module* const module;//记录模块实例
    //下一次回调的绝对时间
    int64_t next_callback = 0;  // Absolute timestamp.
    const rtc::Location location;

   private:
    ModuleCallback& operator=(ModuleCallback&);
  };
  //模块列表
  typedef std::list<ModuleCallback> ModuleList;

  // Warning: For some reason, if |lock_| comes immediately before |modules_|
  // with the current class layout, we will  start to have mysterious crashes
  // on Mac 10.9 debug.  I (Tommi) suspect we're hitting some obscure alignemnt
  // issues, but I haven't figured out what they are, if there are alignment
  // requirements for mutexes on Mac or if there's something else to it.
  // So be careful with changing the layout.
  rtc::CriticalSection lock_;  // Used to guard modules_, tasks_ and stop_.

  rtc::ThreadChecker thread_checker_;
  rtc::Event wake_up_;
  // TODO(pbos): Remove unique_ptr and stop recreating the thread.
  std::unique_ptr<rtc::PlatformThread> thread_;

  ModuleList modules_;
  //任务队列
  std::queue<QueuedTask*> queue_;
  bool stop_;
  const char* thread_name_;
}

Module注册

#modules/utility/source/process_thread_impl.cc
// Adds |module| to the set of modules serviced by this thread.  |from|
// records the call site of the registration.
void ProcessThreadImpl::RegisterModule(Module* module,
                                       const rtc::Location& from) {
  RTC_DCHECK(thread_checker_.IsCurrent());
  RTC_DCHECK(module) << from.ToString();

  // When the worker thread already exists, notify the module straight away
  // that it is attached.  The lock is deliberately not held during this
  // call-out.
  if (thread_) {
    module->ProcessThreadAttached(this);
  }

  {
    rtc::CritScope lock(&lock_);
    modules_.emplace_back(module, from);
  }

  // Interrupt the wait in ProcessThreadImpl::Process() so the waiting time
  // is recomputed: the module that was just added may be due sooner than
  // every module registered before it.
  wake_up_.Set();
}

ProcessThread循环

#modules/utility/source/process_thread_impl.cc
void ProcessThreadImpl::Start() {
  RTC_DCHECK(thread_checker_.IsCurrent());
  RTC_DCHECK(!thread_.get());
  if (thread_.get())
    return;

  RTC_DCHECK(!stop_);

  for (ModuleCallback& m : modules_)
    m.module->ProcessThreadAttached(this);

  thread_.reset(
      new rtc::PlatformThread(&ProcessThreadImpl::Run, this, thread_name_));
  thread_->Start();
}
#modules/utility/source/process_thread_impl.cc
// Runs one servicing pass:
//   1. under |lock_|, calls Process() on every module that is due and works
//      out the earliest upcoming callback time;
//   2. drains the task queue, releasing |lock_| around each task;
//   3. waits on |wake_up_| until the earliest callback time (at most 60
//      seconds after the start of the pass) or until the event is signalled.
// Returns false when |stop_| is set, true otherwise.
bool ProcessThreadImpl::Process() {
  TRACE_EVENT1("webrtc", "ProcessThreadImpl", "name", thread_name_);
  int64_t now = rtc::TimeMillis();
  // Upper bound for the wait: one minute from now, lowered below to the
  // earliest module callback time.
  int64_t next_checkpoint = now + (1000 * 60);

  {
    rtc::CritScope lock(&lock_);
    if (stop_)
      return false;
    for (ModuleCallback& m : modules_) {
      // TODO(tommi): Would be good to measure the time TimeUntilNextProcess
      // takes and dcheck if it takes too long (e.g. >=10ms).  Ideally this
      // operation should not require taking a lock, so querying all modules
      // should run in a matter of nanoseconds.
      // A next_callback of 0 means the callback time has not been computed
      // for this module yet.
      if (m.next_callback == 0)
        m.next_callback = GetNextCallbackTime(m.module, now);

      // The module is due, or it has been flagged for an immediate call.
      if (m.next_callback <= now ||
          m.next_callback == kCallProcessImmediately) {
        {
          // Note: |lock_| is still held while the module's Process() runs.
          m.module->Process();
        }
        // Use a new 'now' reference to calculate when the next callback
        // should occur.  We'll continue to use 'now' above for the baseline
        // of calculating how long we should wait, to reduce variance.
        int64_t new_now = rtc::TimeMillis();
        m.next_callback = GetNextCallbackTime(m.module, new_now);
      }

      // Track the earliest callback time across all modules.
      if (m.next_callback < next_checkpoint)
        next_checkpoint = m.next_callback;
    }

    // Drain the task queue.  The lock is left while each task runs and is
    // re-entered before the queue is inspected again, so the CritScope above
    // still finds the lock held when it goes out of scope.
    while (!queue_.empty()) {
      QueuedTask* task = queue_.front();
      queue_.pop();
      lock_.Leave();
      task->Run();
      // The queue holds owning raw pointers; the task is destroyed here.
      delete task;
      lock_.Enter();
    }
  }

  // Sleep until the next checkpoint unless it has already passed; setting
  // |wake_up_| ends the wait early.
  int64_t time_to_wait = next_checkpoint - rtc::TimeMillis();
  if (time_to_wait > 0)
    wake_up_.Wait(static_cast<int>(time_to_wait));

  return true;
}
#modules/utility/source/process_thread_impl.cc
// Sentinel value for ModuleCallback::next_callback: a module whose next
// callback time equals this value has Process() called on the next pass.
constexpr int64_t kCallProcessImmediately = -1;

// Computes the absolute time at which |module| next wants Process() to be
// called, given the current time |time_now| (same unit as the value returned
// by Module::TimeUntilNextProcess(), i.e. milliseconds).
int64_t GetNextCallbackTime(Module* module, int64_t time_now) {
  const int64_t wait_ms = module->TimeUntilNextProcess();
  // A negative interval means the module is already behind schedule, so the
  // callback should happen right now.
  return wait_ms < 0 ? time_now : time_now + wait_ms;
}

pacer_thread的创建和注册

#call/call.cc
// Convenience overload: creates a Call that uses the real-time clock and two
// freshly created process threads, one for general module processing and one
// for the pacer.
Call* Call::Create(const Call::Config& config) {
  std::unique_ptr<ProcessThread> module_thread =
      ProcessThread::Create("ModuleProcessThread");
  std::unique_ptr<ProcessThread> pacer_thread =
      ProcessThread::Create("PacerThread");
  return Create(config, Clock::GetRealTimeClock(), std::move(module_thread),
                std::move(pacer_thread));
}

// Creates a Call from explicitly supplied dependencies.  |pacer_thread| is
// handed to the RtpTransportControllerSend built here; |call_thread| is
// handed to the Call itself.  The caller owns the returned pointer.
Call* Call::Create(const Call::Config& config,
                   Clock* clock,
                   std::unique_ptr<ProcessThread> call_thread,
                   std::unique_ptr<ProcessThread> pacer_thread) {
  RTC_DCHECK(config.task_queue_factory);

  // Build the send-side transport controller first; it takes ownership of
  // the pacer thread.
  auto transport_send = absl::make_unique<RtpTransportControllerSend>(
      clock, config.event_log, config.network_state_predictor_factory,
      config.network_controller_factory, config.bitrate_config,
      std::move(pacer_thread), config.task_queue_factory);

  return new internal::Call(clock, config, std::move(transport_send),
                            std::move(call_thread), config.task_queue_factory);
}

// Constructs the Call and registers its periodic modules (the remote bitrate
// estimator, the call statistics module and the receive-side congestion
// controller) with the module process thread.
//
// NOTE(review): this is an abridged excerpt - the initializers of the other
// members used below (e.g. call_stats_, receive_side_cc_) are not shown here
// and must be present in the full source.
Call::Call(Clock* clock,
           const Call::Config& config,
           std::unique_ptr<RtpTransportControllerSendInterface> transport_send,
           std::unique_ptr<ProcessThread> module_process_thread,
           TaskQueueFactory* task_queue_factory)
    : clock_(clock),
      // Take ownership of the process thread.  Without this the member used
      // in the body would never be set and the thread passed in would be
      // destroyed as soon as the constructor returns.
      // Assumes module_process_thread_ is a std::unique_ptr<ProcessThread>
      // declared between clock_ and transport_send_ - TODO confirm.
      module_process_thread_(std::move(module_process_thread)),
      transport_send_(std::move(transport_send)) {
  RTC_DCHECK(config.event_log != nullptr);
  module_process_thread_->RegisterModule(
      receive_side_cc_.GetRemoteBitrateEstimator(true), RTC_FROM_HERE);
  module_process_thread_->RegisterModule(call_stats_.get(), RTC_FROM_HERE);
  module_process_thread_->RegisterModule(&receive_side_cc_, RTC_FROM_HERE);
}

PacedSender在构造时也会向pacer_thread注册模块,将Process的调用交给pacer_thread来进行调度

#modules/pacing/paced_sender.cc
// Builds the pacer and, when a process thread is supplied, registers the
// pacer's module proxy with it so that the thread schedules its Process()
// calls.  |process_thread| may be null, in which case nothing is registered.
PacedSender::PacedSender(Clock* clock,
                         PacketRouter* packet_router,
                         RtcEventLog* event_log,
                         const WebRtcKeyValueConfig* field_trials,
                         ProcessThread* process_thread)
    : pacing_controller_(clock,
                         static_cast<PacingController::PacketSender*>(this),
                         event_log,
                         field_trials),
      packet_router_(packet_router),
      process_thread_(process_thread) {
  if (process_thread_ == nullptr) {
    return;
  }
  process_thread_->RegisterModule(&module_proxy_, RTC_FROM_HERE);
}
上一篇下一篇

猜你喜欢

热点阅读