RabbitMQGo

RabbitMQ入门学习3 - Go Package实现

2021-05-02  本文已影响0人  红薯爱帅

1. 概述

RabbitMQ在生产环境的应用Tips,基于Golang。

go with rabbitmq

2. 基础功能

基础功能包含Exchange、Queue的Declare,如何Consume和Publish,以及对Delivery的操作(Delivery.Ack, Delivery.Reject or Delivery.Nack)。

参考:https://github.com/zspishere/rabbitmq-demo

这里增加两个必要的RabbitMQ Error Catch,可以实现重连或Queue重建等操作。

Catch Connection Error

import "github.com/streadway/amqp"

chConnErr chan *amqp.Error
chConnErr = make(chan *amqp.Error)

amqpConn.NotifyClose(chConnErr)

Catch Channel Error

import "github.com/streadway/amqp"

chNotifyClose       chan *amqp.Error
chNotifyReturn      chan amqp.Return
chNotifyCancel      chan string

chNotifyClose:       make(chan *amqp.Error)
chNotifyReturn:      make(chan amqp.Return)
chNotifyCancel:      make(chan string)

ch.NotifyClose(c.chNotifyClose)
ch.NotifyReturn(c.chNotifyReturn)
ch.NotifyCancel(c.chNotifyCancel)

3. 其他功能

3.1. 消息持久化

func publishToQueue(pub *Publishment) error {
    message, err := json.Marshal(pub.Payload)
    if err != nil {
        msg := "Failed to publish a message in json.marshal"
        log.Printf("%s: %#v - %s", msg, pub, err)
        return err
    }

    err = producerCh.amqpCh.Publish(
        pub.Exchange,
        pub.RoutingKey,
        false,
        false,
        amqp.Publishing{
            ContentType:  "text/plain",
            Body:         message,
            DeliveryMode: amqp.Persistent,
            //Expiration: "5000",
        })
    if err != nil {
        msg := "Failed to publish a message"
        log.Printf("%s: %s", msg, err)
        return err
    }
    //log.Printf("Publish message: %#v", pub)
    return nil
}

https://www.cnblogs.com/davenkin/p/rabbitmq-best-practices.html

Durable and Non-Auto-Deleted queues will survive server restarts and remain
when there are no remaining consumers or bindings. Persistent publishings will
be restored in this queue on server restart. These queues are only able to be
bound to durable exchanges.

Make sure durable=true.

amqpCh.ExchangeDeclare(c.exchange, "topic", true, false, false, false, nil)
amqpCh.QueueDeclare(c.queue, true, false, false, false, args)

3.2. Lazy Queue

惰性队列和持久化消息可谓是“最佳拍档”。注意如果惰性队列中存储的是非持久化的消息,内存的使用率会一直很稳定,但是重启之后消息一样会丢失。
https://blog.csdn.net/u013256816/article/details/77987216

amqpCh.QueueDeclare时,增加args参数。

args := amqp.Table{}
args["x-queue-mode"] = "lazy"

3.3. Queue TTL

RabbitMQ支持Message TTL和Queue TTL,可以先给Queue设置一个Default TTL。
https://www.rabbitmq.com/ttl.html

amqpCh.QueueDeclare时,增加args参数。

args := amqp.Table{}
args["x-message-ttl"] = 7 * 86400 * 1000 // ms

3.4. 死信队列

三种情况的message会被resend到死信exchange,基于死信Exchange,可以建立死信队列。

amqpCh.QueueDeclare时,增加args参数。

args := amqp.Table{}
args["x-dead-letter-exchange"] = rabbitmqdlx

3.5. 限流

// 每次只推1条
ch.Qos(1, 0, false)
// 设置Channel autoAck一定要设置为false,才能做限流
amqpCh.Consume(c.queue, c.tag, false, false, false, false, nil)

3.6. 消费者保证消息幂等操作

https://blog.csdn.net/eluanshi12/article/details/88959856

3.7. 数据安全相关

4. 坑s

坑1,amqp.channel.Close()

坑2,Queue和Xchange Declare参数需要一致(除了nowait字段)

坑3,对Delivery有三种操作,可能存在重复推送,幂等需要应用系统来保证

如果Reject(true),会重复推送同一条数据给当前ch的当前consumer,一直一直。。

更多使用规范

https://www.cloudamqp.com/blog/part4-rabbitmq-13-common-errors.html

5. Next

Publishing Ack

RabbitMQ Cluster

Quorum-queues

上一篇下一篇

猜你喜欢

热点阅读