RabbitMQ消费 发送

2018-01-03  本文已影响0人  Feng_Sir
package connectors

import (
    "../commons"
    "github.com/streadway/amqp"
    log "github.com/sirupsen/logrus"
    "bytes"
    "strconv"
    "fmt"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func rabbitConnector() (*amqp.Connection, error) {
    rabbitConfig := commons.RabbitConfig
    addr := bytes.Buffer{}
    addr.WriteString("amqp://")
    addr.WriteString(rabbitConfig.Username)
    addr.WriteString(":")
    addr.WriteString(rabbitConfig.Password)
    addr.WriteString("@")
    addr.WriteString(rabbitConfig.Host)
    addr.WriteString(":")
    addr.WriteString(strconv.Itoa(rabbitConfig.Port))
    addr.WriteString("/")
    addr.WriteString(rabbitConfig.Vhost)
    conn, err := amqp.Dial(addr.String())
    if err != nil {
        failOnError(err, "Failed to connect to RabbitMQ")
    }
    return conn, err
}

func Send(msg string) {

    defer func() {
        if err := recover(); err != nil {
            log.Errorf("RabbitMQ发送存储消息错误 %s", err)
        }
    }()
    rabbitConfig := commons.RabbitConfig
    conn, err := rabbitConnector()
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        rabbitConfig.Exchange, // name
        "topic",              // type
        true,                  // durable
        false,                 // auto-deleted
        false,                 // internal
        false,                 // no-wait
        nil,                   // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    err = ch.Publish(
        rabbitConfig.Exchange, // exchange
        "custom",              // routing key
        false,                 // mandatory
        false,                 // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(msg),
        })
    failOnError(err, "Failed to publish a message")

}

func RabbitConsume() {
    conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/dashboard", commons.RabbitConfig.Username, commons.RabbitConfig.Password, commons.RabbitConfig.Host, commons.RabbitConfig.Port))
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "dashboard", // name
        "topic",     // type
        true,        // durable
        false,       // auto-deleted
        false,       // internal
        false,       // no-wait
        nil,         // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    q, err := ch.QueueDeclare(
        "", // name
        true,        // durable
        false,        // delete when usused
        false,        // exclusive
        false,        // no-wait
        nil,          // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.QueueBind(
        q.Name,      // queue name
        "custom",    // routing key
        "dashboard", // exchange
        false,
        nil)
    failOnError(err, "Failed to bind a queue")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf(" [x] %s", d.Body)
        }
    }()

    log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
    <-forever
}

上一篇下一篇

猜你喜欢

热点阅读