NSQ

2020-08-05  本文已影响0人  强某某

本文参考:https://zhuanlan.zhihu.com/p/37081073

简介

应用场景

NSQ组件介绍

NSQ架构介绍

4.jpg 5.jpg

topic消息的逻辑关键词

topic 是 NSQ 消息发布的 逻辑关键词,可以理解为人为定义的一种消息类型。当程序初次发布带 topic 的消息时,如果 topic 不存在,则会在 nsqd中创建

producer消息的生产者/发布者

channel消息传递的通道

consumer消息的消费者

概叙

NSQ特性介绍

NSQ搭建

6.jpg

解压之后如下


7.jpg

基本使用案例

package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/nsqio/go-nsq"
)

// 消费者
type Consumer struct {
}

//处理消息
func (*Consumer) HandleMessage(msg *nsq.Message) error {
    fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
    return nil
}

// 主函数
func main() {
    err := initConsumer("order_queue", "first", "127.0.0.1:4161")
    if err != nil {
        fmt.Printf("init consumer failed, err:%v\n", err)
        return
    }
    c := make(chan os.Signal)
    signal.Notify(c, syscall.SIGINT)
    <-c
}

//初始化消费者
func initConsumer(topic string, channel string, address string) error {
    cfg := nsq.NewConfig()
    cfg.LookupdPollInterval = 15 * time.Second     //设置服务发现的轮询时间,例如新的nsq出现
    c, err := nsq.NewConsumer(topic, channel, cfg) // 新建一个消费者
    if err != nil {
        return err
    }

    consumer := &Consumer{}
    c.AddHandler(consumer) // 添加消费者接口

    //建立NSQLookupd连接
    if err := c.ConnectToNSQLookupd(address); err != nil {
        return err
    }
    return nil
}

package main

import (
    "bufio"
    "fmt"
    "os"
    "strings"

    "github.com/nsqio/go-nsq"
)

var producer *nsq.Producer

//入口函数
func main() {
    //nsq的地址
    nsqAddress := "127.0.0.1:4150"
    err := initProducer(nsqAddress)
    if err != nil {
        fmt.Printf("init producer failed, err:%v\n", err)
        return
    }
    //读取控制台输入
    reader := bufio.NewReader(os.Stdin)
    for {
        data, err := reader.ReadString('\n')
        if err != nil {
            fmt.Printf("read string failed, err:%v\n", err)
            continue
        }

        data = strings.TrimSpace(data)
        if data == "stop" {
            break
        }

        err = producer.Publish("order_queue", []byte(data))
        if err != nil {
            fmt.Printf("publish message failed, err:%v\n", err)
            continue
        }
        fmt.Printf("publish data:%s succ\n", data)
    }
}

// 初始化生产者
func initProducer(str string) error {
    var err error
    config := nsq.NewConfig()
    producer, err = nsq.NewProducer(str, config)

    if err != nil {
        return err
    }
    return nil
}

总结:由上代码测试时候发现,必须先建立topic和channel,否则生产者发送的消息不会被topic接收,nqsadminui会提示必须创建channel才会显示具体信息。

而且生产者此时是向topic发送消息,而topic下面可以有多个channel,所以这个消息会一个channel一份。而消费者则指定了topic和channel才能订阅消息。

上一篇下一篇

猜你喜欢

热点阅读