百度 Apollo 8.0 Cyber 源代码分析(三)
3 cyber的topology管理(node、channelD等)
fastrtps是一个开源网络库。cyber基于fastrtps在进程之间交换节点/channel信息。这些进程可以在同一个主机上,也可以跨主机。
Manager及相关类封装fastrtps对象。NodeManager和ChannelManager派生自Manager,前者实现node加入、离开等信息的交换,后者实现channel加入、离开等信息的交换。
Reader/Writer基于ChannelManager管理channel信息;Node/NodeChannelImpl基于NodeChannelManager管理节点信息,它也负责创建Reader/Writer。
3.1 Participant
cyber基于fastrtps库实现topoloy发现功能。Participant类封装fastrtps接口。
- 成员fastrtps_partipant_ 保存partipant实例,这个participant实例是fastrtps的通信节点。成员name_ 是participant的名字。
- fastrtps基于udp协议,成员send_port_ 保存fastrtps发送端口。
- 成员listener_ 挂接在fastrtps_participant_上监听fastrtps消息,如其他pariticipant的上线、下线通知等。
- 成员函数CreateFastRtpsParticipant()创建fastrtps_participant_和listenser_,它们分别是fastrtps::Participant和fastrtps::ParticipantListener的实例。这两个类是fastrtps库定义的。
- ParticipantListener监听fastrtps的Participant change事件。当Particant加入或离开时,它的虚拟成员函数OnPartipantDiscovery()被调用。
3.2 TopologyManager
TopologyManager管理cyber各种对象的topology,包括Node、Channel等等。
- 成员participant_ 是Participant实例。成员listener_是ParticipantListener实例,被打包到participant_。成员函数CreateParticipant()创建participant_,参数包括listener_用于接收消息。
- 当Paricipant Change事件发生时,ParticipantListener的成员callback_被调用。这里的callback_是TopologyManager::OnParticipantChange()。
- OnParticipantChange()接收的消息包括ParticipantDiscoveryInfo信息。TopoloyManager将它转换成ChangeMsg。这个消息通知给成员change_signal_,它是Signal<ChangeMsg>实例,用于通知其他成员,如node_manager_等。
- OnParticipantChange()将其他participant的名字保存在成员participant_names_中。
3.3 Manager
Manager基于Participant管理topology消息的收发。它的派生类包括NodeManager、ChannelManager等,分别管理Node、Channel等加入/离开消息的收发。
-
成员hostname_是主机名,process_id_是进程id。
-
成员是allowed_role_和change_type_分别保存当前Manager实例的角色和消息类型,比如NodeManager和ChannelManager分别有自己的类型。
-
成员publisher_ 是fastrtps::Publisher实例,subscriber_是fastrtps::Subscriber实例。这两个是fastrtps定义的类型。
-
成员listener_是SubscriberListener类型。它派生自fastrtps定义的类型fastrtps::SubscriberListener,用于从fastrtps接收消息。
-
消息到来时,调用虚拟成员函数SubscrierListener::onNewDataMessage()。这个函数调用成员callback_。在SubcriberListener的构造函数中,它已经指定为Manager::OnRemoteChange()。
-
在OnRemoteChange()中,调用Manager的虚拟成员函数Dispose()。Manager的派生类可以按照自己的方式处理消息,比如处理其他节点的join/leave消息。
-
成员函数Join()加入指定的role圈子。其中,
- 根据指定的role类型、属性构造ChangeMsg实例;
- 调用Dispose()通知自己;
- 调用Publish()通过fastrtps发布消息。
-
成员函数Leave()退出指定的圈子。
-
成员singal_是一个Signal实例,Notify()通过发送通知。AddChangeListner()将指定的回调函数接收这个通知,RemoveChangeListener()将指定的回调函数移出通知。
3.4 NodeManager
这里以NodeManager为例说明Manager的派生类实现。
- 成员nodes_保存从fastrtps消息创建的其他节点。这是一个NodeWareHouse实例。
- 在成员函数Dispose()中,根据收到消息类型是加入或离开,分别调用DisposeJoin()/DisposeLeave(),然后调用Notify()通知监听者。
- 在DisposeJoin()中,创建节点实例加入nodes_;在DisposeLeave()中,从成员nodes_中移除节点实例。
3.5 ChannelManager
ChannelManager是另一个Manager派生类的例子。
- 成员node_writers_是WriterWarehouse实例,保存从node到RoleWriter的多值映射;成员node_readers_则是从node到RoleReader的多值映射。
- 成员channel_writers_是从channel到RoleWriter的多值映射,成员channel_readers_是从channel到RoleReader的多值映射。
- RoleReader和RoleWriter是RoleBase的派生类,这里的WriterWarehouse/ReaderWarehouse保存RoleBase实例。
3.6 再说TopologyManager
再次回到TopologyManager。它基于Participant,创建NodeManager/ChannelManager实例。
- 成员participant_是Participant实例,listener_是ParticipantListener实例。成员node_manager_是NodeManager实例,channel_manager_是ChannelManager实例。
- 在TopologyManager的构造函数中,
- 创建node_manager_/channel_manager_;
- 调用CreateParticipant()创建participant_listener_,绑定成员函数OnParticipantChange()处理Participant Change消息;创建participant_。
- 调用功能InitNodeManager()/InitChannelManager(),启动node_manger_/channel_manager_的发现机制,也就是participant的收发功能。
- 在OnParticipantChange()中,调用成员change_signal_通知其中注册的回调函数。
3.7 Reader
关于Reader,(这里的读取数据部分,后面会再详细说明)
- 成员reader_func_ 是消息到达时的回调函数,由使用者指定。
- 成员receiver_ 是Receiver实例,负责接收数据。接收到消息时,它会调用reader_func_。
- 成员croutine_name_ 是使用的协程名。receiver_拿到消息,调用reader_func_,是这个协程执行的。
- 成员blocker_ 是Blocker实例。从receiver_得到的消息,也会推送一份保存在这里。
- 成员channel_manager_ 是全局唯一的ChannelManager实例,负责通知Reader的加入/离开。
3.8 Writer
关于Writer,(这里的写数据部分,后面会再详细说明)
- 成员transmitter_ 是Transmitter实例,负责发送数据。
- 成员channel_manager_ 是全局唯一的ChannelManager实例,负责通知Writer的加入/离开。
3.9 Node
Node定义cyber的节点。
- 成员readers_保存所有reader,这是一个topic name -> Reader实例的map
- 成员writers_保存所有writer,这是一个topic name -> Writer实例的map
- 成员node_channel_impl_是一个NodeChannelImpl实例,Node委托它处理各种任务,如创建Reader/Writer实例。
关于NodeChannelImpl,
- 成员node_manager_ 是NodeManager实例,它在NodeChannelImpl的构造函数中,从TopologyManager得到。
- 成员函数CreateReader()创建Reader实例。它有两个参数,RoleAttribute指定Reader属性,CbFunc指定接收消息的回调函数。
- 成员函数CreateWriter()创建Writer实例。
相关链接
百度 Apollo 8.0 Cyber 源代码分析(一)
百度 Apollo 8.0 Cyber 源代码分析(二)
百度 Apollo 8.0 Cyber 源代码分析(三)
百度 Apollo 8.0 Cyber 源代码分析(四)
百度 Apollo 8.0 Cyber 源代码分析(五)