ceph网络库分析
1 介绍
线程模型是一个程序的骨架,只有设计好了线程模型才能发挥程序的性能, 也只有理解了线程模型才能写好代码。理解ceph osd线程模型要从源头网络模块看起。
2 网络
ceph osd的网络分为三种:async、simple、xio,分别对应src/msg的三个目录。根据14.2.16版本代码分析,默认的是async+posix的网络库, 实际使用的也是async库。可以通过命令查询:ceph config help ms_type ceph daemon osd.0 config get ms_type
, 也可以查看函数:build_options()
。
async主要类图结构参考如下:
classDiagram
Messenger <|-- SimplePolicyMessenger
SimplePolicyMessenger <|-- AsyncMessenger
NetworkStack <|-- PosixNetworkStack
Worker <|-- PosixWorker
AsyncMessenger: -vector<Processor*>processors
AsyncMessenger: -DispatchQueue dispath_queue
Thread <|-- DispatchThread
DispatchQueue: +DispatchThread dispatch_thread // 仅一个线程负责投递
DispatchQueue: -PrioritizedQueue<QueueItem, uint64_t> mqueue
ThreadPool::WorkQueue<Command> <-- CommandWQ
ThreadPool: -_num_threads
ThreadPool: +woker()
OSD: -CommandWQ command_wq
OSD: -ThreadPool command_tp
OSD: -ShardThreadPool osd_op_tp
Processor: +bind()
Processor: +start()
Processor: +accept() // callback by work thread
Processor: -vector<ServerSocket> listen_sockets
class EventCenter
EventDriver <|-- EpollDriver
EventCallback <|-- C_submit_event
EventCallback <|-- C_handle_notify
EventCallback <|-- C_processor_accept
class ServerSocket
ServerSocketImpl <|-- PosixServerSocketImpl
class ConnectedSocket
ConnectedSocketImpl <|-- PosixConnectedSocketImpl
Connection <|-- AsyncConnection
Protocol <|-- ProtocolV2
a)、Messenger通过create函数创建不同类型的网络, 这里主要集中分析AsyncMessenger, 对应async+posix
b)、 NetworkStack封装了network集合,给每个network(PosixWorker)分配一个线程来处理,这里的线程对应posix线程,即PosixNetworkStack
c)、NetworkStack通过num_workers(配置ms_async_op_threads)控制network的数量,但是不能超过EventCenter::MAX_ENVENTCENTER = 24, 也就是最大网络线程数不能超过24个。
d)、Processor是用来负责监听端口,在dpdk里面需要在num_workers中的每个线程都需要监听,这里的async只需要在一个worker对应的线程(选取负载最小的)监听就好, processor负责一些端口初始化工作[先bind()后start()],后续的监听事件处理主要是通过worker来驱动并调用accept来处理新的连接。
e)、epoll说明:ceph里面的epoll里面的事件都会加上EPOLLET(参考EpollDriver::add_event), 即边缘触发:读的时候从空到非空,写的时候从满到非满才会触发, 且只触发一次。注意每次读都要读完,否则导致消息可能丢失获取读取不及时; 写是主动调用触发,当遇到空间满的时候要确保添加了epoll的可写事件,保证有空间后及时写入,否则也会导致写入丢失或者写入不及时。
f)、启动流程:
1. Messenger::create() --> AsyncMessenger::AsyncMessenger(): // 准备网络线程与epoll驱动
1. workers: NetworkStack::NetworkStack(type)
2. threads: NetworkStack::create(type) // 名字为msg-worker-${workerid}
3. init vector<Processor*>processors
2. AsyncMessenger::bind -> Processor:bind: 绑定socket和地址,为Processor::start准备
3. Messenger::add_dispather_head
3.1. if first in step 3, go to Messenger::ready() ->
|——3.1.1、Processor:start
1. 注册listen_sockets的可读事件, 回掉函数:Processor::accept。
2. 有连接进来, 回调accept接口, 通过listen_socket.accept来创建ConnectedSocket:ConnectedSocket实际是包装了PosixConnectedSocketImpl, 并创建连接AsyncConnection conn添加到AsyncMessenger msgr中来管理(通过msgr->add_accept(...)), 同时在 conn->accept中投递一个事件来监听消息。
3. 步骤2中AsyncConnection conn 中的成员dispatch_queue实际是AsyncMessenger中的dispatch_queue的引用
4. 步骤2中投递的额外事件是为了安全的创建fd的可读事件,并接受消息处理
5. protocol->read_event(), 这里主要是实现是ProtocolV2, 逻辑从read_frame一直到handle_message()
6. 判断消息是否可以fast_dispatch, 如果可以就调用fast_dispatch(), 如果不行则扔进dispatch_queue中
1.fast_dispath流程:
OSD::ms_fast_dispatch
|_OSD::dispatch_session_waiting()
|_OSD::enqueue_op
|_ShardedOpWQ::queue()
2.dispatch_queue对应流程:参考3.1.2
|——3.1.2、dispatch_queue.start():开启消息处理线程, 仅一个线程负责投递
|_DispatchThread::entry()
|_DispatchQue::entry
|_qitem.get_code:true
|_qitem.get_code:false
|_Messenger::ms_deliver_dispatch
|_Dispatcher::ms_dispatch2
|_OSD::ms_dispatch
|_OSD::_dispatch
|_case ...
|_case MSG_COMMAND:
|_OSD::command_wq.queue() // 扔进了command_wq, 根据command_tp构造显示就一个线程负责处理
参考
Ceph OSD处理客户端写操作处理流程: https://www.bianchengquan.com/article/503059.html