WebRTC Module和ProcessThread
2020-05-16 本文已影响0人
JeffreyLau
Module定义
#modules/include/module.h
class Module {
public:
// Returns the number of milliseconds until the module wants a worker
// 返回下一次执行Process函数的时间,单位是毫秒
virtual int64_t TimeUntilNextProcess() = 0;
// Process any pending tasks such as timeouts.
// Called on a worker thread.
virtual void Process() = 0;
//绑定或者解绑ProcessThread到当前模块,实际上只有平滑发送模块(PacedSender)才使用到这个接口,
//PacedSender暂停和恢复的时候会调用ProcessThread的WakeUp接口激活PacedSender模块
virtual void ProcessThreadAttached(ProcessThread* process_thread) {}
protected:
virtual ~Module() {}
};
ProcessThread定义
#modules/utility/process_thread.h
class ProcessThread {
public:
virtual ~ProcessThread();
// create thread
static std::unique_ptr<ProcessThread> Create(const char* thread_name);
// Starts the worker thread. Must be called from the construction thread.
virtual void Start() = 0;
// Stops the worker thread. Must be called from the construction thread.
virtual void Stop() = 0;
// Can be called on any thread.
virtual void WakeUp(Module* module) = 0;
// the thread never runs).
virtual void PostTask(std::unique_ptr<QueuedTask> task) = 0;
// Adds a module that will start to receive callbacks on the worker thread.
// Can be called from any thread.
virtual void RegisterModule(Module* module, const rtc::Location& from) = 0;
// Removes a previously registered module.
// Can be called from any thread.
virtual void DeRegisterModule(Module* module) = 0;
}
-
调用Start创建一个新的线程,并启动线程执行定时任务
-
调用Stop停止线程,并销毁线程
-
通过RegisterModule接口注册需要定时执行的模块,ProcessThread把模块加入到模块列表中(modules_),并调用ProcessThreadAttached注册此线程到新加入模块
-
通过,DeRegisterModule接口移除不再需要定时执行的模块,ProcessThread把模块从模块列表中移除,并调用ProcessThreadAttached取消注册此线程到移除模块
-
线程启动以后会循环执行
ProcessThreadImpl::Process
,它会从模块列表中找到当前需要执行的模块,并找出最近一次需要执行的最小时间,把这个时间给定时器 -
如果某个模块需要立马被执行可以调用WakeUp函数,它会中断定时器,马上执行指定模块
ProcessThread的实现
#modules/utility/source/process_thread_impl.h
class ProcessThreadImpl : public ProcessThread {
public:
explicit ProcessThreadImpl(const char* thread_name);
~ProcessThreadImpl() override;
void Start() override;
void Stop() override;
void WakeUp(Module* module) override;
void PostTask(std::unique_ptr<QueuedTask> task) override;
void RegisterModule(Module* module, const rtc::Location& from) override;
void DeRegisterModule(Module* module) override;
protected:
static void Run(void* obj);
bool Process();
private:
struct ModuleCallback {
....
ModuleCallback(Module* module, const rtc::Location& location)
: module(module), location(location) {}
bool operator==(const ModuleCallback& cb) const {
return cb.module == module;
}
Module* const module;//记录模块实例
//下一次回调的绝对时间
int64_t next_callback = 0; // Absolute timestamp.
const rtc::Location location;
private:
ModuleCallback& operator=(ModuleCallback&);
};
//模块列表
typedef std::list<ModuleCallback> ModuleList;
// Warning: For some reason, if |lock_| comes immediately before |modules_|
// with the current class layout, we will start to have mysterious crashes
// on Mac 10.9 debug. I (Tommi) suspect we're hitting some obscure alignemnt
// issues, but I haven't figured out what they are, if there are alignment
// requirements for mutexes on Mac or if there's something else to it.
// So be careful with changing the layout.
rtc::CriticalSection lock_; // Used to guard modules_, tasks_ and stop_.
rtc::ThreadChecker thread_checker_;
rtc::Event wake_up_;
// TODO(pbos): Remove unique_ptr and stop recreating the thread.
std::unique_ptr<rtc::PlatformThread> thread_;
ModuleList modules_;
//任务队列
std::queue<QueuedTask*> queue_;
bool stop_;
const char* thread_name_;
}
Module注册
#modules/utility/source/process_thread_impl.cc
void ProcessThreadImpl::RegisterModule(Module* module,
const rtc::Location& from) {
RTC_DCHECK(thread_checker_.IsCurrent());
RTC_DCHECK(module) << from.ToString();
// Now that we know the module isn't in the list, we'll call out to notify
// the module that it's attached to the worker thread. We don't hold
// the lock while we make this call.
if (thread_.get())
module->ProcessThreadAttached(this);
{
rtc::CritScope lock(&lock_);
modules_.push_back(ModuleCallback(module, from));
}
// Wake the thread calling ProcessThreadImpl::Process() to update the
// waiting time. The waiting time for the just registered module may be
// shorter than all other registered modules.
wake_up_.Set();
}
- 每一个Module通过RegisterModule注册模块和ProcessThread进行绑定在绑定
- 创建ModuleCallback将Module作为参数传入
- 将ModuleCallback插入到modules_列表
ProcessThread循环
#modules/utility/source/process_thread_impl.cc
void ProcessThreadImpl::Start() {
RTC_DCHECK(thread_checker_.IsCurrent());
RTC_DCHECK(!thread_.get());
if (thread_.get())
return;
RTC_DCHECK(!stop_);
for (ModuleCallback& m : modules_)
m.module->ProcessThreadAttached(this);
thread_.reset(
new rtc::PlatformThread(&ProcessThreadImpl::Run, this, thread_name_));
thread_->Start();
}
-
当在其他类中调用Start()方法执行ProcessThread的时候首先回调m.module->ProcessThreadAttached(this)
将ProcessThread和模块进行绑定
-
new rtc::PlatformThread创建线程并绑定线程回调ProcessThreadImpl::Run处理函数
-
执行thread_->Start()进行线程循环
#modules/utility/source/process_thread_impl.cc
bool ProcessThreadImpl::Process() {
TRACE_EVENT1("webrtc", "ProcessThreadImpl", "name", thread_name_);
int64_t now = rtc::TimeMillis();
int64_t next_checkpoint = now + (1000 * 60);
{
rtc::CritScope lock(&lock_);
if (stop_)
return false;
for (ModuleCallback& m : modules_) {
// TODO(tommi): Would be good to measure the time TimeUntilNextProcess
// takes and dcheck if it takes too long (e.g. >=10ms). Ideally this
// operation should not require taking a lock, so querying all modules
// should run in a matter of nanoseconds.
if (m.next_callback == 0)
m.next_callback = GetNextCallbackTime(m.module, now);
if (m.next_callback <= now ||
m.next_callback == kCallProcessImmediately) {
{
m.module->Process();
}
// Use a new 'now' reference to calculate when the next callback
// should occur. We'll continue to use 'now' above for the baseline
// of calculating how long we should wait, to reduce variance.
int64_t new_now = rtc::TimeMillis();
m.next_callback = GetNextCallbackTime(m.module, new_now);
}
if (m.next_callback < next_checkpoint)
next_checkpoint = m.next_callback;
}
while (!queue_.empty()) {
QueuedTask* task = queue_.front();
queue_.pop();
lock_.Leave();
task->Run();
delete task;
lock_.Enter();
}
}
int64_t time_to_wait = next_checkpoint - rtc::TimeMillis();
if (time_to_wait > 0)
wake_up_.Wait(static_cast<int>(time_to_wait));
return true;
}
- 首先遍历modules_看是否有注册模块
- 如果有注册的模块,首次执行的时候next_callback=0,通过GetNextCallbackTime获取下一次执行Module::Process()函数的绝对时间
- 将next_callback和当前值进行比较如果当前时间大于等于next_callback,说明时间已经到了,则调用Module对应的Process()函数,如果next_callback的值为-1则立即执行
- 更新next_callback为当前值
- 扫描queue_执行任务队列,回调task->Run()
#modules/utility/source/process_thread_impl.cc
const int64_t kCallProcessImmediately = -1;
int64_t GetNextCallbackTime(Module* module, int64_t time_now) {
int64_t interval = module->TimeUntilNextProcess();
if (interval < 0) {
// Falling behind, we should call the callback now.
return time_now;
}
return time_now + interval;
}
- 当前的时间值加上module->TimeUntilNextProcess()
- 由这里可以看出Module::TimeUntilNextProcess函数的实现是告诉ProcessThreadImpl线程该模块的Process是要过多少ms之后希望被调用
pacer_thread的创建和注册
#call/call.cc
Call* Call::Create(const Call::Config& config) {
return Create(config, Clock::GetRealTimeClock(),
ProcessThread::Create("ModuleProcessThread"),
ProcessThread::Create("PacerThread"));
}
Call* Call::Create(const Call::Config& config,
Clock* clock,
std::unique_ptr<ProcessThread> call_thread,
std::unique_ptr<ProcessThread> pacer_thread) {
RTC_DCHECK(config.task_queue_factory);
return new internal::Call(
clock, config,
absl::make_unique<RtpTransportControllerSend>(
clock, config.event_log, config.network_state_predictor_factory,
config.network_controller_factory, config.bitrate_config,
std::move(pacer_thread), config.task_queue_factory),
std::move(call_thread), config.task_queue_factory);
}
Call::Call(Clock* clock,
const Call::Config& config,
std::unique_ptr<RtpTransportControllerSendInterface> transport_send,
std::unique_ptr<ProcessThread> module_process_thread,
TaskQueueFactory* task_queue_factory)
: clock_(clock),
transport_send_(std::move(transport_send)) {
RTC_DCHECK(config.event_log != nullptr);
module_process_thread_->RegisterModule(
receive_side_cc_.GetRemoteBitrateEstimator(true), RTC_FROM_HERE);
module_process_thread_->RegisterModule(call_stats_.get(), RTC_FROM_HERE);
module_process_thread_->RegisterModule(&receive_side_cc_, RTC_FROM_HERE);
}
- 在创建Call的时候会创建pacer_thread 同时会为pacer_thread注册如下三个模块
- ReceiveSideCongestionController模块
- CallStats模块
- RemoteEstimatorProxy模块
- 除此之外还有PacedSender也是Module的派生类,在构造PacedSender的时候
也会向pacer_thread注册模块,将Process的调用交个pacer_thread来进行调度
#modules/pacing/paced_sender.cc
PacedSender::PacedSender(Clock* clock,
PacketRouter* packet_router,
RtcEventLog* event_log,
const WebRtcKeyValueConfig* field_trials,
ProcessThread* process_thread)
: pacing_controller_(clock,
static_cast<PacingController::PacketSender*>(this),
event_log,
field_trials),
packet_router_(packet_router),
process_thread_(process_thread) {
if (process_thread_)
process_thread_->RegisterModule(&module_proxy_, RTC_FROM_HERE);
}