百度 Apollo 8.0 Cyber 源代码分析(五)
相关链接
百度 Apollo 8.0 Cyber 源代码分析(一)
百度 Apollo 8.0 Cyber 源代码分析(二)
百度 Apollo 8.0 Cyber 源代码分析(三)
百度 Apollo 8.0 Cyber 源代码分析(四)
百度 Apollo 8.0 Cyber 源代码分析(五)
9 cyber协程调度
9.1 CRoutine
CRoutine定义协程。
-
成员name_ 是协程名字,可以转成一个uint64数字,也就是成员id_
-
成员func_是协程函数
-
成员state_是协程状态,IO_WAIT/DATA_WAIT是等待状态,READY是准备好执行,FINISHED是已经执行完成。
-
成员priority_是执行优先级
-
static thread_local成员current_routine_ scheduler的当前执行的CRoutine实例。而成员main_stack_ 是没有协程执行时的执行位置。
-
成员context_ 是RoutineContext实例。它保存协程的执行栈,成员sp是sp指针,成员stack是栈。在CRoutine的构造函数中,会从CRoutineContext的CObjectPool池中,得到这个实例。
-
HangUp()置DATA_WAIT,Wake() 将状态置READY,Sleep()设置等待时间wake_time_,状态置SLEEP。
-
UpdateState()中,如果wait_time_超时,则置状态为READY;如果状态为IO_WAIT,则置READY。
-
Stop()中,置状态为FINISHED。
-
Resume()中,如果状态为FINISHED,则直接返回;如果状态为READY,调用SwapContext()将当前的调用栈换成协程的调用栈,开始执行。在如下条件满足前,SwapContext()不会返回。
- 协程函数执行完成
- 协程调用Yield()。Yield()的作用是将协程的调用栈换出,所以协程不再执行。
9.2 Processor/ProcessorContext
Processor为协程提供处理器资源。ProcessorContext为它提供执行上下文。
ProcessorContext定义processor执行context接口。
- 虚拟成员函数NextRoutine()得到下一个READY状态的CRoutine实例。
- Wait()等待协程READY。
关于Processor,
- 成员thread_ 是执行线程。
- 成员context_是ProcessorContext实例。成员函数BindContext()将指定的ProcessorContext实例绑定到context_,并且启动线程thread_,线程函数是Run()
- Run()在while()循环中,调用ProcessorContext::NextRoutine(),得到CRoutine实例;如果有READY的CRoutine实例的话,调用CRoutineL::Resume()执行它;如果没哟,则调用功能ProcessorContext::Wait()等待。
9.3 ClassicContext
ClassicContext实现经典模式的context接口。
-
成员current_group是当前processor。由于ClassiscContext实例与Processor是一一绑定的,所以一个processor实例,就只能服务与一个组的CRoutine。
-
static成员cr_group_是一个从group名字到array的映射。这个array的大小是MAX_PRIO=20。这是协程优先级最大值。也就是说array中每个元素对应一个优先级。array的元素是一个CRoutine实例的vector。也就是说,CRoutien实例是按照组名、优先级放在对应的vector中。
-
成员multi_pri_rq_ 是cr_group_ 中,当前组对应的array。这是在成员函数InitGroup()中设置的。
-
static成员notify_grp_是一个从组名到int值的映射。这个int值是当前READY状态的协程个数。调用成员函数Notify(),这个值加1,而调用成员函数Wait(),这个值减1。
-
成员函数NextRoutine(), 按照优先级从高到低的顺序,遍历multi_pri_rq_的array,看看其中的CRoutine是否状态为READY,如果是,就获得所有权,并返回它。
-
成员Wait(),等待notify_grp_[]中的协程READY,等待成功则返回。成员Notify()则用于通知READY协程增加了一个。
9.4 Scheduler
Scheduler定义调度器接口。有两种调度器SchedulerClassisc和SchedulerChoreography。这里以SchedulerClassic为例说明。
SchedulerFactory::Instance() 根据配置文件的配置,创建唯一的Scheduler实例。
Scheduler定义调度器接口。
- 成员processors_ 是使用的处理器Processor实例,而成员pctxs_是一一对应的上下文ClassicContext实例。
- 成员id_cr_ 是从routine id到CRoutine实例的映射。routine id是基于协程名字计算得到的hash值。
- 成员函数ProcessLevelResourceControl()根据设置当前线程的cpuset。
- 成员函数CreateTask()的步骤如下。
- 创建CRoutine实例
- 调用虚拟成员函数DispatchTask(),将它推送到调度队列。
- 如果这个任务指定了DataVisitor参数,则定义一个函数作为它的回调函数,以便在消息到达时,得到通知。这个函数的实现是调用虚拟成员函数NotifyProcessor()。
9.5 SchedulerClassic
SchedulerClassic实现经典的调度器接口。
-
成员classic_conf_是配置数据。在构造函数中,从配置文件中读入这个配置;然后调用CreateProcessor()。在CreateProcessor()中,对classic_conf配置的每个组,创建一对Processor/ClassicContext实例并绑定,这样协程就开始调度了。
-
虚拟成员函数DispatchTask()的实现:
- 在cr_confs_中,根据任务名查找这个任务的配置,如果有的话,使用它配置priority和组名;如果没有,则priority使用默认值0,组名置为classsic_conf_中第一个组classic_conf_.groups(0)。
- 将这个CRoutine实例放入ClassicContext::cr_group_中指定组/priority的vector。
-
虚拟成员函数NotifyProcessor()的实现:
- 在id_cr_按照routine id查找CRoutine实例。找到的话,调用CRoutine::SetUpdateFlag(),将状态从DATA_WAIT改成READY,准备执行;
- 调用ClassicContext::Notify(),如果processor还处于等待状态,则唤醒它。
9.6 ClassicConf
ClassicConf是SchedulerClassic的配置定义。
在SchedulerClassic的构造函数中,打开配置文件,如/cyber/conf/pv_nn.conf,得到SchedulerConf实例。
其中的Scheduler_conf是给SchedulerClassic准备的。
- 成员groups是SchedGroup实例,包括协程分组。
关于SchedGroup,
- 成员name是组名,
- 成员processor_num是这个组占用的处理器个数,
- cpuset是处理器编号,
- 某些任务需要指定更详细的选项,如priority。成员tasks[]可以配置这样的任务。
9.7 CreateRoutineFactory()
CreateRoutineFactory()有多个版本。它们工作都是创建一个RoutineFactory实例,用于创建一个协程。
当消息到达时,这个协程被唤醒。调用DataVisitor::TryFetch()读取消息,如果读到了,就调用用户指定的回调函数处理,并且置协程为READY状态,这样可以继续被调度;如果没有读到,则放弃时间片,协程进入DATA_WAIT状态,等待再次被唤醒。
template <typename M0, typename M1, typename F>
RoutineFactory CreateRoutineFactory(
F&& f, const std::shared_ptr<data::DataVisitor<M0, M1>>& dv) {
RoutineFactory factory;
factory.SetDataVisitor(dv);
factory.create_routine = [=]() {
return [=]() {
std::shared_ptr<M0> msg0;
std::shared_ptr<M1> msg1;
for (;;) {
CRoutine::GetCurrentRoutine()->set_state(RoutineState::DATA_WAIT);
if (dv->TryFetch(msg0, msg1)) {
f(msg0, msg1);
CRoutine::Yield(RoutineState::READY);
} else {
CRoutine::Yield();
}
}
};
};
return factory;
}
如下是涉及的类。
这里说明CreateRoutineFactory()创建协程的运行逻辑。
-
协程在一个for()循环中运行,自身不会中止执行。需要外部调用者执行CRoutine::Stop()将它置为FINISHED状态,并通过ClassicContext::RemoveTask(),从调度队列移除。
-
一开始它将自身置为DATA_WAIT状态。这是为下一轮”等待 - 执行”作准备。
-
调用DataVisitor::TryFetch()得到消息。
+如果得到,调用使用者的处理函数;调用CRoutine::Yield(READY),这是假设还有消息没处理完,所以回到调度队列后,下次可以继续被调度,不用等着被唤醒了。- 如果没得到,则调用Yield(),将自身的调用栈交换出去。这次回到调度队列,协程处于DATA_WAIT状态,需要被唤醒。
-
当消息到达时,会调用DataVisitor的成员notifier_,这个回调函数会调用SchedulerClassic::NotifyProcessor()。其中调用CRoutine::SetUpdateFlag(),将CRoutine实例状态从DATA_WAIT改回READY,并调用ClassicContext::Notify()唤醒相应的条件变量。
bool SchedulerClassic::NotifyProcessor(uint64_t crid) {
......
{
ReadLockGuard<AtomicRWLock> lk(id_cr_lock_);
if (id_cr_.find(crid) != id_cr_.end()) {
auto cr = id_cr_[crid];
if (cr->state() == RoutineState::DATA_WAIT ||
cr->state() == RoutineState::IO_WAIT) {
cr->SetUpdateFlag();
}
ClassicContext::Notify(cr->group_name());
return true;
}
}
return false;
}
- Processor的处理线程可能正调用ClassicContext::Wait(),等在这个条件变量上,这时就被唤醒了。
- 之后它调用ClassicContext::NextRoutine()得到下一个CRoutine实例 。因为前面的CRoutine实例状态已经是READY了,所以现在它可以被选中。
- 然后调用CRoutine::Resume(),将它的调用栈交换进来,所以它就又开始执行了。
void Processor::Run() {
......
while (cyber_likely(running_.load())) {
if (cyber_likely(context_ != nullptr)) {
auto croutine = context_->NextRoutine();
if (croutine) {
snap_shot_->execute_start_time.store(cyber::Time::Now().ToNanosecond());
snap_shot_->routine_name = croutine->name();
croutine->Resume();
croutine->Release();
} else {
snap_shot_->execute_start_time.store(0);
context_->Wait();
}
} else {
std::unique_lock<std::mutex> lk(mtx_ctx_);
cv_ctx_.wait_for(lk, std::chrono::milliseconds(10));
}
}
}
10 cyber::Async()
关于TaskManager,
- 成员task_queue_保存低优先级的任务,成员task_queue_hight_prio_保存高优先级的任务。
- 成员tasks_保存了所有的任务。
- 在TaskManager的构造函数中,创建指定数量的协程,这个数量在进程的cpu调度配置文件中指定的CPU核数相同。在协程的处理函数中,会等待task_queue_high_prio_和task_queue_中的任务,有就执行它。task_queue_hight_prio_中的任务会优先执行。
cyber::Async()给使用者一个接口,方便地创建一个函数在协程中执行。它的实现就是向TaskManager的低优先级队列推送一个任务。