golang学习篇章

nsq 的消息流转分析

2021-12-28  本文已影响0人  Best博客

nsq

带着一些问题,想看看nsq里面是怎么实现的

nsqd 都做了哪些事情

  1. 将自己的host信息注册到nsqlookupd
  2. 对外提供http/https/tcp三个协议服务
  3. 封装了自己的通讯协议(http,tcp最后都是走协议交互的)
  4. message 持久化
  5. topic,channel管理
  6. 接收客户端生产消息-- 消息流转 -- 消息push到消费者客户端
  7. 支持分布式部署,但各个node相互独立并无通讯,所以未做数据分片
  8. 消息顺序无序 (想改成有序?)

go-nsq 都做了哪些事情

  1. 通过topicName,channelName找nsqlookupd要到了所有nsqd,并全部建立tcp
  2. 告知nsqd消息每次push几条,超时时间,最大重试次数(MaxAttempts)等等(nsqd暴露能力,客户端连接时指定)
  3. 本地并发消费消息的work数量(每一条消息消费是串的,但是支持设置并发work数量)
  4. nsqlookupd告知消费的topic在新的nsqd出现了,会响应并建立新的tcp监听

nsqlookupd 都做了哪些事情

  1. 负责接收处理nsqd的注册信息
  2. 负责接收处理nsqd的unregister消息
  3. 负责响应go-nsq 通过topic,channel 索取 nsqd的响应(doLookup)
  4. 负责推送新增或变更topic的nsqd信息到go-nsq
  5. 支持集群部署(每一个nsqlookupd都能单独对外提供所有服务,这根分布式不同,分布式是所有node加起来对外提供整体服务,也就是每个节点只是整体服务的一小部分)

有关nsq无序改变成有序需要考虑的一些事

  1. go-nsqd消费的指定topic消息可能是来自于多个nsqd的,所以要优先保证topic只存在于一个nsqd中(业务对于该topic指定nsqd即可)
  2. go-nsqd 在连接nsqd的时候会设置每次push(nsq是push模式)过来的消息条数,当然这个MaxInFlight默认为1,你要注意这个值非1,然后本地work也大于1的时候,有序也无法保证了
  3. go-nsqd 会设置MaxAttempts,消息消费超时后的重复次数,服务端一超时就会从inflight移除,重新put走消息push流程,这也是个能影响到消息插队的因素。 保持简单我们可以通过设置业务单条msg消费的最长时间,nsq认为100%成功消费, 其他由业务方案保证(持久,增加work。。。)
  4. nsqd 持久化是将内存队列消息存入磁盘,但消息下发流程是内存chan,磁盘chan在一个select的io多路复用上的,所以最求有序就将消息流转设置成100%磁盘chan(非内存模式)模式吧

服务管理

前面说了nsq对外起了http,https,tcp三个服务,每个服务的start,Init,stop是通过github.com/judwhite/go-svc 这个包统一管理的

package main

import (
    "sync"
    "github.com/judwhite/go-svc"
)

//一个服务启动总不是分为几部
//1.服务启动前的准备工作 Init()
// 2. 服务运行的核心
// 3. 服务被kill 的时候的退出工作 Stop()
type program struct {
    wg   sync.WaitGroup
    quit chan struct{}
}
func main() {
    prg := &program{}
    if err := svc.Run(prg); err != nil {
        log.Fatal(err)
    }
}

func (p *program) Init(env svc.Environment) error {
    return nil
}

func (p *program) Start() error {
    return nil
}

func (p *program) Stop() error {
//被kill,github.com/judwhite/go-svc帮你监听了系统信号,并触发这个stop
//记得sync.Once避免多次收到的幂等性
    return nil
}

nsqd消息持久化

//持久化的数据先写新文件,成功后mv修改文件名
    err := p.nsqd.LoadMetadata()
    if err != nil {
        logFatal("failed to load metadata - %s", err)
    }
    err = p.nsqd.PersistMetadata()
    if err != nil {
        logFatal("failed to persist metadata - %s", err)
    }

消息流转

  1. 客户端publish消息到nsqd, 这里分析下http方式

//路由定义,参数定里面要求带必要参数 ,topicname,message等

1. router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1))
  1. 通过topicname获取topic对象
reqParams, topic, err := s.getTopicFromQuery(req)
  1. 根据本次的topic以及消息信息,获取message对象
    msg := NewMessage(topic.GenerateID(), body)
  1. 使用topic对象提供的put方法将消息进行投递到内存chan或者磁盘chan
    err = topic.PutMessage(msg) // 投递 PutMessage的实现见下文
    if err != nil {
        return nil, http_api.Err{503, "EXITING"}
    }

