nsq服务端源码探索一
项目结构欣赏
image.png- 1.通讯分析
nsq运行起来主要组件有3,分别是nsqlookupd,nsqd,nsqadmin。
官方文档快速开始:
nsqlookupd //启动nsqlookupd服务
nsqd --lookupd-tcp-address=127.0.0.1:4160 //启动nsqd服务
nsqadmin --lookupd-http-address=127.0.0.1:4161 //启动nsqadmin服务
curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test' //通过http的方式通讯并生产数据到topic为test中去
思考:为什么三个服务构成一个go的nsq消息服务中间件?
答:分工不同,其实核心是nsqd,所有生产者的消息都去nsqd了,所有消费者要消费的消息也是去nsqd拿的。那不是一个nsqd服务就搞定了?nsqd虽然承载了消息队列的核心,但是其自身为了更好的横向拓展,都是不直接跟外界交流通讯的(虽然也可以直接通讯但不推荐),nsqd会把自己的信息上报到nsqlookupd,意思就是所有的消费者和生产者都不会直接去找nsqd服务,带着自己要找的topic和channel去找nsqlookupd,然后nsqadminupd回复他们你的要找的topic在哪一个nsqd上,也就是返回给nsqd的连接信息,然后消费者或者生产者就拿着连接信息去进行tcp连接了。
- 2.服务管理包svc
nsqd用到的 svc 这个包来管理所有服务的,同时也统一了代码风格
1.program定义好后,我们在init里面初始化自己需要的服务代码
2.在start里面启动我们的业务服务
3.stop里面执行当我们的服务被kill的时候触发的执行代码(信号很多,这里只枚举了2种通用的)
func main() {
prg := &program{} //program结构体就是需要我们自己去定义,然后该结构体需要实现 start,stop,init等方法
if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
logFatal("%s", err)
}
}
nsq那么多服务,使用svc那么服务管理风格就都一致了,没什么好说的。supervisorctl管理的服务记得配置下子进程终止的时间设置(stopwaitsecs=3600),不然stop函数需要执行1小时,supervisorctl可能就等不了那么久了.
- 3.code风格三板斧
nsqd的业务代码都在start里面
1.配置服务专属的配置对象,分2步,初始化自己的配置对象,然后把这个对象放到命令参数解析的函数中去改造,也就是配置对象有些值是可以被命令传参给覆盖
opts := nsqd.NewOptions() //初始化自己的配置对象
flagSet := nsqdFlagSet(opts) //试图使用服务启动的参数的值去改造opts的属性值,从而达到灵活配置效果
2.根据opts去new一个本次的业务对象nsqd,后面所有的业务细节的根就都在nsqd这个对象上了,典型的面向对象思维
nsqd, err := nsqd.New(opts)//获取nsqd对象
if err != nil {
logFatal("failed to instantiate nsqd - %s", err)
}
p.nsqd = nsqd
err = p.nsqd.LoadMetadata()//通过默认dat文件加载元数据,将dat记载的topic加载到内存,并去nsqadminupd那里更新channel的信息,有可能topic是属于别的nsqd上的,这就相当于一个加载最新的过程了
if err != nil {
logFatal("failed to load metadata - %s", err)
}
err = p.nsqd.PersistMetadata() //将最新的原数据写回到随机临时文件,写成功再mv为dat目标文件,这是一个技巧
if err != nil {
logFatal("failed to persist metadata - %s", err)
}
3.nsq的核心
func (n *NSQD) Main() error {
ctx := &context{n}
exitCh := make(chan error)
var once sync.Once
exitFunc := func(err error) {
once.Do(func() {
if err != nil {
n.logf(LOG_FATAL, "%s", err)
}
exitCh <- err
})
}
n.tcpServer.ctx = ctx
n.waitGroup.Wrap(func() {
exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
})
httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
n.waitGroup.Wrap(func() {
exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
})
if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
httpsServer := newHTTPServer(ctx, true, true)
n.waitGroup.Wrap(func() {
exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
})
}
n.waitGroup.Wrap(n.queueScanLoop)
n.waitGroup.Wrap(n.lookupLoop) //依次跟nsqaminupd进行tcp拨号连接
if n.getOpts().StatsdAddress != "" {
n.waitGroup.Wrap(n.statsdLoop)
}
err := <-exitCh
return err
}
1.Main这个函数其实作用就是先后启动http服务,https服务,tcp服务等。要知道每个服务可能就对应N个goroutine,从nsq这里我看到的是,main启动多个服务,main通过一定技巧达到管理这些服务,至于服务内部的细节(groutinue的分布用该服务自己管),也就是main管理的最小单元是服务,像极了操作系统资源分配最小单元进程,执行单元线程的这种意思了。
怎么做到的?
httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
n.waitGroup.Wrap(func() {
exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
})
······
······
err := <-exitCh
一个服务的启动分二步
- 初始化,也就是new的过程
- 服务的启动放到 exitFunc 函数里面托管,而exitFunc又放到 Wrap函数里面托管
exitFunc 保证所有的服务只要有一个服务返回的err不为nil,Main函数就执行完毕了。说白了就是如果你对外提供的服务在你内部是N个小服务组装一起的,只有内部全部正常,我们才能继续使用它对外提供服务,同理只要有一个内部小服务不正常,主协程就得监控到(是退出报警,还是其他。。),这个exitFunc就在扮演这个角色,核心点是 所有的子服务自己异常了自协程主动发信号给主协程的通讯过程,跟我们主协程通过context主动通知子协程去通讯有着通讯方向的区别
Wrap 这个函数可以说是结合 exitFunc 使用的,主协程通过 exitFunc 已经知道 了有一个子协程的异常行为,但是其他小服务还是正常做自己的事情的(这也说明解耦得足够彻底),Wrap这个行为是赋予你的code有能力知道所有的子服务是否全部正常运行完毕的能力,这个信号为依据自然可以做一些主动去通知各个自服务关闭这种友好的退出的行为了