go中Nats基本使用

2021-09-22  本文已影响0人  岑吾

NATS是一个开源的,云原生的消息系统。前面讲过CentOS 7 安装nats server。当NATS作为发布-订阅引擎时,它提供了三种消息传递模式:

下面简单介绍一下Go中实现这三种消息传递。

一、安装

go get github.com/nats-io/nats.go/

二、导入

import "github.com/nats-io/nats.go"

三、连接Nats服务器

// 直接传入nats服务器端口和地址就可以了
nc, _ := nats.Connect("nats://127.0.0.1:4222")

四、消息传递

订阅者

package main

import (
    "fmt"
    "github.com/nats-io/nats.go"
    "os/signal"
    "runtime"
    "syscall"
)

func main() {
    // 连接Nats服务器
    nc, _ := nats.Connect("nats://127.0.0.1:4222")

    // 发布-订阅 模式,异步订阅 test1
    _, _ = nc.Subscribe("test1", func(m *nats.Msg) {
        fmt.Printf("Received a message: %s\n", string(m.Data))
    })

    // 队列 模式,订阅 test2, 队列为queue, test2 发向所有队列,同一队列只有一个能收到消息
    _, _ = nc.QueueSubscribe("test2", "queue", func(msg *nats.Msg) {
        fmt.Printf("Queue a message: %s\n", string(msg.Data))
    })

    // 请求-响应, 响应 test3 消息。
    _, _ = nc.Subscribe("test3", func(m *nats.Msg) {
        fmt.Printf("Reply a message: %s\n", string(m.Data))
        _ = nc.Publish(m.Reply, []byte("I can help!!"))
    })

    // 持续发送不需要关闭
    //_ = nc.Drain()

    // 关闭连接
    //nc.Close()

    // 阻止进程结束而收不到消息
    signal.Ignore(syscall.SIGHUP)
    runtime.Goexit()
}

发布者

import (
    "fmt"
    "github.com/nats-io/nats.go"
    "time"
)

func main() {

    // 连接Nats服务器
    nc, _ := nats.Connect("nats://127.0.0.1:4222")

    // 发布-订阅 模式,向 test1 发布一个 `Hello World` 数据
    _ = nc.Publish("test1", []byte("Hello World"))

    // 队列 模式,发布是一样的,只是订阅不同,向 test2 发布一个 `Hello zngw` 数据
    _ = nc.Publish("test2", []byte("Hello zngw"))

    // 请求-响应, 向 test3 发布一个 `help me` 请求数据,设置超时间3秒,如果有多个响应,只接收第一个收到的消息
    msg, err := nc.Request("test3", []byte("help me"), 3*time.Second)
    if err != nil {
        fmt.Println(err)
    } else {
        fmt.Printf("help answer : %s\n", string(msg.Data))
    }

    // 持续发送不需要关闭
    //_ = nc.Drain()

    // 关闭连接
    //nc.Close()
}

五、subject 通配符

nats-server 在管理 subject 的时候是通过’.’ 进行分割的,server 底层是使用 tree module 分层管理 subject. 此处有两个通配符*>

上一篇下一篇

猜你喜欢

热点阅读