nsq 的消息流转分析
2021-12-28 本文已影响0人
Best博客
nsq
带着一些问题,想看看nsq里面是怎么实现的
nsqd 都做了哪些事情
- 将自己的host信息注册到nsqlookupd
- 对外提供http/https/tcp三个协议服务
- 封装了自己的通讯协议(http,tcp最后都是走协议交互的)
- message 持久化
- topic,channel管理
- 接收客户端生产消息-- 消息流转 -- 消息push到消费者客户端
- 支持分布式部署,但各个node相互独立并无通讯,所以未做数据分片
- 消息顺序无序 (想改成有序?)
go-nsq 都做了哪些事情
- 通过topicName,channelName找nsqlookupd要到了所有nsqd,并全部建立tcp
- 告知nsqd消息每次push几条,超时时间,最大重试次数(MaxAttempts)等等(nsqd暴露能力,客户端连接时指定)
- 本地并发消费消息的work数量(每一条消息消费是串的,但是支持设置并发work数量)
- nsqlookupd告知消费的topic在新的nsqd出现了,会响应并建立新的tcp监听
nsqlookupd 都做了哪些事情
- 负责接收处理nsqd的注册信息
- 负责接收处理nsqd的unregister消息
- 负责响应go-nsq 通过topic,channel 索取 nsqd的响应(doLookup)
- 负责推送新增或变更topic的nsqd信息到go-nsq
- 支持集群部署(每一个nsqlookupd都能单独对外提供所有服务,这根分布式不同,分布式是所有node加起来对外提供整体服务,也就是每个节点只是整体服务的一小部分)
有关nsq无序改变成有序需要考虑的一些事
- go-nsqd消费的指定topic消息可能是来自于多个nsqd的,所以要优先保证topic只存在于一个nsqd中(业务对于该topic指定nsqd即可)
- go-nsqd 在连接nsqd的时候会设置每次push(nsq是push模式)过来的消息条数,当然这个MaxInFlight默认为1,你要注意这个值非1,然后本地work也大于1的时候,有序也无法保证了
- go-nsqd 会设置MaxAttempts,消息消费超时后的重复次数,服务端一超时就会从inflight移除,重新put走消息push流程,这也是个能影响到消息插队的因素。 保持简单我们可以通过设置业务单条msg消费的最长时间,nsq认为100%成功消费, 其他由业务方案保证(持久,增加work。。。)
- nsqd 持久化是将内存队列消息存入磁盘,但消息下发流程是内存chan,磁盘chan在一个select的io多路复用上的,所以最求有序就将消息流转设置成100%磁盘chan(非内存模式)模式吧
服务管理
前面说了nsq对外起了http,https,tcp三个服务,每个服务的start,Init,stop是通过github.com/judwhite/go-svc
这个包统一管理的
package main
import (
"sync"
"github.com/judwhite/go-svc"
)
//一个服务启动总不是分为几部
//1.服务启动前的准备工作 Init()
// 2. 服务运行的核心
// 3. 服务被kill 的时候的退出工作 Stop()
type program struct {
wg sync.WaitGroup
quit chan struct{}
}
func main() {
prg := &program{}
if err := svc.Run(prg); err != nil {
log.Fatal(err)
}
}
func (p *program) Init(env svc.Environment) error {
return nil
}
func (p *program) Start() error {
return nil
}
func (p *program) Stop() error {
//被kill,github.com/judwhite/go-svc帮你监听了系统信号,并触发这个stop
//记得sync.Once避免多次收到的幂等性
return nil
}
nsqd消息持久化
//持久化的数据先写新文件,成功后mv修改文件名
err := p.nsqd.LoadMetadata()
if err != nil {
logFatal("failed to load metadata - %s", err)
}
err = p.nsqd.PersistMetadata()
if err != nil {
logFatal("failed to persist metadata - %s", err)
}
消息流转
- 客户端publish消息到nsqd, 这里分析下http方式
//路由定义,参数定里面要求带必要参数 ,topicname,message等
1. router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1))
- 通过topicname获取topic对象
reqParams, topic, err := s.getTopicFromQuery(req)
- 根据本次的topic以及消息信息,获取message对象
msg := NewMessage(topic.GenerateID(), body)
- 使用topic对象提供的put方法将消息进行投递到内存chan或者磁盘chan
err = topic.PutMessage(msg) // 投递 PutMessage的实现见下文
if err != nil {
return nil, http_api.Err{503, "EXITING"}
}
// 投递消息的实现
func (t *Topic) PutMessage(m *Message) error {
t.RLock()
defer t.RUnlock()
if atomic.LoadInt32(&t.exitFlag) == 1 {
return errors.New("exiting")
}
err := t.put(m) // 投递消息到内存chan或者 磁盘中的实现
if err != nil {
return err
}
atomic.AddUint64(&t.messageCount, 1)//topic全局对象记录消息又加了一个
atomic.AddUint64(&t.messageBytes, uint64(len(m.Body)))// 记录topic下的消息总字节数
return nil
}
func (t *Topic) put(m *Message) error {
select {
//优先写入内存chan,这个内存队列会被NewTopic时候起的gotoutinue给for select 监控消费,
//然后又入 topic的所有channel的内存chan
case t.memoryMsgChan <- m:
default:
err := writeMessageToBackend(m, t.backend) //写入磁盘的方法实现
t.nsqd.SetHealth(err)
if err != nil {
t.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to write message to backend - %s",
t.name, err)
return err
}
}
return nil
}
//写入磁盘
//1.定义格式
//2.写入 diskQueue 这个抽象,这里面有自己的ioloop进行异步,缓冲,批量刷盘
func writeMessageToBackend(msg *Message, bq BackendQueue) error {
buf := bufferPoolGet()
defer bufferPoolPut(buf)
_, err := msg.WriteTo(buf)
if err != nil {
return err
}
return bq.Put(buf.Bytes())
}
// 消息被持久化到磁盘的格式
// message format:
// [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
// | (int64) || || (hex string encoded in ASCII) || (binary)
// | 8-byte || || 16-byte || N-byte
// ------------------------------------------------------------------------------------------...
// nanosecond timestamp ^^ message ID message body
// (uint16)
// 2-byte
// attempts(重试次数)
func decodeMessage(b []byte) (*Message, error) {
var msg Message
if len(b) < minValidMsgLength {
return nil, fmt.Errorf("invalid message buffer size (%d)", len(b))
}
msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8]))
msg.Attempts = binary.BigEndian.Uint16(b[8:10])
copy(msg.ID[:], b[10:10+MsgIDLength])
msg.Body = b[10+MsgIDLength:]
return &msg, nil
}
- 消息从topic的chan流转到topic关联的所有channel的队列中
1. NewTopic生成topic对象的时候就起了goroutinue监控
// Topic constructor
func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic {
......
...... 此处省略对象初始化的其他代码
......
t.waitGroup.Wrap(t.messagePump) //这个 messagePump 是消息流转泵
t.nsqd.Notify(t, !t.ephemeral)
return t
}
// messagePump selects over the in-memory and backend queue and
// writes messages to every channel for this topic
func (t *Topic) messagePump() {
......
...... 此处省略其他代码
......
// main message loop
for {
select {
case msg = <-memoryMsgChan:
case buf = <-backendChan:
msg, err = decodeMessage(buf)
......
case <-t.channelUpdateChan:
......
continue
case <-t.pauseChan:
......
continue
case <-t.exitChan:
goto exit
}
//往topic对应的所有channel的内存队列写
for i, channel := range chans {
......
if chanMsg.deferred != 0 {
//nsq 还支持延时消息队列,就是这里支持的
channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
continue
}
err := channel.PutMessage(chanMsg)
if err != nil {
t.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
t.name, msg.ID, channel.name, err)
}
}
}
exit:
t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
}
- 消息从channel推送到各个消费客户端
1. nsqd通过Main方法进行服务启动的
func (n *NSQD) Main() error {
......
n.waitGroup.Wrap(func() {
exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf)) //TCPServer 这个便是与客户端进行tcp连接的逻辑了,push数据很定也是这里负责最终交付流程了
})
if n.httpListener != nil {
httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequired)
n.waitGroup.Wrap(func() {
exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
})
}
if n.httpsListener != nil {
httpsServer := newHTTPServer(n, true, true)
n.waitGroup.Wrap(func() {
exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
})
}
n.waitGroup.Wrap(n.queueScanLoop) //消息超时,重试等监控业务流程
n.waitGroup.Wrap(n.lookupLoop) //注册中心业务流程
if n.getOpts().StatsdAddress != "" {
n.waitGroup.Wrap(n.statsdLoop)
}
err := <-exitCh
return err
}
2. 客户端连接管理
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
......
for {
clientConn, err := listener.Accept()//连接事件,拿到连接客户端
if err != nil {
......
break
}
......
go func() {
handler.Handle(clientConn)//为每个客户端起一个goroutinue,进行协议对接
wg.Done()
}()
}
......
return nil
}
2. 处理客户端tcp发送过来的各种指令(其实就是前面说到的nsq自定义协议)
func (p *tcpServer) Handle(conn net.Conn) {
......
client := prot.NewClient(conn)
p.conns.Store(conn.RemoteAddr(), client)
err = prot.IOLoop(client) //起for 进行不断的监听客户端发送的包,并解析成协议对应指令
if err != nil {
p.nsqd.logf(LOG_ERROR, "client(%s) - %s", conn.RemoteAddr(), err)
}
p.conns.Delete(conn.RemoteAddr())
client.Close()
}
// 监听获取数据包,解析成cmd协议指令
func (p *protocolV2) IOLoop(c protocol.Client) error {
client := c.(*clientV2)
go p.messagePump(client, messagePumpStartedChan)//消息泵,负责接收channel对应的队列消息,并推送给客户端
<-messagePumpStartedChan
//下面的for是用于鉴定客户端的数据包,然后将其解析成cmd指定,最后执行
for {
line = line[:len(line)-1]
if len(line) > 0 && line[len(line)-1] == '\r' {
line = line[:len(line)-1]
}
params := bytes.Split(line, separatorBytes) //协议解析
var response []byte
response, err = p.Exec(client, params) //命令执行
......
}
3. channel的消息推送给客户端消费者
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
......
......
for {
if subChannel == nil || !client.IsReadyForMessages() {
// the client is not ready to receive messages...
memoryMsgChan = nil
backendMsgChan = nil
flusherChan = nil
// force flush
client.writeLock.Lock()
err = client.Flush()
client.writeLock.Unlock()
if err != nil {
goto exit
}
flushed = true
} else if flushed {
// last iteration we flushed...
// do not select on the flusher ticker channel
memoryMsgChan = subChannel.memoryMsgChan
backendMsgChan = subChannel.backend.ReadChan()
flusherChan = nil
} else {
// we're buffered (if there isn't any more data we should flush)...
// select on the flusher ticker channel, too
memoryMsgChan = subChannel.memoryMsgChan
backendMsgChan = subChannel.backend.ReadChan()
flusherChan = outputBufferTicker.C
}
select {
case <-flusherChan:
......
case <-client.ReadyStateChan:
case subChannel = <-subEventChan:
......
case identifyData := <-identifyEventChan:
......
case <-heartbeatChan:
......
case b := <-backendMsgChan://磁盘chan的消息消费
if sampleRate > 0 && rand.Int31n(100) > sampleRate {
continue
}
msg, err := decodeMessage(b)
if err != nil {
p.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
continue
}
msg.Attempts++//重试次数累加,这个值会被发送客户端,客户端发现到达最大值就直接回复finish,达到丢弃效果,所以服务端只是傻瓜式,有消费者客户端决定消息丢弃不,不丢弃就继续。。。
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
err = p.SendMessage(client, msg)//把消息推送到客户端
if err != nil {
goto exit
}
flushed = false
case msg := <-memoryMsgChan://channel的内存队列消息
if sampleRate > 0 && rand.Int31n(100) > sampleRate {
continue
}
msg.Attempts++
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
err = p.SendMessage(client, msg)
if err != nil {
goto exit
}
flushed = false
case <-client.ExitChan:
goto exit
}
}
exit:
p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting messagePump", client)
heartbeatTicker.Stop()
outputBufferTicker.Stop()
if err != nil {
p.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): [%s] messagePump error - %s", client, err)
}
}
总结
1.平时还是调用服务居多,如果自己写个tcp服务对外,那岂不是自己可以把它的协议封装,解析,沾包处理拿过来用么。
- nsq在处理消息流转,消息输入到输出看做一个io模型的话,那nsq还是做了很多事情的,比如持久化,相互独立的分布式,io多路复用的编程思想,重试机制
- nsq其实是多个子服务组装而成的,代码层级维护也可以细品品,如果实现功能的同时保持简单。。
- nsq的很多设置都可以通过启动命令行参数指定,这个cli到代码config这个过程还是挺简单灵活的
参考文献:
http://doc.yonyoucloud.com/doc/wiki/project/nsq-guide/quick_start.html
nsq客户端
消费流程