// 投递消息的实现
func (t *Topic) PutMessage(m *Message) error {
    t.RLock()
    defer t.RUnlock()
    if atomic.LoadInt32(&t.exitFlag) == 1 {
        return errors.New("exiting")
    }
    err := t.put(m) // 投递消息到内存chan或者 磁盘中的实现
    if err != nil {
        return err
    }
    atomic.AddUint64(&t.messageCount, 1)//topic全局对象记录消息又加了一个
    atomic.AddUint64(&t.messageBytes, uint64(len(m.Body)))// 记录topic下的消息总字节数
    return nil
}

func (t *Topic) put(m *Message) error {
    select {
//优先写入内存chan,这个内存队列会被NewTopic时候起的gotoutinue给for select 监控消费,
//然后又入 topic的所有channel的内存chan
    case t.memoryMsgChan <- m:
    default:
        err := writeMessageToBackend(m, t.backend) //写入磁盘的方法实现
        t.nsqd.SetHealth(err)
        if err != nil {
            t.nsqd.logf(LOG_ERROR,
                "TOPIC(%s) ERROR: failed to write message to backend - %s",
                t.name, err)
            return err
        }
    }
    return nil
}
//写入磁盘
//1.定义格式
//2.写入 diskQueue 这个抽象,这里面有自己的ioloop进行异步,缓冲,批量刷盘
func writeMessageToBackend(msg *Message, bq BackendQueue) error {
    buf := bufferPoolGet()
    defer bufferPoolPut(buf)
    _, err := msg.WriteTo(buf)
    if err != nil {
        return err
    }
    return bq.Put(buf.Bytes())
}

// 消息被持久化到磁盘的格式
// message format:
// [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
// |       (int64)        ||    ||      (hex string encoded in ASCII)           || (binary)
// |       8-byte         ||    ||                 16-byte                      || N-byte
// ------------------------------------------------------------------------------------------...
//   nanosecond timestamp    ^^                   message ID                       message body
//                        (uint16)
//                         2-byte
//                        attempts(重试次数)
func decodeMessage(b []byte) (*Message, error) {
    var msg Message

    if len(b) < minValidMsgLength {
        return nil, fmt.Errorf("invalid message buffer size (%d)", len(b))
    }

    msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8]))
    msg.Attempts = binary.BigEndian.Uint16(b[8:10])
    copy(msg.ID[:], b[10:10+MsgIDLength])
    msg.Body = b[10+MsgIDLength:]

    return &msg, nil
}


  1. 消息从topic的chan流转到topic关联的所有channel的队列中
1. NewTopic生成topic对象的时候就起了goroutinue监控
// Topic constructor
func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic {
        ......
       ......  此处省略对象初始化的其他代码
      ......
    t.waitGroup.Wrap(t.messagePump) //这个 messagePump 是消息流转泵

    t.nsqd.Notify(t, !t.ephemeral)

    return t
}

// messagePump selects over the in-memory and backend queue and
// writes messages to every channel for this topic
func (t *Topic) messagePump() {
        ......
       ......  此处省略其他代码
      ......

    // main message loop
    for {
        select {
        case msg = <-memoryMsgChan:
        case buf = <-backendChan:
            msg, err = decodeMessage(buf)
                    ......
        case <-t.channelUpdateChan:
                ......
            continue
        case <-t.pauseChan:
                ......
            continue
        case <-t.exitChan:
            goto exit
        }

                //往topic对应的所有channel的内存队列写
        for i, channel := range chans {
                ......
            if chanMsg.deferred != 0 {
                                //nsq 还支持延时消息队列,就是这里支持的
                channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
                continue
            }
            err := channel.PutMessage(chanMsg)
            if err != nil {
                t.nsqd.logf(LOG_ERROR,
                    "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
                    t.name, msg.ID, channel.name, err)
            }
        }
    }

exit:
    t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
}
  1. 消息从channel推送到各个消费客户端
1. nsqd通过Main方法进行服务启动的
func (n *NSQD) Main() error {
        ......

    n.waitGroup.Wrap(func() {
        exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf)) //TCPServer 这个便是与客户端进行tcp连接的逻辑了,push数据很定也是这里负责最终交付流程了
    })
    if n.httpListener != nil {
        httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequired)
        n.waitGroup.Wrap(func() {
            exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
        })
    }
    if n.httpsListener != nil {
        httpsServer := newHTTPServer(n, true, true)
        n.waitGroup.Wrap(func() {
            exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
        })
    }

    n.waitGroup.Wrap(n.queueScanLoop)    //消息超时,重试等监控业务流程
    n.waitGroup.Wrap(n.lookupLoop)    //注册中心业务流程
    if n.getOpts().StatsdAddress != "" {
        n.waitGroup.Wrap(n.statsdLoop)
    }

    err := <-exitCh
    return err
}


