nsq简易使用

2019-04-16  本文已影响0人  funcx
  1. 启动nsqd
  2. 生产者代码tcp
//Nsq发送测试
package main

import (
    "bufio"
    "dog/util/log"
    "fmt"
    "os"

    "github.com/nsqio/go-nsq"
)

var producer *nsq.Producer

// 主函数
func main() {
    strIP1 := "127.0.0.1:4150"
    // strIP2 := "127.0.0.1:4152"
    InitProducer(strIP1)

    running := true

    //读取控制台输入
    reader := bufio.NewReader(os.Stdin)
    for running {
        data, _, _ := reader.ReadLine()
        command := string(data)
        if command == "stop" {
            running = false
        }

        if err := Publish("test", command); err != nil {
            log.Error.Fatal(err)
        }
    }
    //关闭
    producer.Stop()
}

// 初始化生产者
func InitProducer(str string) {
    var err error
    fmt.Println("address: ", str)
    producer, err = nsq.NewProducer(str, nsq.NewConfig())
    if err != nil {
        panic(err)
    }
}

//发布消息
func Publish(topic string, message string) error {
    var err error
    if producer != nil {
        if message == "" { //不能发布空串,否则会导致error
            return nil
        }
        err = producer.Publish(topic, []byte(message)) // 发布消息
        return err
    }
    return fmt.Errorf("producer is nil")
}
  1. 消费者代码tcp
//Nsq接收测试
package main

import (
    "fmt"
    "time"

    "github.com/nsqio/go-nsq"
)

// 消费者
type ConsumerT struct{}

// 主函数
func main() {
    InitConsumer("test", "test-channel", "127.0.0.1:4151")
    select {}
}

//处理消息
func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
    fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
    return nil
}

//初始化消费者
func InitConsumer(topic string, channel string, address string) {
    cfg := nsq.NewConfig()
    cfg.LookupdPollInterval = time.Second          //设置重连时间
    c, err := nsq.NewConsumer(topic, channel, cfg) // 新建一个消费者
    if err != nil {
        panic(err)
    }
    c.SetLogger(nil, 0)        //屏蔽系统日志
    c.AddHandler(&ConsumerT{}) // 添加消费者接口

    //建立NSQLookupd连接
    // if err := c.ConnectToNSQLookupd(address); err != nil {
    //  panic(err)
    // }

    //建立多个nsqd连接
    // if err := c.ConnectToNSQDs([]string{"127.0.0.1:4150", "127.0.0.1:4152"}); err != nil {
    //  panic(err)
    // }

    // 建立一个nsqd连接
    if err := c.ConnectToNSQD("127.0.0.1:4150"); err != nil {
        panic(err)
    }
}
上一篇下一篇

猜你喜欢

热点阅读