什么是死信队列?如何实现消息的幂等性?拿来吧你!

2021-07-27  本文已影响0人  张清柏

死信队列

我们先来看消费的代码

package main

import (
    "fmt"
    "github.com/streadway/amqp"
)

func main() {
    //声明交换机和消费的队列和key
    var exchange = "direct_guofu_exchange"
    var queue = "direct_guofu_queue_dlx_key"
    var key = "direct_key"
    
    //声明死信队列的交换机和路由键
    var dlxExchange = "dlx_exchange"//死信队列交换机
    var dlxKey = "dlx_key"//死信队列路由键

    //建立连接  用户名+密码+ip+端口号+vhost
    conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
    //建立通道
    ch, _ := conn.Channel()
    //试探性声明交换机类型
    ch.ExchangeDeclare(
        exchange,
        "direct",
        true,
        false,
        false,
        false,
        nil,
    )

    //声明queue 和相关属性 相关参数 name string, durable, autoDelete, exclusive, noWait bool, args Table
    _, err := ch.QueueDeclare(
        queue,
        true,
        false,
        false,
        false,
        //此处绑定死信交换机和路由键,不指定路由键默认是fanout模式
        amqp.Table{
            "x-dead-letter-exchange":dlxExchange,//交换机
            "x-message-ttl":6000,//消息过期时间
            "x-dead-letter-routing-key":dlxKey,//绑定死信队列的key
        },
    )
    if err != nil {
        panic(err)
    }
    //绑定队列 (name, key, exchange string, noWait bool, args Table) 发布订阅模式的key为空
    ch.QueueBind(queue, key, exchange, false, nil)
    //选择消费死信队列  Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table)
    msg, err := ch.Consume(
        queue,
        "",
        false,
        false,
        false,
        false,
        nil,
    )
    for d := range msg {
        fmt.Println(string(d.Body))
        d.Ack(false)

    }

}


package main

import (
    "fmt"
    "github.com/streadway/amqp"
)

func main() {
    var dlxExchange = "dlx_exchange"//死信队列交换机
    var dlxKey = "dlx_key"//死信队列交换机
    var dxlxQueue="dlx_queue"
    //建立连接  用户名+密码+ip+端口号+vhost
    conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
    //建立通道
    ch, _ := conn.Channel()
    //试探性声明交换机类型
    ch.ExchangeDeclare(
        dlxExchange,
        "direct",
        true,
        false,
        false,
        false,
        nil,
    )

    //试探性创建队列
    //声明queue 和相关属性 相关参数 name string, durable, autoDelete, exclusive, noWait bool, args Table
    _, err := ch.QueueDeclare(
        dxlxQueue,
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        panic(err)
    }
    //绑定队列 (name, key, exchange string, noWait bool, args Table) 发布订阅模式的key为空
    ch.QueueBind(dxlxQueue, dlxKey, dlxExchange, false, nil)

    //选择消费死信队列  Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table)
    msg, err := ch.Consume(
        dxlxQueue,
        "",
        false,
        false,
        false,
        false,
        nil,
    )
    for d := range msg {
        fmt.Println(string(d.Body))
        d.Ack(false)

    }

}

package main

import (
    "github.com/streadway/amqp"
)

/**
 * @Description: 演示死信队列,
 */
func main() {
    var exchange = "direct_guofu_exchange"
    var key = "direct_key"
    //建立连接  用户名+密码+ip+端口号+vhost
    conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
    //建立通道
    ch, _ := conn.Channel()
    //声明交换机类型
    ch.ExchangeDeclare(
        exchange,
        "direct",
        true,
        false,
        false,
        false,
        nil,
    )

    //定义消息
    msgBody := "i am a dead_letter"
    //发送消息  相关参数 exchange, key string, mandatory, immediate bool, msg Publishing
    err := ch.Publish(
        exchange, //exchange
        key,      //routing key(queue name)
        false,
        false,
        amqp.Publishing{
            DeliveryMode: amqp.Persistent, //Msg set as persistent
            ContentType:  "text/plain",
            Body:         []byte(msgBody),
        })

    if err != nil {
        panic(err)
    }
}

消息的幂等性参考

上一篇下一篇

猜你喜欢

热点阅读