nats安装与客户端连接

2021-09-05  本文已影响0人  简书网abc

1. 下载与安装服务端程序

下载地址:
https://docs.nats.io/nats-server/installation#downloading-a-release-build
这里推荐下载编译好的二进制安装包,解压缩后放在一个合适的目录即可.

image.png

2. 启动服务端

image.png

配置文件默认没有,可以手工创建一个server.conf 和 auth.conf, 如下所示

// server.conf 内容
listen: 0.0.0.0:4222
include ./auth.conf

// auth.conf 内容
authorization: {
    user: "aa",
    password: "bb"
}
// 配置文件格式和参数请参阅官方网站

3,创建发布者和订阅者程序
使用类库 : https://github.com/nats-io/nats.go go客户端文档地址

// 发布者代码片段
package main
import (
    "fmt"
    "github.com/nats-io/nats.go"
    "log"
)
const (
    DefaultUrl  = "localhost:4222"
    User        = "aa"
    PassWord    = "bb"
)
func main() {
    nc, err := nats.Connect(DefaultUrl, nats.UserInfo(User, PassWord))
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    // Do something with the connection
    err = nc.Publish("foo", []byte("Hello World"))
    if err != nil {
        fmt.Println(err.Error())
    }
}
//订阅者代码片段
package main
import (
    "github.com/nats-io/nats.go"
    "log"
    "runtime"
)
const (
    DefaultUrl  = "localhost:4222"
    User        = "aa"
    PassWord    = "bb"
)
func printMsg(m *nats.Msg, i int) {
    log.Printf("[#%d] Received on [%s]: '%s'", i, m.Subject, string(m.Data))
}

func main() {
    // Connect to NATS
    //nc, err := nats.Connect(*urls, opts...)
    nc, err := nats.Connect(DefaultUrl, nats.UserInfo(User, PassWord))
    if err != nil {
        log.Fatal(err)
    }
    i := 0
    nc.Subscribe("foo", func(msg *nats.Msg) {
        i += 1
        printMsg(msg, i)
    })
    nc.Flush()
    if err := nc.LastError(); err != nil {
        log.Fatal(err)
    }
    runtime.Goexit()
    nc.Close()
}
// 这个是queue模式,
// 在分发消息时,进行负载均衡,随机发送给同一组中的任意一个订阅者,
// 可以随时增加删除订阅者,配合响应的监控数据和统计数据,对下游的业务进行自动伸缩。
// 提高系统的可用性,避免业务在单点处理导致系统瓶颈。

package main

import (
    "flag"
    "log"
    "os"
    "os/signal"
    "time"

    "github.com/nats-io/nats.go"
)

// nats-qsub -s demo.nats.io <subject> <queue>
// nats-qsub -s demo.nats.io:4443 <subject> <queue> (TLS version)

const (
    DefaultUrl  = "localhost:4222"
    User        = "aa"
    PassWord    = "bb"
)
func usage() {
    log.Printf("Usage: nats-qsub [-s server] [-t] [-h] <subject> <queue>\n")
    flag.PrintDefaults()
}

func showUsageAndExit(exitcode int) {
    usage()
    os.Exit(exitcode)
}

func printMsg(m *nats.Msg, i int) {
    log.Printf("[#%d] Received on [%s] Queue[%s] Pid[%d]: '%s'", i, m.Subject, m.Sub.Queue, os.Getpid(), string(m.Data))
}

func main() {
    var urls = flag.String("s", DefaultUrl, "The nats server URLs (separated by comma)")
    var showTime = flag.Bool("t", false, "Display timestamps")
    var showHelp = flag.Bool("h", false, "Show help message")

    log.SetFlags(0)
    flag.Usage = usage
    flag.Parse()

    if *showHelp {
        showUsageAndExit(0)
    }

    args := flag.Args()
    if len(args) != 2 {
        showUsageAndExit(1)
    }
    println("参数数量: ", len(args))

    // Connect Options.
    opts := []nats.Option{nats.Name("NATS Sample Queue Subscriber"), nats.UserInfo(User, PassWord)}
    opts = setupConnOptions(opts)

    nc, err := nats.Connect(*urls, opts...)
    if err != nil {
        log.Fatal(err)
    }

    subj, queue, i := args[0], args[1], 0

    nc.QueueSubscribe(subj, queue, func(msg *nats.Msg) {
        i++
        printMsg(msg, i)
    })
    nc.Flush()

    if err := nc.LastError(); err != nil {
        log.Fatal(err)
    }

    log.Printf("Listening on [%s], queue group [%s]", subj, queue)
    if *showTime {
        log.SetFlags(log.LstdFlags)
    }

    // Setup the interrupt handler to drain so we don't miss
    // requests when scaling down.
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)
    <-c
    log.Println()
    log.Printf("Draining...")
    nc.Drain()
    log.Fatalf("Exiting")
}

func setupConnOptions(opts []nats.Option) []nats.Option {
    totalWait := 10 * time.Minute
    reconnectDelay := time.Second

    opts = append(opts, nats.ReconnectWait(reconnectDelay))
    opts = append(opts, nats.MaxReconnects(int(totalWait/reconnectDelay)))
    opts = append(opts, nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
        log.Printf("Disconnected due to: %s, will attempt reconnects for %.0fm", err, totalWait.Minutes())
    }))
    opts = append(opts, nats.ReconnectHandler(func(nc *nats.Conn) {
        log.Printf("Reconnected [%s]", nc.ConnectedUrl())
    }))
    opts = append(opts, nats.ClosedHandler(func(nc *nats.Conn) {
        log.Fatalf("Exiting: %v", nc.LastError())
    }))
    return opts
}
上一篇 下一篇

猜你喜欢

热点阅读