Golang

nsq源码(5) nsqd 消息投递

2019-01-21  本文已影响25人  Linrundong

保证成功投递

protocolV2处理对象

protocolV2.messagePump()协程
// messagePump (excerpt; "..." marks code elided by the article) is the
// per-client goroutine that pushes channel messages to a subscribed client.
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    ...
        select {
        ...
        case msg := <-memoryMsgChan:
            ...
            // Before sending, register the message as in-flight with its
            // timeout via subChannel.StartInFlightTimeout (the returned
            // error is not checked in this excerpt).
            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            // Send the message to this subscribed client.
            // NOTE(review): SendingMessage presumably updates the client's
            // in-flight/delivery counters — confirm.
            client.SendingMessage()
            err = p.SendMessage(client, msg)
            if err != nil {
                // Send failed: leave the pump via the (elided) exit label.
                goto exit
            }
            // Presumably marks buffered output as needing a flush — confirm.
            flushed = false
        case <-client.ExitChan:
            // The client is going away: stop pumping.
            goto exit
        }
    }
}
标记超时时间 - subChannel.StartInFlightTimeout()
// StartInFlightTimeout records that msg is being delivered to clientID and
// arms its in-flight timeout. The message's priority (pri) becomes the
// delivery time plus timeout, in UnixNano; the in-flight queue scan
// (processInFlightQueue) presumably uses it to decide when the delivery has
// timed out.
//
// The message is first registered via pushInFlightMessage. If that fails,
// the error is returned and the message is NOT added to the in-flight
// priority queue.
func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
    deliveredAt := time.Now()
    msg.deliveryTS = deliveredAt
    msg.pri = deliveredAt.Add(timeout).UnixNano()
    msg.clientID = clientID

    // Index the message by ID first; only a successfully registered message
    // is placed in the timeout-ordered queue.
    if err := c.pushInFlightMessage(msg); err != nil {
        return err
    }
    c.addToInFlightPQ(msg)
    return nil
}

定时检查超时-queueScanLoop

如果一条消息投递给客户端后,在超时时间内一直没有被确认消费完成,nsqd 如何处理?

可以参考《Redis设计与实现》9.6 节中 Redis 的过期键删除策略,它结合了两种策略:

惰性删除。每次客户端对某个key读写时,会检查它是否过期,如果过期,就把它删掉。

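定期删除。定期删除并不会遍历整个DB,它会在规定时间内,分多次遍历服务器中各个DB,从数据库的expires字典中随机检查一部分键的过期时间,如果过期,则删除。

nsqd 的 queueScanLoop 采用的正是类似"定期删除"的思路:每隔 QueueScanInterval,从所有 channel 中随机挑选最多 QueueScanSelectionCount 个,交给 worker 检查其中已超时的 in-flight 消息和到期的 deferred 消息;如果被抽中的 channel 中"脏"(有消息需要处理)的比例超过 QueueScanDirtyPercent,就不等下一个周期,立即再抽一轮。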

// Main (excerpt) — only the part that starts the queue-scan loop is shown.
func (n *NSQD) Main() {
    ...
    // Start the background loop that scans channels for timed-out in-flight
    // and due deferred messages; it is tracked by n.waitGroup.
    n.waitGroup.Wrap(n.queueScanLoop)
}
// queueScanLoop drives the periodic scan for timed-out in-flight and due
// deferred messages.
//
// Every QueueScanInterval it picks up to QueueScanSelectionCount channels at
// random and hands them to the worker pool over workCh. Every
// QueueScanRefreshInterval it reloads the channel list and resizes the pool.
// When the share of sampled channels reported dirty exceeds
// QueueScanDirtyPercent, it samples again immediately instead of waiting for
// the next tick. Once n.exitChan fires it closes closeCh (so every worker
// returns) and stops both tickers.
func (n *NSQD) queueScanLoop() {
    // Options are looked up through n.getOpts() at every use rather than
    // cached locally (presumably so reloaded options take effect — confirm).
    workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
    responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
    closeCh := make(chan int)

    workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
    refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)

    chans := n.channels()
    // Pool size is a quarter of the channel count, clamped (see resizePool).
    n.resizePool(len(chans), workCh, responseCh, closeCh)

scan:
    for {
        select {
        case <-n.exitChan:
            break scan
        case <-refreshTicker.C:
            chans = n.channels()
            n.resizePool(len(chans), workCh, responseCh, closeCh)
            continue
        case <-workTicker.C:
            if len(chans) == 0 {
                continue
            }
        }

        sample := n.getOpts().QueueScanSelectionCount
        if len(chans) < sample {
            sample = len(chans)
        }

        for {
            // Dispatch `sample` random channels (presumably distinct indices,
            // per util.UniqRands) to the workers...
            for _, idx := range util.UniqRands(sample, len(chans)) {
                workCh <- chans[idx]
            }

            // ...then collect exactly one dirty/clean verdict per channel.
            dirty := 0
            for pending := sample; pending > 0; pending-- {
                if <-responseCh {
                    dirty++
                }
            }

            // Kept as a ">" test: when sample == 0 the ratio is NaN, and a
            // NaN must end the rescan rather than loop forever.
            if float64(dirty)/float64(sample) > n.getOpts().QueueScanDirtyPercent {
                continue
            }
            break
        }
    }

    n.logf(LOG_INFO, "QUEUESCAN: closing")
    close(closeCh)
    workTicker.Stop()
    refreshTicker.Stop()
}

NSQD.resizePool() 动态调整worker协程数量

// resizePool grows or shrinks the queue-scan worker pool until n.poolSize
// equals a target derived from num (the current channel count).
//
// The target is a quarter of num, raised to 1 if that is smaller, otherwise
// capped at QueueScanWorkerPoolMax. New workers run queueScanWorker under
// n.waitGroup. Workers are removed one at a time by sending on closeCh.
// queueScanLoop creates closeCh unbuffered, so each send waits until some
// worker receives it.
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    target := int(float64(num) * 0.25)
    switch {
    case target < 1:
        target = 1
    case target > n.getOpts().QueueScanWorkerPoolMax:
        target = n.getOpts().QueueScanWorkerPoolMax
    }

    for n.poolSize != target {
        if n.poolSize > target {
            // Too many workers: ask one of them to exit.
            closeCh <- 1
            n.poolSize--
            continue
        }
        // Too few workers: start another.
        n.waitGroup.Wrap(func() {
            n.queueScanWorker(workCh, responseCh, closeCh)
        })
        n.poolSize++
    }
}

NSQD.queueScanWorker() 消费者worker

// queueScanWorker is one member of the queue-scan worker pool.
//
// For each channel received on workCh, it processes that channel's in-flight
// and deferred queues against the current time (UnixNano). It then sends on
// responseCh whether either pass reported work, i.e. whether the channel was
// "dirty". It returns when it receives from closeCh: one worker per send from
// resizePool, or every worker once queueScanLoop closes the channel.
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    for {
        select {
        case <-closeCh:
            return
        case ch := <-workCh:
            ts := time.Now().UnixNano()
            // Evaluate both passes unconditionally; each has side effects, so
            // they must not be short-circuited.
            inFlightDirty := ch.processInFlightQueue(ts)
            deferredDirty := ch.processDeferredQueue(ts)
            responseCh <- inFlightDirty || deferredDirty
        }
    }
}

检索和处理超时消息-channel.processInFlightQueue()

// processInFlightQueue requeues in-flight messages whose timeout is due at
// scan time t (UnixNano; queueScanWorker passes time.Now().UnixNano()).
//
// It holds exitMutex for reading for the whole pass and returns false at once
// if the channel is exiting. Otherwise it repeatedly takes the head of
// inFlightPQ via PeekAndShift(t). That call presumably yields a message only
// when its pri (delivery time + timeout, set by StartInFlightTimeout) is <= t,
// and nil otherwise.
//
// For each popped message it:
//   - drops it from the in-flight bookkeeping (stopping on error),
//   - bumps timeoutCount,
//   - notifies the owning client if it is still connected,
//   - puts the message back on the channel.
//
// It returns true if at least one message timed out.
func (c *Channel) processInFlightQueue(t int64) bool {
    // Keep the channel from finishing its exit while we scan it.
    c.exitMutex.RLock()
    defer c.exitMutex.RUnlock()

    if c.Exiting() {
        return false
    }

    anyTimedOut := false
    for {
        c.inFlightMutex.Lock()
        expired, _ := c.inFlightPQ.PeekAndShift(t)
        c.inFlightMutex.Unlock()

        if expired == nil {
            // Nothing (more) is due at time t.
            return anyTimedOut
        }
        anyTimedOut = true

        if _, err := c.popInFlightMessage(expired.clientID, expired.ID); err != nil {
            return anyTimedOut
        }
        atomic.AddUint64(&c.timeoutCount, 1)

        c.RLock()
        owner, found := c.clients[expired.clientID]
        c.RUnlock()
        if found {
            // Tell the client that the delivery it was holding timed out.
            owner.TimedOutMessage()
        }

        // Redeliver: the message goes back onto the channel.
        c.put(expired)
    }
}
上一篇 下一篇

猜你喜欢

热点阅读