ceph

ceph网络库分析

2021-10-19  本文已影响0人  小跑001

1 介绍

    线程模型是一个程序的骨架,只有设计好了线程模型才能发挥程序的性能, 也只有理解了线程模型才能写好代码。理解ceph osd线程模型要从源头网络模块看起。

2 网络

    ceph osd的网络分为三种:async、simple、xio,分别对应src/msg的三个目录。根据14.2.16版本代码分析,默认的是async+posix的网络库, 实际使用的也是async库。可以通过命令查询:ceph config help ms_type ceph daemon osd.0 config get ms_type, 也可以查看函数:build_options()

async主要类图结构参考如下:

classDiagram 
    Messenger <|-- SimplePolicyMessenger
    SimplePolicyMessenger <|-- AsyncMessenger
    NetworkStack <|-- PosixNetworkStack
    Worker <|-- PosixWorker
    
    AsyncMessenger: -vector<Processor*>processors
    AsyncMessenger: -DispatchQueue dispath_queue
    
    Thread <|-- DispatchThread
    DispatchQueue: +DispatchThread dispatch_thread // 仅一个线程负责投递
    DispatchQueue: -PrioritizedQueue<QueueItem, uint64_t> mqueue
    
    ThreadPool::WorkQueue<Command> <-- CommandWQ
    ThreadPool: -_num_threads
    ThreadPool: +woker()
    
    OSD: -CommandWQ command_wq
    OSD: -ThreadPool command_tp
    OSD: -ShardThreadPool osd_op_tp
    
    
    Processor: +bind()
    Processor: +start()
    Processor: +accept() // callback by work thread
    Processor: -vector<ServerSocket> listen_sockets
    
    
    class EventCenter
    EventDriver <|-- EpollDriver
    
    EventCallback <|-- C_submit_event
    EventCallback <|-- C_handle_notify
    EventCallback <|-- C_processor_accept
    
    class ServerSocket
    ServerSocketImpl <|-- PosixServerSocketImpl
    
    class ConnectedSocket
    ConnectedSocketImpl <|-- PosixConnectedSocketImpl
    Connection  <|-- AsyncConnection
    
    Protocol <|-- ProtocolV2
    

a)、Messenger通过create函数创建不同类型的网络, 这里主要集中分析AsyncMessenger, 对应async+posix

b)、 NetworkStack封装了network集合,给每个network(PosixWorker)分配一个线程来处理,这里的线程对应posix线程,即PosixNetworkStack

c)、NetworkStack通过num_workers(配置ms_async_op_threads)控制network的数量,但是不能超过EventCenter::MAX_ENVENTCENTER = 24, 也就是最大网络线程数不能超过24个。

d)、Processor是用来负责监听端口,在dpdk里面需要在num_workers中的每个线程都需要监听,这里的async只需要在一个worker对应的线程(选取负载最小的)监听就好, processor负责一些端口初始化工作[先bind()后start()],后续的监听事件处理主要是通过worker来驱动并调用accept来处理新的连接。

e)、epoll说明:ceph里面的epoll里面的事件都会加上EPOLLET(参考EpollDriver::add_event), 即边缘触发:读的时候从空到非空,写的时候从满到非满才会触发, 且只触发一次。注意每次读都要读完,否则导致消息可能丢失获取读取不及时; 写是主动调用触发,当遇到空间满的时候要确保添加了epoll的可写事件,保证有空间后及时写入,否则也会导致写入丢失或者写入不及时。

f)、启动流程:

1. Messenger::create() --> AsyncMessenger::AsyncMessenger(): // 准备网络线程与epoll驱动
    1. workers: NetworkStack::NetworkStack(type) 
    2. threads: NetworkStack::create(type)  // 名字为msg-worker-${workerid}
    3. init vector<Processor*>processors
2. AsyncMessenger::bind -> Processor:bind: 绑定socket和地址,为Processor::start准备
3. Messenger::add_dispather_head
    3.1. if first in step 3, go to Messenger::ready() -> 
        |——3.1.1、Processor:start
            1. 注册listen_sockets的可读事件, 回掉函数:Processor::accept。
            2. 有连接进来, 回调accept接口, 通过listen_socket.accept来创建ConnectedSocket:ConnectedSocket实际是包装了PosixConnectedSocketImpl, 并创建连接AsyncConnection conn添加到AsyncMessenger msgr中来管理(通过msgr->add_accept(...)), 同时在 conn->accept中投递一个事件来监听消息。
            3. 步骤2中AsyncConnection conn 中的成员dispatch_queue实际是AsyncMessenger中的dispatch_queue的引用
            4. 步骤2中投递的额外事件是为了安全的创建fd的可读事件,并接受消息处理
            5. protocol->read_event(), 这里主要是实现是ProtocolV2, 逻辑从read_frame一直到handle_message()
            6. 判断消息是否可以fast_dispatch, 如果可以就调用fast_dispatch(), 如果不行则扔进dispatch_queue中
              1.fast_dispath流程:
                OSD::ms_fast_dispatch
                  |_OSD::dispatch_session_waiting()
                  |_OSD::enqueue_op
                  |_ShardedOpWQ::queue()
              2.dispatch_queue对应流程:参考3.1.2
            
        |——3.1.2、dispatch_queue.start():开启消息处理线程, 仅一个线程负责投递
            |_DispatchThread::entry()
                |_DispatchQue::entry
                    |_qitem.get_code:true
                    |_qitem.get_code:false
                        |_Messenger::ms_deliver_dispatch
                            |_Dispatcher::ms_dispatch2
                                |_OSD::ms_dispatch
                                    |_OSD::_dispatch
                                        |_case ...
                                        |_case MSG_COMMAND:
                                            |_OSD::command_wq.queue() // 扔进了command_wq, 根据command_tp构造显示就一个线程负责处理

参考

Ceph OSD处理客户端写操作处理流程: https://www.bianchengquan.com/article/503059.html

上一篇下一篇

猜你喜欢

热点阅读