golang学习篇章

nsq客户端源码分析

2020-05-17  本文已影响0人  Best博客

nsq为生产环境的消息中间,遭遇消息重复消费,思索客户端与服务端是怎么通过 REQ 与 FIN 通讯的(客户端回复:REQ 重新入队,FIN 成功消费)。后面发现这个重复消费的问题还得结合nsq服务端启动配置的参数结合分析才行。
https://github.com/nsqio/nsq

1.分析nsq客户端时怎么接收nsqd服务端的消息的

//以下是客户端包源代码文档demo

type myMessageHandler struct {}

func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
    if len(m.Body) == 0 {
        // Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
        return nil
    }
    err := processMessage(m.Body)
    // Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
    return err
}

func main() {
    // Instantiate a consumer that will subscribe to the provided channel.
    config := nsq.NewConfig()  //得到配置consumer的配置对象
    consumer, err := nsq.NewConsumer("topic", "channel", config)   //得到consumer对象
    if err != nil {
        log.Fatal(err)
    }

    // Set the Handler for messages received by this Consumer. Can be called multiple times.
    // See also AddConcurrentHandlers.
    consumer.AddHandler(&myMessageHandler{})  //设置接收tcp传送过来的
//nsqd的消息,此方法里面 handlerLoop 一个for循环就 直接对接一个 incomingMessages 的channel了,达到解耦的目的. 并且会根据 err是否为nil去触发不同的事件,达到往channel写进不同的值(就是不同值的cmd结构体,req与fin)

    // Use nsqlookupd to discover nsqd instances.
    // See also ConnectToNSQD, ConnectToNSQDs, ConnectToNSQLookupds.
    err = consumer.ConnectToNSQLookupd("localhost:4161")
//连接客户端与服务端nsq,前面都是配置属性和准备好方法,这里才是通信的起点,golang的channel秒到了解耦真是方便,你看还没连接服务器,就可以把消费服务器的消费者方法先执行起来,这个连接方法里面除连接外,就是tcp的readLoop与writeLoop 套路了,跟之前了解到的纯websocket的套路大同小异
    if err != nil {
        log.Fatal(err)
    }

    // Gracefully stop the consumer.
    consumer.Stop()
}

//1.以上是一个nsq消费者 初始化config对象(可通过自定义配置文件去控制)
//2.根据config对象去初始化 一个  consumer对象,这才是重点,
//2.1: 根据config初始化的对象达到了大部分consumer属性值的控制(也就是行为控制),但是我们还需要自己自定义一个结构体(该结构体得具备HandleMessage方法),说白了就是闭包去初始化consumer需要灵活配置的方法。
//2.2: 这个 HandleMessage 方法是用来处理nsqd推送过来的消息的,这个方法返回err为 nil 则表示消费成功,err不为nil则表示消费不成功,那么该消息会重新入nsqd队列。
// 2.3:  在2.2中说的都是结果,以下就是来分析返回的 err为nil与非nil 时候  nsq客户端分别给服务端nsqd 返回的 (REQ与FIN)是怎么来的

整理流程:

nsq.NewConfig() 初始化配置对象

nsq.NewConsumer 初始化消费者对象

nsqConsumer.AddConcurrentHandlers 初始化消费者消费用来处理nsqd推送过来消息的处理器handle

nsqConsumer.ConnectToNSQD(mqHost)
1.conn := NewConn(addr, &r.config, &consumerConnDelegate{r}) 初始化一个conn对象

  1. resp, err := conn.Connect() 这里面建立tcp与nsqd的连接,
    2.1. go c.readLoop() 接收nsqd推送过来的信息,并 r.incomingMessages <- msg 往这个channel里面推送,这个channel就是上面的 handle 的channel消费的channel了,如此nsqd到消费者这条路就通了
    2.2. go c.writeLoop() 监控 case resp := <-c.msgResponseChan: 也就是 c.msgResponseChan的channel,并根据resp不同触发不同的回复事件,resp的不同由我们上面定义handle的返回值确定,最终就是回复nsqd的是REQ还是FIN
上一篇下一篇

猜你喜欢

热点阅读