nsq服务端源码探索一

2020-05-29  本文已影响0人  Best博客

nsq官方文档

项目结构欣赏

image.png
nsqlookupd    //启动nsqlookupd服务
nsqd --lookupd-tcp-address=127.0.0.1:4160   //启动nsqd服务
nsqadmin --lookupd-http-address=127.0.0.1:4161   //启动nsqadmin服务
curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'  //通过http的方式通讯并生产数据到topic为test中去

思考:为什么三个服务构成一个go的nsq消息服务中间件?
答:分工不同,其实核心是nsqd,所有生产者的消息都去nsqd了,所有消费者要消费的消息也是去nsqd拿的。那不是一个nsqd服务就搞定了?nsqd虽然承载了消息队列的核心,但是其自身为了更好的横向拓展,都是不直接跟外界交流通讯的(虽然也可以直接通讯但不推荐),nsqd会把自己的信息上报到nsqlookupd,意思就是所有的消费者和生产者都不会直接去找nsqd服务,带着自己要找的topic和channel去找nsqlookupd,然后nsqadminupd回复他们你的要找的topic在哪一个nsqd上,也就是返回给nsqd的连接信息,然后消费者或者生产者就拿着连接信息去进行tcp连接了。

func main() {
    prg := &program{}     //program结构体就是需要我们自己去定义,然后该结构体需要实现 start,stop,init等方法
    if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
        logFatal("%s", err)
    }
}

nsq那么多服务,使用svc那么服务管理风格就都一致了,没什么好说的。supervisorctl管理的服务记得配置下子进程终止的时间设置(stopwaitsecs=3600),不然stop函数需要执行1小时,supervisorctl可能就等不了那么久了.

    opts := nsqd.NewOptions() //初始化自己的配置对象
    flagSet := nsqdFlagSet(opts) //试图使用服务启动的参数的值去改造opts的属性值,从而达到灵活配置效果

2.根据opts去new一个本次的业务对象nsqd,后面所有的业务细节的根就都在nsqd这个对象上了,典型的面向对象思维

  nsqd, err := nsqd.New(opts)//获取nsqd对象
  if err != nil {
      logFatal("failed to instantiate nsqd - %s", err)
  }
  p.nsqd = nsqd

  err = p.nsqd.LoadMetadata()//通过默认dat文件加载元数据,将dat记载的topic加载到内存,并去nsqadminupd那里更新channel的信息,有可能topic是属于别的nsqd上的,这就相当于一个加载最新的过程了
  if err != nil {
      logFatal("failed to load metadata - %s", err)
  }
  err = p.nsqd.PersistMetadata() //将最新的原数据写回到随机临时文件,写成功再mv为dat目标文件,这是一个技巧
  if err != nil {
      logFatal("failed to persist metadata - %s", err)
  }

3.nsq的核心

func (n *NSQD) Main() error {
    ctx := &context{n}

    exitCh := make(chan error)
    var once sync.Once
    exitFunc := func(err error) {
        once.Do(func() {
            if err != nil {
                n.logf(LOG_FATAL, "%s", err)
            }
            exitCh <- err
        })
    }

    n.tcpServer.ctx = ctx
    n.waitGroup.Wrap(func() {
        exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
    })

    httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
    n.waitGroup.Wrap(func() {
        exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
    })

    if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
        httpsServer := newHTTPServer(ctx, true, true)
        n.waitGroup.Wrap(func() {
            exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
        })
    }

    n.waitGroup.Wrap(n.queueScanLoop)
    n.waitGroup.Wrap(n.lookupLoop)  //依次跟nsqaminupd进行tcp拨号连接
    if n.getOpts().StatsdAddress != "" {
        n.waitGroup.Wrap(n.statsdLoop)
    }

    err := <-exitCh
    return err
}

1.Main这个函数其实作用就是先后启动http服务,https服务,tcp服务等。要知道每个服务可能就对应N个goroutine,从nsq这里我看到的是,main启动多个服务,main通过一定技巧达到管理这些服务,至于服务内部的细节(groutinue的分布用该服务自己管),也就是main管理的最小单元是服务,像极了操作系统资源分配最小单元进程,执行单元线程的这种意思了。

怎么做到的?
    httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
    n.waitGroup.Wrap(func() {
        exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
    })
        ······
        ······
    err := <-exitCh

一个服务的启动分二步

  1. 初始化,也就是new的过程
  2. 服务的启动放到 exitFunc 函数里面托管,而exitFunc又放到 Wrap函数里面托管

exitFunc 保证所有的服务只要有一个服务返回的err不为nil,Main函数就执行完毕了。说白了就是如果你对外提供的服务在你内部是N个小服务组装一起的,只有内部全部正常,我们才能继续使用它对外提供服务,同理只要有一个内部小服务不正常,主协程就得监控到(是退出报警,还是其他。。),这个exitFunc就在扮演这个角色,核心点是 所有的子服务自己异常了自协程主动发信号给主协程的通讯过程,跟我们主协程通过context主动通知子协程去通讯有着通讯方向的区别

Wrap 这个函数可以说是结合 exitFunc 使用的,主协程通过 exitFunc 已经知道 了有一个子协程的异常行为,但是其他小服务还是正常做自己的事情的(这也说明解耦得足够彻底),Wrap这个行为是赋予你的code有能力知道所有的子服务是否全部正常运行完毕的能力,这个信号为依据自然可以做一些主动去通知各个自服务关闭这种友好的退出的行为了

上一篇下一篇

猜你喜欢

热点阅读