nsq消息队列部署以及使用

2019-08-20  本文已影响0人  零一间

NSQ是一个实时的分布式消息平台。它的设计目标是为在多台计算机上运行的松散服务提供一个现代化的基础设施骨架。

NSQ是由3个进程组成的:

1 源码部署

软件下载直接去官网:https://nsq.io/deployment/installing.html

cd /usr/local/nsq-1.1.0.linux-amd64.go1.10.3/bin/
nohup ./nsqlookupd > /dev/null 2>&1 &
nohup ./nsqd --lookupd-tcp-address=127.0.0.1:4160 > /dev/null 2>&1 &
nohup ./nsqadmin --lookupd-http-address=127.0.0.1:4161 > /dev/null 2>&1 &

访问 nsqadmin

2 docker部署

获取镜像

docker pull nsqio/nsq

启动容器

~docker run -d --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
450cbab82b8eb491d42bf105185c1022010b4d05e65a04f6c52ba15e1f5af06f
~ docker inspect -f '{{ .NetworkSettings.IPAddress }}' lookupd
172.17.0.2
# --broadcast-address=广播到虚拟机地址
~ docker run -d --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=172.17.0.1 --lookupd-tcp-address=172.17.0.2:4160
3bc0901c8c485c351cfe31b0ef1a4fa32bf6bf148f0d74907afec6cbb1e4a034
~ docker run -d --name nsqadmin -p 4171:4171 nsqio/nsq /nsqadmin  --lookupd-http-address=172.17.0.2:4161
1d4cb219b862613d42bbc0f0bd7d08146f48a32d4e68abae2073cf28ed765bb0

注意:宿主机防火墙是否有拦截

~ docker ps -a 
CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS              PORTS                                                            NAMES
1d4cb219b862        nsqio/nsq           "/nsqadmin --lookupd…"   3 minutes ago       Up 3 minutes        4150-4151/tcp, 4160-4161/tcp, 4170/tcp, 0.0.0.0:4171->4171/tcp   nsqadmin
3bc0901c8c48        nsqio/nsq           "/nsqd --broadcast-a…"   3 minutes ago       Up 3 minutes        4160-4161/tcp, 0.0.0.0:4150-4151->4150-4151/tcp, 4170-4171/tcp   nsqd
450cbab82b8e        nsqio/nsq           "/nsqlookupd"            4 minutes ago       Up 4 minutes        4150-4151/tcp, 4170-4171/tcp, 0.0.0.0:4160-4161->4160-4161/tcp   lookupd

3 docker-compose部署

创建docker-compose.yml


➜  nsq pwd
/root/nsq
➜  nsq vim docker-compose.yml

文件内容如下

version: '2'
services:

  nsqlookupd:
    image: nsqio/nsq
    command: /nsqlookupd
    networks:
      - nsq-network
    hostname: nsqlookupd
    ports:
      - "4161:4161"
      - "4160:4160"
      
  nsqd:
    image: nsqio/nsq
    # -broadcast-address=宿主机地址 
    command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 -broadcast-address=172.17.0.1
    depends_on:
      - nsqlookupd
    hostname: nsqd
    networks:
      - nsq-network
    ports:
      - "4151:4151"
      - "4150:4150"
      
  nsqadmin:
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
    depends_on:
      - nsqlookupd
    hostname: nsqadmin
    ports:
      - "4171:4171"
    networks:
      - nsq-network
 
networks:
  nsq-network:
    driver: bridge

配置检查

docker-compose config

启动 docker-compose

➜  nsq docker-compose up -d
Starting nsq_nsqlookupd_1_a12f31d6a776 ... done
Starting nsq_nsqd_1_1c0db410157f       ... done
Starting nsq_nsqadmin_1_8c94f3c4a1b7   ... done

image.png

客户端支持的库

https://nsq.io/clients/client_libraries.html

image.png

golang客户端使用

发送消息

方式一

package main

import (
    "bytes"
    "fmt"
    "net/http"
)

func main() {

    httpclient := &http.Client{}
    data := `haha`

    endpoint := fmt.Sprintf("http://127.0.0.1:%d/%s?topic=%s", 4151, "pub", "test")
    req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer([]byte(data)))
    resp, err := httpclient.Do(req)
    if err != nil {
        fmt.Printf(err.Error())
        return
    }
    if resp.StatusCode != 200 {
        fmt.Printf("%s status code: %d", "pub", resp.StatusCode)
    }
    defer resp.Body.Close()

}

方式二:

package main

import (
    "fmt"
    "github.com/nsqio/go-nsq"
    "io/ioutil"
    "log"
    "sync"
    "time"
)

var err error

// 推送消息
func main() {

    url := "127.0.0.1:4150"
    topicName := "test"
    config := nsq.NewConfig()

    // new
    producer, err := nsq.NewProducer(url, config)
    if err != nil {
        fmt.Println("nsq.NewProducer", err)
        return
    }
    fmt.Println("nsq.NewProducer", "√")
    defer producer.Stop()

    producer.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags), nsq.LogLevelInfo)
    //  ping
    err = producer.Ping()
    if err != nil {
        fmt.Println("producer.Ping", err)
        return
    }
    fmt.Println("producer.Ping", "√")

    msgCt:=1000
    wg := &sync.WaitGroup{}
    wg.Add(msgCt)
    // 测试10 次
    for i := 0; i < msgCt; i++ {

        // 消息内容
        msg :=  time.Now().Format("0102150405")
        sendMessage(producer, topicName, msg)
        wg.Done()

        time.Sleep(10*time.Millisecond)
        // time.Sleep(1 * time.Second)
    }

    wg.Wait()
    fmt.Println("producer.Push.Status", "ok")
}

// 发送消息
func sendMessage(producer *nsq.Producer, topicName string, msg string) {

    err = producer.Publish(topicName, []byte(msg))
    if err != nil {
        fmt.Println("producer.Publish", err)
        return
    }
    fmt.Println("producer.Publish",msg, "√")

}

消费记录

package main

import (
    "fmt"
    "github.com/nsqio/go-nsq"
    "io/ioutil"
    "log"
    "sync"
)


func main() {
    testNSQ()
}

type NSQHandler struct {
}

func (this *NSQHandler) HandleMessage(msg *nsq.Message) error {
    fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
    return nil
}

const (
    TOPIC   = "test"
    CHANNEL_1 = "consumer_channel_1"
    CHANNEL_2 = "consumer_channel_2"
    URL     = "127.0.0.1:4150"
)

func testNSQ() {

    waiter := sync.WaitGroup{}
    waiter.Add(1)

    go func() {
        defer waiter.Done()

        config := nsq.NewConfig()
        config.MaxInFlight = 10

        for i := 0; i < 10; i++ {
            consumer, err := nsq.NewConsumer(TOPIC, CHANNEL_1, config)
            if nil != err {
                fmt.Println("err", err)
                return
            }
            consumer.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags), nsq.LogLevelInfo)
            consumer.AddHandler(&NSQHandler{})
            err = consumer.ConnectToNSQD(URL)
            if nil != err {
                fmt.Println("err", err)
                return
            }

            fmt.Println(CHANNEL_1,i)
        }
        select {}
    }()


    waiter.Wait()
}

golang实现的demo

上一篇下一篇

猜你喜欢

热点阅读