什么是死信队列?如何实现消息的幂等性?拿来吧你!
2021-07-27 本文已影响0人
张清柏
死信队列
-
死信队列:DLX,dead-letter-exchange,一般是由于消息被否定了,消息过期了,或者消息队列超过最大长度导致信息不能被正常消费,那么这条消息就成了死信消息,如果我们绑定了死信队列,那么这个消息就会被投递到死信队列
-
使用场景:
使用rabbitmq的死信队列完成对库存、题库的回收工作,比如 某个商品被下单了减了库存,但是迟迟没有付款,超过30分钟我们就默认订单取消,并恢复库存。或者在医生抢题 答题的业务中,有的医生抢了10道题,但是只在有效期(比如1天)内答了3道题,但是剩余的7道题不可能一直被这个医生绑定,超过1天就要被解绑回库。 -
先说明以下内容:
1.队列可以在producer里面声明创建和绑定,也可以在consumer里面声明创建和绑定。declare并不关系和影响你的逻辑
2.如果你的死信队列想使用fanout ,在绑定的时候不要绑定key 即可,即key为空,如果你想使用direct,则必须指定key,其他类型也是如此。
我们先来看消费的代码
package main
import (
"fmt"
"github.com/streadway/amqp"
)
func main() {
//声明交换机和消费的队列和key
var exchange = "direct_guofu_exchange"
var queue = "direct_guofu_queue_dlx_key"
var key = "direct_key"
//声明死信队列的交换机和路由键
var dlxExchange = "dlx_exchange"//死信队列交换机
var dlxKey = "dlx_key"//死信队列路由键
//建立连接 用户名+密码+ip+端口号+vhost
conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
//建立通道
ch, _ := conn.Channel()
//试探性声明交换机类型
ch.ExchangeDeclare(
exchange,
"direct",
true,
false,
false,
false,
nil,
)
//声明queue 和相关属性 相关参数 name string, durable, autoDelete, exclusive, noWait bool, args Table
_, err := ch.QueueDeclare(
queue,
true,
false,
false,
false,
//此处绑定死信交换机和路由键,不指定路由键默认是fanout模式
amqp.Table{
"x-dead-letter-exchange":dlxExchange,//交换机
"x-message-ttl":6000,//消息过期时间
"x-dead-letter-routing-key":dlxKey,//绑定死信队列的key
},
)
if err != nil {
panic(err)
}
//绑定队列 (name, key, exchange string, noWait bool, args Table) 发布订阅模式的key为空
ch.QueueBind(queue, key, exchange, false, nil)
//选择消费死信队列 Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table)
msg, err := ch.Consume(
queue,
"",
false,
false,
false,
false,
nil,
)
for d := range msg {
fmt.Println(string(d.Body))
d.Ack(false)
}
}
- 我们再来看消费死信队列的代码,这里面就是普通消费队列的代码,没有什么特殊
package main
import (
"fmt"
"github.com/streadway/amqp"
)
func main() {
var dlxExchange = "dlx_exchange"//死信队列交换机
var dlxKey = "dlx_key"//死信队列交换机
var dxlxQueue="dlx_queue"
//建立连接 用户名+密码+ip+端口号+vhost
conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
//建立通道
ch, _ := conn.Channel()
//试探性声明交换机类型
ch.ExchangeDeclare(
dlxExchange,
"direct",
true,
false,
false,
false,
nil,
)
//试探性创建队列
//声明queue 和相关属性 相关参数 name string, durable, autoDelete, exclusive, noWait bool, args Table
_, err := ch.QueueDeclare(
dxlxQueue,
true,
false,
false,
false,
nil,
)
if err != nil {
panic(err)
}
//绑定队列 (name, key, exchange string, noWait bool, args Table) 发布订阅模式的key为空
ch.QueueBind(dxlxQueue, dlxKey, dlxExchange, false, nil)
//选择消费死信队列 Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table)
msg, err := ch.Consume(
dxlxQueue,
"",
false,
false,
false,
false,
nil,
)
for d := range msg {
fmt.Println(string(d.Body))
d.Ack(false)
}
}
- 生产代码,也是一段普通的生产代码
package main
import (
"github.com/streadway/amqp"
)
/**
* @Description: 演示死信队列,
*/
func main() {
var exchange = "direct_guofu_exchange"
var key = "direct_key"
//建立连接 用户名+密码+ip+端口号+vhost
conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
//建立通道
ch, _ := conn.Channel()
//声明交换机类型
ch.ExchangeDeclare(
exchange,
"direct",
true,
false,
false,
false,
nil,
)
//定义消息
msgBody := "i am a dead_letter"
//发送消息 相关参数 exchange, key string, mandatory, immediate bool, msg Publishing
err := ch.Publish(
exchange, //exchange
key, //routing key(queue name)
false,
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent, //Msg set as persistent
ContentType: "text/plain",
Body: []byte(msgBody),
})
if err != nil {
panic(err)
}
}
-
我们暂停队列的正常消费,看看6秒后能否进入到死信队列
image.png - 从上图可以看到,死信消息在6s后被投递到死信队列