nsq源码(8) 生产和消费流程
2019-02-13 本文已影响5人
Linrundong
总览nsqd、nsqlookupd的启动过程,消息的生产到消费的过程
绑定注册
nsqd --lookupd-tcp-address=127.0.0.1:4160
- 当nsqd带参数启动时,除了会处理客户端的请求,还会启动一个线程去注册nsqlookupd
graph LR
nsqd--注册-->nsqlookupd
func (n *NSQD) Main() {
// 超时消息检索和处理任务
n.waitGroup.Wrap(n.queueScanLoop)
// 根据参数选择注册中心nsqlookupd
n.waitGroup.Wrap(n.lookupLoop)
if n.getOpts().StatsdAddress != "" {
n.waitGroup.Wrap(n.statsdLoop)
}
}
发布生产和消费订阅
- 通过nsqd的http接口生产消息:
curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'
- 启动消费者客户端:
nsq_to_file --topic=test --output-dir=/tmp --channel=chan --lookupd-http-address=127.0.0.1:4161
- 生产消费流程图:
graph LR
nsq_to_file --请求-->nsqlookupd
nsqlookupd --返回分配的nsqd生产者信息-->nsq_to_file
graph LR
生产者--生产发布消息-->nsqd
nsqd --持续为消息寻找subchan--> nsq_to_file
nsq_to_file --连接并消费订阅指定topic-->nsqd