百度 Apollo 8.0 Cyber 源代码分析(五)

2024-11-27  本文已影响0人  RonZheng2010

相关链接

百度 Apollo 8.0 Cyber 源代码分析(一)
百度 Apollo 8.0 Cyber 源代码分析(二)
百度 Apollo 8.0 Cyber 源代码分析(三)
百度 Apollo 8.0 Cyber 源代码分析(四)
百度 Apollo 8.0 Cyber 源代码分析(五)

9 cyber协程调度

9.1 CRoutine

CRoutine定义协程。

9.2 Processor/ProcessorContext

Processor为协程提供处理器资源。ProcessorContext为它提供执行上下文。

ProcessorContext定义processor执行context接口。

关于Processor,

9.3 ClassicContext

ClassicContext实现经典模式的context接口。

9.4 Scheduler

Scheduler定义调度器接口。有两种调度器SchedulerClassisc和SchedulerChoreography。这里以SchedulerClassic为例说明。

SchedulerFactory::Instance() 根据配置文件的配置,创建唯一的Scheduler实例。

Scheduler定义调度器接口。

9.5 SchedulerClassic

SchedulerClassic实现经典的调度器接口。

9.6 ClassicConf

ClassicConf是SchedulerClassic的配置定义。
在SchedulerClassic的构造函数中,打开配置文件,如/cyber/conf/pv_nn.conf,得到SchedulerConf实例。

其中的Scheduler_conf是给SchedulerClassic准备的。

关于SchedGroup,

9.7 CreateRoutineFactory()

CreateRoutineFactory()有多个版本。它们工作都是创建一个RoutineFactory实例,用于创建一个协程。

当消息到达时,这个协程被唤醒。调用DataVisitor::TryFetch()读取消息,如果读到了,就调用用户指定的回调函数处理,并且置协程为READY状态,这样可以继续被调度;如果没有读到,则放弃时间片,协程进入DATA_WAIT状态,等待再次被唤醒。

template <typename M0, typename M1, typename F>
RoutineFactory CreateRoutineFactory(
    F&& f, const std::shared_ptr<data::DataVisitor<M0, M1>>& dv) {
  RoutineFactory factory;
  factory.SetDataVisitor(dv);
  factory.create_routine = [=]() {
    return [=]() {
      std::shared_ptr<M0> msg0;
      std::shared_ptr<M1> msg1;
      for (;;) {
        CRoutine::GetCurrentRoutine()->set_state(RoutineState::DATA_WAIT);
        if (dv->TryFetch(msg0, msg1)) {
          f(msg0, msg1);
          CRoutine::Yield(RoutineState::READY);
        } else {
          CRoutine::Yield();
        }   
      }   
    };  
  };  
  return factory;
}

如下是涉及的类。

这里说明CreateRoutineFactory()创建协程的运行逻辑。

bool SchedulerClassic::NotifyProcessor(uint64_t crid) {
......
  {
    ReadLockGuard<AtomicRWLock> lk(id_cr_lock_);
    if (id_cr_.find(crid) != id_cr_.end()) {
      auto cr = id_cr_[crid];
      if (cr->state() == RoutineState::DATA_WAIT ||
          cr->state() == RoutineState::IO_WAIT) {
        cr->SetUpdateFlag();
      }

      ClassicContext::Notify(cr->group_name());
      return true;
    }
  }
  return false;
}
void Processor::Run() {
......
  while (cyber_likely(running_.load())) {
    if (cyber_likely(context_ != nullptr)) {
      auto croutine = context_->NextRoutine();
      if (croutine) {
        snap_shot_->execute_start_time.store(cyber::Time::Now().ToNanosecond());
        snap_shot_->routine_name = croutine->name();
        croutine->Resume();
        croutine->Release();
      } else {
        snap_shot_->execute_start_time.store(0);
        context_->Wait();
      }
    } else {
      std::unique_lock<std::mutex> lk(mtx_ctx_);
      cv_ctx_.wait_for(lk, std::chrono::milliseconds(10));
    }
  }
}

10 cyber::Async()

关于TaskManager,

cyber::Async()给使用者一个接口,方便地创建一个函数在协程中执行。它的实现就是向TaskManager的低优先级队列推送一个任务。

上一篇 下一篇

猜你喜欢

热点阅读