NSQD 源码分析

2019-07-28  本文已影响0人  威廉姆韦德惠达王

启动流程

  1. 利用 svc 创建一个常驻进程
  2. 读取配置,优先级:命令行、配置文件、默认配置
  3. 实例化 nsqd,初始化日志、http client,数据文件锁等
  4. 读取元数据,就是数组的 topics 并且包含里面的 channels,每个都有是否暂停的标志位
  5. 协程启动 tcp server, http[s] server , queueScanLoop, lookupLoop,statsdLoop
  6. 等待信号执行关闭 tcp server 和 http & https server,持久化元数据(topic & channel & version,存储格式 json)

TCP Server

  1. 消费者:客户端初始化,开始 messagePump 到订阅的 channel
  2. 生产者:客户端初始化,推消息,创建 topic,topic 内部 messagePump 发往 channel
  1. IDENTIFY: 客户端连接上后,标识自己,同时得到服务端的信息
  2. FIN: 消息顺利处理完了,里面会 in-flight 的消息清理掉,rdy 加1, 还有其他统计数据
  3. RDY: 流控 客户端告知我有能力处理多少个消息,服务器端就浪多少呗
  4. REQ: 消息重新进队列
  5. PUB, MPUB, DPUB: 写消息, 优先写内存队列,写不进写 backend 队列,使用 select 模式,新的 topic 会 messagePump,很简单从内存和磁盘读,循环 channel 发送 msg,每个客户端的 messagePump 获取
  6. NOP:空指令什么也不干
  7. TOUCH: 重置消息超时时间, pop queue 放到最后去,受到 max msg timeout 限制
  8. SUB:订阅命令
  9. CLS: 关闭
# 消息格式,用于磁盘存储
[x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
|       (int64)        ||    ||      (hex string encoded in ASCII)           || (binary)
|       8-byte         ||    ||                 16-byte                      || N-byte
------------------------------------------------------------------------------------------...
nanosecond timestamp      ^^                   message ID                       message body                        
                       (uint16)
                        2-byte
                       attempts

HTTP Server

  1. /ping 健康检查
  2. /info 打印一些配置信息,主机名,版本号,启动时间
  3. /pub & /mpub 生产消息,写内存,如果超配置数量写磁盘,没有消费者连接写磁盘
  4. /stats 统计信息,很详细
  5. /topic/create 创建 topic,内部会从 lookupd 同步 channel
  6. /topic/delete 删除 topic,未消费的消息存入磁盘,循环 channels,未消费的消息存入磁盘
  7. /topic/empty 清空 topic,内存和磁盘都清空
  8. /topic/pause & /topic/unpaus 中断或开启 topic,内部其实就是把两个管道架空,重新存储下元数据
  9. /channel/* channel 的创建,删除,清空,中断,开启。和 topic 类似就是换个结构体
  10. /config/:opt 取得配置信息,动态修改一些配置,nslookup、日志级别等
  11. /debug/pprof pprof debug 信息

queueScanLoop

lookupLoop

statsdLoop

上一篇下一篇

猜你喜欢

热点阅读