2. 客户端连接管理
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
        ......
    for {
        clientConn, err := listener.Accept()//连接事件,拿到连接客户端
        if err != nil {
                        ......
            break
        }
                 ......
        go func() {
            handler.Handle(clientConn)//为每个客户端起一个goroutinue,进行协议对接
            wg.Done()
        }()
    }
        ......
    return nil
}
2. 处理客户端tcp发送过来的各种指令(其实就是前面说到的nsq自定义协议)
func (p *tcpServer) Handle(conn net.Conn) {
  ......
    client := prot.NewClient(conn)
    p.conns.Store(conn.RemoteAddr(), client)
    err = prot.IOLoop(client)    //起for 进行不断的监听客户端发送的包,并解析成协议对应指令
    if err != nil {
        p.nsqd.logf(LOG_ERROR, "client(%s) - %s", conn.RemoteAddr(), err)
    }

    p.conns.Delete(conn.RemoteAddr())
    client.Close()
}

// 监听获取数据包,解析成cmd协议指令
func (p *protocolV2) IOLoop(c protocol.Client) error {

    client := c.(*clientV2)
    go p.messagePump(client, messagePumpStartedChan)//消息泵,负责接收channel对应的队列消息,并推送给客户端
    <-messagePumpStartedChan

      //下面的for是用于鉴定客户端的数据包,然后将其解析成cmd指定,最后执行
    for {
        line = line[:len(line)-1]
        if len(line) > 0 && line[len(line)-1] == '\r' {
            line = line[:len(line)-1]
        }
        params := bytes.Split(line, separatorBytes)   //协议解析
        var response []byte
        response, err = p.Exec(client, params)  //命令执行
               ......
}

3. channel的消息推送给客户端消费者
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
 ......
 ......
    for {
        if subChannel == nil || !client.IsReadyForMessages() {
            // the client is not ready to receive messages...
            memoryMsgChan = nil
            backendMsgChan = nil
            flusherChan = nil
            // force flush
            client.writeLock.Lock()
            err = client.Flush()
            client.writeLock.Unlock()
            if err != nil {
                goto exit
            }
            flushed = true
        } else if flushed {
            // last iteration we flushed...
            // do not select on the flusher ticker channel
            memoryMsgChan = subChannel.memoryMsgChan
            backendMsgChan = subChannel.backend.ReadChan()
            flusherChan = nil
        } else {
            // we're buffered (if there isn't any more data we should flush)...
            // select on the flusher ticker channel, too
            memoryMsgChan = subChannel.memoryMsgChan
            backendMsgChan = subChannel.backend.ReadChan()
            flusherChan = outputBufferTicker.C
        }

        select {
        case <-flusherChan:
                       ......
        case <-client.ReadyStateChan:
        case subChannel = <-subEventChan:
         ......
        case identifyData := <-identifyEventChan:
                 ......
        case <-heartbeatChan:
                 ......
        case b := <-backendMsgChan://磁盘chan的消息消费
            if sampleRate > 0 && rand.Int31n(100) > sampleRate {
                continue
            }

            msg, err := decodeMessage(b)
            if err != nil {
                p.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
                continue
            }
            msg.Attempts++//重试次数累加,这个值会被发送客户端,客户端发现到达最大值就直接回复finish,达到丢弃效果,所以服务端只是傻瓜式,有消费者客户端决定消息丢弃不,不丢弃就继续。。。

            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            client.SendingMessage()
            err = p.SendMessage(client, msg)//把消息推送到客户端
            if err != nil {
                goto exit
            }
            flushed = false
        case msg := <-memoryMsgChan://channel的内存队列消息
            if sampleRate > 0 && rand.Int31n(100) > sampleRate {
                continue
            }
            msg.Attempts++

            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            client.SendingMessage()
            err = p.SendMessage(client, msg)
            if err != nil {
                goto exit
            }
            flushed = false
        case <-client.ExitChan:
            goto exit
        }
    }

exit:
    p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting messagePump", client)
    heartbeatTicker.Stop()
    outputBufferTicker.Stop()
    if err != nil {
        p.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): [%s] messagePump error - %s", client, err)
    }
}


总结

1.平时还是调用服务居多,如果自己写个tcp服务对外,那岂不是自己可以把它的协议封装,解析,沾包处理拿过来用么。

  1. nsq在处理消息流转,消息输入到输出看做一个io模型的话,那nsq还是做了很多事情的,比如持久化,相互独立的分布式,io多路复用的编程思想,重试机制
  2. nsq其实是多个子服务组装而成的,代码层级维护也可以细品品,如果实现功能的同时保持简单。。
  3. nsq的很多设置都可以通过启动命令行参数指定,这个cli到代码config这个过程还是挺简单灵活的

参考文献:

http://doc.yonyoucloud.com/doc/wiki/project/nsq-guide/quick_start.html
nsq客户端
消费流程

上一篇下一篇

猜你喜欢

热点阅读