RabbitMQ入门学习3 - Go Package实现
1. 概述
RabbitMQ在生产环境的应用Tips,基于Golang。
go with rabbitmq2. 基础功能
基础功能包含Exchange、Queue的Declare,如何Consume和Publish,以及对Delivery的操作(Delivery.Ack, Delivery.Reject or Delivery.Nack)。
参考:https://github.com/zspishere/rabbitmq-demo
这里增加两个必要的RabbitMQ Error Catch,可以实现重连或Queue重建等操作。
Catch Connection Error
import "github.com/streadway/amqp"
chConnErr chan *amqp.Error
chConnErr = make(chan *amqp.Error)
amqpConn.NotifyClose(chConnErr)
Catch Channel Error
import "github.com/streadway/amqp"
chNotifyClose chan *amqp.Error
chNotifyReturn chan amqp.Return
chNotifyCancel chan string
chNotifyClose: make(chan *amqp.Error)
chNotifyReturn: make(chan amqp.Return)
chNotifyCancel: make(chan string)
ch.NotifyClose(c.chNotifyClose)
ch.NotifyReturn(c.chNotifyReturn)
ch.NotifyCancel(c.chNotifyCancel)
3. 其他功能
3.1. 消息持久化
- Publish a Persistent Pulishing
func publishToQueue(pub *Publishment) error {
message, err := json.Marshal(pub.Payload)
if err != nil {
msg := "Failed to publish a message in json.marshal"
log.Printf("%s: %#v - %s", msg, pub, err)
return err
}
err = producerCh.amqpCh.Publish(
pub.Exchange,
pub.RoutingKey,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: message,
DeliveryMode: amqp.Persistent,
//Expiration: "5000",
})
if err != nil {
msg := "Failed to publish a message"
log.Printf("%s: %s", msg, err)
return err
}
//log.Printf("Publish message: %#v", pub)
return nil
}
https://www.cnblogs.com/davenkin/p/rabbitmq-best-practices.html
- Declare a durable Exchange and Queue
Durable and Non-Auto-Deleted queues will survive server restarts and remain
when there are no remaining consumers or bindings. Persistent publishings will
be restored in this queue on server restart. These queues are only able to be
bound to durable exchanges
.
Make sure durable=true
.
amqpCh.ExchangeDeclare(c.exchange, "topic", true, false, false, false, nil)
amqpCh.QueueDeclare(c.queue, true, false, false, false, args)
3.2. Lazy Queue
惰性队列和持久化消息可谓是“最佳拍档”。注意如果惰性队列中存储的是非持久化的消息,内存的使用率会一直很稳定,但是重启之后消息一样会丢失。
https://blog.csdn.net/u013256816/article/details/77987216
在amqpCh.QueueDeclare
时,增加args参数。
args := amqp.Table{}
args["x-queue-mode"] = "lazy"
3.3. Queue TTL
RabbitMQ支持Message TTL和Queue TTL,可以先给Queue设置一个Default TTL。
https://www.rabbitmq.com/ttl.html
在amqpCh.QueueDeclare
时,增加args参数。
args := amqp.Table{}
args["x-message-ttl"] = 7 * 86400 * 1000 // ms
3.4. 死信队列
三种情况的message会被resend到死信exchange,基于死信Exchange,可以建立死信队列。
- 消息被拒绝(basic.rejcet/basic.nack)且requeue=false
- 消息TTL过期
- 队列达到最大长度
在amqpCh.QueueDeclare
时,增加args参数。
args := amqp.Table{}
args["x-dead-letter-exchange"] = rabbitmqdlx
3.5. 限流
// 每次只推1条
ch.Qos(1, 0, false)
// 设置Channel autoAck一定要设置为false,才能做限流
amqpCh.Consume(c.queue, c.tag, false, false, false, false, nil)
3.6. 消费者保证消息幂等操作
https://blog.csdn.net/eluanshi12/article/details/88959856
3.7. 数据安全相关
- Vhost
- 账号密码
- TLS证书
4. 坑s
坑1,amqp.channel.Close()
-
amqp.channel.Close()
好像有bug,释放资源不干净,反复重建channel会报错,有空写个test程序试一下
坑2,Queue和Xchange Declare参数需要一致(除了nowait字段)
- 问题:QueueDeclare的参数要一致,否则报错,且retry会被block住
- 解决:如果要改变参数,先delete,再declare。是否
可删除
,需要注意
坑3,对Delivery有三种操作,可能存在重复推送,幂等
需要应用系统来保证
- func (d Delivery) Ack(multiple bool) error
- func (d Delivery) Reject(requeue bool) error
- func (d Delivery) Nack(multiple, requeue bool) error
如果Reject(true)
,会重复推送同一条数据给当前ch的当前consumer,一直一直。。
更多使用规范
https://www.cloudamqp.com/blog/part4-rabbitmq-13-common-errors.html
5. Next
Publishing Ack
RabbitMQ Cluster
- https://www.cnblogs.com/neverc/p/6888560.html
- https://my.oschina.net/u/4047016/blog/4553578
- https://www.rabbitmq.com/distributed.html
- https://www.rabbitmq.com/clustering.html