程序员GO语言入门宝典

Rabbitmq的Simple模式Go语言版本

2020-06-06  本文已影响0人  Goplayer王布斯

package RabbitMQ

import (

"fmt"

"github.com/streadway/amqp"

"log"

)

//url 格式 amqp://账号:密码@rabbitmq服务器地址:端口号 /vhost

const MQURL ="amqp://guest:guest@127.0.0.1:5672/wangyao"

type RabbitMQ struct {

conn *amqp.Connection

  channel *amqp.Channel

  //队列

  QueueName string

  //交换机

  Exchange string

  //key

  Key string

  Mqurl string

}

func NewRabbitMQ(queueName string,exchange string,key string)*RabbitMQ{

rabbitmq:= &RabbitMQ{QueueName:queueName,Exchange:exchange,Key:key,Mqurl:MQURL}

var err error

  rabbitmq.conn,err=amqp.Dial(rabbitmq.Mqurl)

rabbitmq.failOnErr(err,"连接错误")

rabbitmq.channel,err=rabbitmq.conn.Channel()

rabbitmq.failOnErr(err,"获取channel失败")

return rabbitmq

}

//断开channel 和 connection

func (r *RabbitMQ)Destroy(){

r.channel.Close()

r.conn.Close()

}

//错误处理函数

func (r *RabbitMQ)failOnErr(err error,message string){

if  err !=nil {

log.Fatalf("%s:%s",message,err)

panic(fmt.Sprintf("%s:%s",message,err))

}

}

//1.创建简单模式rabbitmq实例

func NewRabbitMQSimple(queueName string)*RabbitMQ{

return NewRabbitMQ(queueName,"","")

}

//简单模式下生产代码

func (r *RabbitMQ)PublishSimple(message string){

//1.申请队列,如果队列不存在,自动创建,如果存在就跳过

//保证队列存在 消息能发到队列中

  _,err:=r.channel.QueueDeclare(

r.QueueName,

      //是否持久化

      false,

      //是否自动删除

      false,

      //排他性

      false,

      //是否阻塞

      false,

      //额外属性

      nil,

      )

if err!=nil{

fmt.Println(err)

}

//2.发送消息到队列中

  r.channel.Publish(

r.Exchange,

      r.QueueName,

      //默认推荐,如果为true会根据exchange类型和routinkey规则,如果

//无法找到符合条件的队列会把消息返还给发送者

      false,

      //如果为true当exchange发送到队列的消息如果没有绑定消费者

//会返还给发送者

      false,

      amqp.Publishing{

ContentType:"text/plain",

        Body:[]byte(message),

      },

      )

}

func(r *RabbitMQ)ConsumeSimple(){

//1.申请队列,如果队列不存在,自动创建,如果存在就跳过

//保证队列存在 消息能发到队列中

  _,err:=r.channel.QueueDeclare(

r.QueueName,

      //是否持久化

      false,

      //是否自动删除

      false,

      //排他性

      false,

      //是否阻塞

      false,

      //额外属性

      nil,

  )

if err!=nil{

fmt.Println(err)

}

//接收消息

  msgs,err:=r.channel.Consume(

r.QueueName,

      //区分多个消费者

      "",

      //是否自动

      true,

      //是否排他性

      false,

      //如果设置为true表示不能将同一个collection发送的消息传递给这个connection消费者

      false,

      //队列消费是否阻塞 false为阻塞

      false,

      nil,

      )

if err !=nil {

fmt.Println(err)

}

forever:=make(chan bool)

//启用写成处理消息

  go func(){

for d:=range msgs{

log.Printf("receive a message%s",d.Body)

}

}()

log.Printf("[*] wating for msg")

<-forever

}

上一篇下一篇

猜你喜欢

热点阅读