Go消息中间件Nsq系列(八)------topic(主题发布)
上一篇: Go消息中间件Nsq系列(七)------go-diskqueue 文件队列实现
1. Topic/Channel 回顾
前文有说过 Topic/Channel是发布/订阅模型的一种实现。Topic对应发布, Channel对应于订阅。
- 订阅: SUB 消费者订阅某个Topic-Channel的消息
- 发布: PUB, 消息提供者往某个NSQD PUB一条消息,在由topic投递给其下面的所有channel
func PUB(client *clientV2, params [][]byte) ([]byte, error) // ... 省略 topic := p.ctx.nsqd.GetTopic(topicName) msg := NewMessage(topic.GenerateID(), messageBody) err = topic.PutMessage(msg)
其他请往回看.
2. 思考一下
- 要投递就要对应的topic, 那么topic是什么时候获取,如何创建的呢?
- 它是怎么处理投递的消息的呢?
- 如何暂停/取消暂停, 关闭和删除的呢?
- 与topic相关联的channel 查询,创建,删除呢?
- 如何统计投递延迟呢?
3. Topic的创建 GetTopic(topicName string) *Topic
3.1 topic的结构体定义如下:
type Topic struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
messageCount uint64 // 消息总数
messageBytes uint64 // 消息字节数
sync.RWMutex
name string // 主题名称
channelMap map[string]*Channel // topic关联的map
backend BackendQueue // BackendQueue 后台队列实现
memoryMsgChan chan *Message // 内存消息通道 默认10000缓冲区
startChan chan int // topic启动通道
exitChan chan int // 退出通道
channelUpdateChan chan int // channel更新通道
waitGroup util.WaitGroupWrapper // 多协程任务封装
exitFlag int32 // 退出标志
idFactory *guidFactory // guid 生成器
ephemeral bool // 临时的topic
deleteCallback func(*Topic) // 删除回调
deleter sync.Once // 删除仅一次
paused int32 // 暂停标志位
pauseChan chan int // 暂停通道
ctx *context // 透传 nsqd 上下文
}
3.2Topic的获取情况与过程:
- nsqd启动时,LoadMetadata() , 获取并启动Topic进行消息投递处理
- 通过暴露的api接口去查询的时候
getTopicFromQuery(req *http.Request)
- 客户端进行
PUB
DPUB
MPUB
SUB
根据对应的Topic去处理
3.3获取过程:
GetTopic 是线程安全的操作, 返回一个Topic指针对象
1.如果该topic已存在 则返回已存在,
2.否则新建一个topic,并保存起来
如果不是正在加载的情况, 则根据该topic去lookupd发现获取对应的channels(非临时的)
并通过chan(异步通知)启动消息投递处理协程
func (n *NSQD) GetTopic(topicName string) *Topic {
// ..其他省略 新建Topic
t = NewTopic(topicName, &context{n}, deleteCallback)
return t
}
3.4Topic的创建:
是否创建临时的Topic,所使用的backedQueue不一样, 临时的只是一个模拟的队列, 非临时的就是需要持久化的
然后启动一个协程messagePump
来进行消息投递处理
// Topic constructor
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
t := &Topic{
name: topicName,
channelMap: make(map[string]*Channel),
memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize),
startChan: make(chan int, 1),
exitChan: make(chan int),
channelUpdateChan: make(chan int),
ctx: ctx,
paused: 0,
pauseChan: make(chan int),
deleteCallback: deleteCallback,
idFactory: NewGUIDFactory(ctx.nsqd.getOpts().ID),
}
// 如果是临时的topic
if strings.HasSuffix(topicName, "#ephemeral") {
t.ephemeral = true
t.backend = newDummyBackendQueue()
} else {
// 日志
dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {
opts := ctx.nsqd.getOpts()
lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f, args...)
}
// 初始化 磁盘队列, 用于内存消息缓冲超过阈值(默认配置10000, 详细看上一篇)
t.backend = diskqueue.New(
topicName,
ctx.nsqd.getOpts().DataPath,
ctx.nsqd.getOpts().MaxBytesPerFile,
int32(minValidMsgLength),
int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
ctx.nsqd.getOpts().SyncEvery,
ctx.nsqd.getOpts().SyncTimeout,
dqLogf,
)
}
// topic核心处理
t.waitGroup.Wrap(t.messagePump)
// 通知持久化
t.ctx.nsqd.Notify(t)
return t
}
4.Topic处理消息投递
4.1外部调用Topic投递消息:
- 单条消息投递(a message)
如果该topic已关闭,则无法进行消息投递
进行消息投递, 先走case也就是内存,缓冲区已满 在走default(磁盘队列)
添加统计信息 (消息计数器, 消息主体大小)
// PutMessage writes a Message to the queue
func (t *Topic) PutMessage(m *Message) error {
t.RLock()
defer t.RUnlock()
if atomic.LoadInt32(&t.exitFlag) == 1 {
return errors.New("exiting")
}
err := t.put(m)
if err != nil {
return err
}
atomic.AddUint64(&t.messageCount, 1)
atomic.AddUint64(&t.messageBytes, uint64(len(m.Body)))
return nil
}
4.2. 批量消息投递(multiple message)
如果该topic已关闭,则无法进行消息投递
遍历投递, 统计..
如果用可选参数 PutMessage(msgs ...Message) 单或者多都行
// PutMessages writes multiple Messages to the queue
func (t *Topic) PutMessages(msgs []*Message) error {
t.RLock()
defer t.RUnlock()
if atomic.LoadInt32(&t.exitFlag) == 1 {
return errors.New("exiting")
}
messageTotalBytes := 0
for i, m := range msgs {
err := t.put(m)
if err != nil {
atomic.AddUint64(&t.messageCount, uint64(i))
atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes))
return err
}
messageTotalBytes += len(m.Body)
}
atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes))
atomic.AddUint64(&t.messageCount, uint64(len(msgs)))
return nil
}
4.3以下内部调用的put
函数
select 先走case, 也就是内存缓冲区
如果case没有执行,看看有没有default, 接着执行default
也就是说 消息先投递到内存, 内存缓冲区满了之后会将消息写入到磁盘队列
这里使用了sync.Pool 减少GC
同时也会看每次写入磁盘是否有错误, 设置其健康状态保存已暴露给api接口/ping使用
func (t *Topic) put(m *Message) error {
select {
case t.memoryMsgChan <- m:
default:
b := bufferPoolGet()
err := writeMessageToBackend(b, m, t.backend)
bufferPoolPut(b)
t.ctx.nsqd.SetHealth(err)
if err != nil {
t.ctx.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to write message to backend - %s",
t.name, err)
return err
}
}
return nil
}
4.4 Topic的启动
messagePump topic的核心协程处理 阻塞等待
通过发送startChan 信号 异步通知 启动
func (t *Topic) Start() {
select {
case t.startChan <- 1: // 写入chan
default:
}
}
4.5 messagePump()
处理消息往channel投递, 以及更新channel
messagePump()
整个topic处理的核心所在:
这里使用了一个技巧, 使用channel阻塞等待, 在异步唤醒的操作继续走接下流程.
从内存或磁盘队列中读取消息, 遍历该topic下所有的channel,复制消息进行投递,如果是延时消息则投递到延时队列, 否则就正常投递
删除或新建的Channel的时候, 走channelUpdateChan
case,去更新当前chans列表
暂停, 和退出
至于把memoryMsgChan / backendChan = nil
, 这样子 就会跳过select 选取
// messagePump selects over the in-memory and backend queue and
// writes messages to every channel for this topic
func (t *Topic) messagePump() {
var msg *Message
var buf []byte
var err error
var chans []*Channel
var memoryMsgChan chan *Message
var backendChan chan []byte
// do not pass messages before Start(), but avoid blocking Pause() or GetChannel()
// 这里使用了一个技巧, 使用channel阻塞等待, 在异步唤醒的操作
for {
select {
case <-t.channelUpdateChan:
continue
case <-t.pauseChan:
continue
case <-t.exitChan:
goto exit
case <-t.startChan:
}
break
}
t.RLock()
// 读锁 把所有的channel遍历合并
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
// topic没有暂停(pause) 并且 有可进行投递的channel
if len(chans) > 0 && !t.IsPaused() {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
// main message loop
for {
select {
// 从内存或者磁盘 获取消息 并序列化成Message
case msg = <-memoryMsgChan:
case buf = <-backendChan:
msg, err = decodeMessage(buf)
if err != nil {
t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
continue
}
case <-t.channelUpdateChan:
// 更新channel
chans = chans[:0]
t.RLock()
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
if len(chans) == 0 || t.IsPaused() {
memoryMsgChan = nil
backendChan = nil
} else {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
continue
case <-t.pauseChan:
// 暂停
if len(chans) == 0 || t.IsPaused() {
memoryMsgChan = nil
backendChan = nil
} else {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
continue
case <-t.exitChan:
// 退出
goto exit
}
// 遍历所有的channel, 复制消息进行投递
// 如果是延时消息则投递到延时队列, 否则就正常投递
for i, channel := range chans {
chanMsg := msg
// copy the message because each channel
// needs a unique instance but...
// fastpath to avoid copy if its the first channel
// (the topic already created the first copy)
if i > 0 {
chanMsg = NewMessage(msg.ID, msg.Body)
chanMsg.Timestamp = msg.Timestamp
chanMsg.deferred = msg.deferred
}
if chanMsg.deferred != 0 {
channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
continue
}
err := channel.PutMessage(chanMsg)
if err != nil {
t.ctx.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
t.name, msg.ID, channel.name, err)
}
}
}
exit:
t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
}
5. Topic的暂停/恢复 , 删除/关闭
5.1 暂停与恢复
通过设置标志位
&t.paused
使用chan 异步通信控制
// 暂停 标志位 paused = 1
func (t *Topic) Pause() error {
return t.doPause(true)
}
// resume 恢复 标志位 paused = 0
func (t *Topic) UnPause() error {
return t.doPause(false)
}
// 原子操作 设置标志位值, 给messagePump发送异步通知
func (t *Topic) doPause(pause bool) error {
if pause {
atomic.StoreInt32(&t.paused, 1)
} else {
atomic.StoreInt32(&t.paused, 0)
}
select {
case t.pauseChan <- 1:
case <-t.exitChan:
}
return nil
}
// 判断是否暂停 状态
func (t *Topic) IsPaused() bool {
return atomic.LoadInt32(&t.paused) == 1
}
5.2 删除与关闭
如果是删除 则删除当前关联的channel, 关闭所有客户端连接, 并清空磁盘消息,关闭磁盘读写
关闭仅仅是关闭当前连接,把未读的消息flush到磁盘
// Delete empties the topic and all its channels and closes
func (t *Topic) Delete() error {
return t.exit(true)
}
// Close persists all outstanding topic data and closes all its channels
func (t *Topic) Close() error {
return t.exit(false)
}
// 1. cas锁
// 2. 关闭 exitChan
// 3. 阻塞等待messagePump() 结束
// 4. 如果标记为删除 删除记录并关闭所有客户端连接, 并清空磁盘文件,关闭磁盘队列
// -- 否则 关闭关闭所有客户端连接而已, 把剩余的消息写到磁盘
func (t *Topic) exit(deleted bool) error {
if !atomic.CompareAndSwapInt32(&t.exitFlag, 0, 1) {
return errors.New("exiting")
}
if deleted {
t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting", t.name)
// since we are explicitly deleting a topic (not just at system exit time)
// de-register this from the lookupd
t.ctx.nsqd.Notify(t)
} else {
t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing", t.name)
}
close(t.exitChan)
// synchronize the close of messagePump()
t.waitGroup.Wait()
if deleted {
t.Lock()
for _, channel := range t.channelMap {
delete(t.channelMap, channel.name)
channel.Delete()
}
t.Unlock()
// empty the queue (deletes the backend files, too)
t.Empty()
return t.backend.Delete()
}
// close all the channels
for _, channel := range t.channelMap {
err := channel.Close()
if err != nil {
// we need to continue regardless of error to close all the channels
t.ctx.nsqd.logf(LOG_ERROR, "channel(%s) close - %s", channel.name, err)
}
}
// write anything leftover to disk
t.flush()
return t.backend.Close()
}
// 磁盘队列的 empty
func (t *Topic) Empty() error {
for {
select {
case <-t.memoryMsgChan:
default:
goto finish
}
}
finish:
return t.backend.Empty()
}
func (t *Topic) flush() error {
var msgBuf bytes.Buffer
if len(t.memoryMsgChan) > 0 {
t.ctx.nsqd.logf(LOG_INFO,
"TOPIC(%s): flushing %d memory messages to backend",
t.name, len(t.memoryMsgChan))
}
for {
select {
case msg := <-t.memoryMsgChan: // 写入到磁盘
err := writeMessageToBackend(&msgBuf, msg, t.backend)
if err != nil {
t.ctx.nsqd.logf(LOG_ERROR,
"ERROR: failed to write message to backend - %s", err)
}
default:
goto finish
}
}
finish:
return nil
}
6. 与Topic关联的Channel的获取,创建,删除
GetChannel()
获取Channel的逻辑是在channelMap
里面去查找,如果找不到的话就新建返回, 如果是新建的话, 要去发送通知去更新chans
同时对外提供了接口进行查询GetExistingChannel()
, 删除DeleteExistingChannel()
// GetChannel performs a thread safe operation
// to return a pointer to a Channel object (potentially new)
// for the given Topic
// 获取或者创建topic对应的channel, 他是线程安全,
// 如果是新建,则发送通知至messagePump进行更新
func (t *Topic) GetChannel(channelName string) *Channel {
t.Lock()
channel, isNew := t.getOrCreateChannel(channelName)
t.Unlock()
if isNew {
// update messagePump state
select {
case t.channelUpdateChan <- 1:
case <-t.exitChan:
}
}
return channel
}
// this expects the caller to handle locking
func (t *Topic) getOrCreateChannel(channelName string) (*Channel, bool) {
channel, ok := t.channelMap[channelName]
if !ok {
// 在与topic关联的channel中找不到则新建
deleteCallback := func(c *Channel) {
t.DeleteExistingChannel(c.name)
}
channel = NewChannel(t.name, channelName, t.ctx, deleteCallback)
t.channelMap[channelName] = channel
t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): new channel(%s)", t.name, channel.name)
return channel, true
}
return channel, false
}
// 对外暴露接口
func (t *Topic) GetExistingChannel(channelName string) (*Channel, error) {
t.RLock()
defer t.RUnlock()
channel, ok := t.channelMap[channelName]
if !ok {
return nil, errors.New("channel does not exist")
}
return channel, nil
}
// DeleteExistingChannel removes a channel from the topic only if it exists
// 对外暴露接口 删除已存在的channel
// 如果已存在, 从map中删除, 并发送更新通知至channelUpdateChan
// 如果已经channel已经删除完毕, 并且是临时topic 则删除该topic
func (t *Topic) DeleteExistingChannel(channelName string) error {
t.Lock()
channel, ok := t.channelMap[channelName]
if !ok {
t.Unlock()
return errors.New("channel does not exist")
}
delete(t.channelMap, channelName)
// not defered so that we can continue while the channel async closes
numChannels := len(t.channelMap)
t.Unlock()
t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting channel %s", t.name, channel.name)
// delete empties the channel before closing
// (so that we dont leave any messages around)
channel.Delete()
// update messagePump state
select {
case t.channelUpdateChan <- 1:
case <-t.exitChan:
}
if numChannels == 0 && t.ephemeral == true {
// 仅执行一次 删除当前topic
go t.deleter.Do(func() { t.deleteCallback(t) })
}
return nil
}
7. 消息投递的延时统计
TopicStats结构体定义, 使用了该库
https://github.com/bmizerany/perks 使用了该库
// topic统计信息结构体定义
type TopicStats struct {
// 主题名称
TopicName string `json:"topic_name"`
// 该主题下所有channel的统计信息 1:n
Channels []ChannelStats `json:"channels"`
// 所有消息未读量
Depth int64 `json:"depth"`
// 磁盘队列未读消息
BackendDepth int64 `json:"backend_depth"`
// 消息总数
MessageCount uint64 `json:"message_count"`
// 消息字节
MessageBytes uint64 `json:"message_bytes"`
// 是否暂停
Paused bool `json:"paused"`
// 四分位数(Quartile),即统计学中,把所有数值由小到大排列并分成四等份,处于三个分割点位置的得分就是四分位数。
E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"`
}
// https://github.com/bmizerany/perks 使用了该库 进行channel投递延迟统计
// 具体我不知道咋算...
func (t *Topic) AggregateChannelE2eProcessingLatency() *quantile.Quantile {
var latencyStream *quantile.Quantile
t.RLock()
realChannels := make([]*Channel, 0, len(t.channelMap))
for _, c := range t.channelMap {
realChannels = append(realChannels, c)
}
t.RUnlock()
for _, c := range realChannels {
if c.e2eProcessingLatencyStream == nil {
continue
}
if latencyStream == nil {
latencyStream = quantile.New(
t.ctx.nsqd.getOpts().E2EProcessingLatencyWindowTime,
t.ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles)
}
latencyStream.Merge(c.e2eProcessingLatencyStream)
}
return latencyStream
}
8. 其他的一些方法
Depth()
,GenerateID()
,Exiting()
// 消息未读数量= 内存未读+磁盘未读
func (t *Topic) Depth() int64 {
return int64(len(t.memoryMsgChan)) + t.backend.Depth()
}
// 生成message id
func (t *Topic) GenerateID() MessageID {
retry:
id, err := t.idFactory.NewGUID()
if err != nil {
time.Sleep(time.Millisecond)
goto retry
}
return id.Hex()
}
// 返回是否关闭/删除状态
func (t *Topic) Exiting() bool {
return atomic.LoadInt32(&t.exitFlag) == 1
}