Go消息中间件Nsq系列(一)------初识Nsq
-
Nsq简介
-
1.1. 是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,由bitly公司开源出来的一款简单易用的消息中间件。
1.2. 去中心化,分布式易部署,易水平扩展,高可用,消除单点故障,内置服务发现,并提供相对友好的Web管理UI
1.3. 官方提供详细架构设计说明, 使用文档.性能测试等
1.4. 多语言支持(Go,Python,Java等)
-
Nsq 三大组件
-
2.1 nsqd:
负责接收消息,存储消息,分发消息给客户端,nsqd可以单独部署,也可以多节点部署,主要监听了两个端口,一个用来服务客户端(4150),一个用来提供api服务(4151).当然也可以配置监听https(4152)端口
2.2 nsqlookupd:
主要负责服务发现,nsqd的心跳、状态监测,给客户端、nsqadmin提供nsqd地址与状态,主要监听端口(4160)服务客户端,端口(4161)提供api服务.
2.3 nsqadmin:
是nsq的web后台管理,比如节点管理,topic管理,实时消息状态等,主要监听端口(4171)提供web服务
-
一些注意的问题
-
3.1 Producer与nsqd是1:1关系,nsqd与topic是1:N关系,nsqlookupd与nqsd关系可以是M:N
3.2 Consumer可以指定单个nsqd,或者多个nsqd地址,也可以通过单个或者多个nsqlookupd去匹配对应的topic+channel的nsqd去消费消息
3.3 Consumer第一次通过nsqlookupd,会重试三次, 然后就是根据LookupdPollInterval(60s)时间轮询或者防抖动算法LookupdPollJitter(0.3默认)去进行重试查询
3.4 Producer发布消息没有topic会新建,如果先Consumer使用nsqlookupd去寻找消费,也就是没有创建topic之前会无法连接,重试3.3步骤
3.5 Consumer退出,channel不会自动删除, 多个nsqd服务都有相同的topic的时候,需要修改默认的config.MaxInflight才能连接,代表一次性可以接受多少条消息.
3.6 channel和topic的命名都有限制,正则匹配如下^[\.a-zA-Z0-9_-]+(#ephemeral)?$
3.7 多个Consumer消费channel数据是随机的,无序. 消息至少投递一次,可能会重复投递
3.8 nsqd之间不会扩散消息. 但是topic的消息会分发给下面所有channel,但一个channel如果有多个消费者,消息会随机发送给其中一个消费者
3.9 nsq延时消息最长是一小时(60min)
-
有赞自研版Nsq
-
4.1 有赞的自研版 NSQ 在高可用性以及负载均衡方面进行了改造,自研版的 nsqd 中引入了数据分区以及副本,副本保存在不同的 nsqd 上,达到容灾目的。此外,自研版 NSQ 在原有 Protocol Spec 基础上进行了拓展,支持基于分区的消息生产、消费,以及基于消息分区的有序消费,以及消息追踪功能。
-
Nsq 在Go简单使用
-
5.1 docker安装nsq,具体使用以下docker-compose.yml,然后再
docker-compose up -d
启动,不需要的话就是用docker-compose down
停止并删除容器
version: '3'
services:
nsqlookupd:
image: nsqio/nsq
command: /nsqlookupd
ports:
- "4160:4160"
- "4161:4161"
nsqd:
image: nsqio/nsq
# 广播地址不填的话默认就是oshostname, 那样子在程序lookupd 连接不上
command: /nsqd --broadcast-address=192.168.1.103 --lookupd-tcp-address=nsqlookupd:4160
depends_on:
- nsqlookupd
ports:
- "4150:4150"
- "4151:4151"
nsqadmin:
image: nsqio/nsq
command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
depends_on:
- nsqlookupd
ports:
- "4171:4171"
5.2 Go的测试代码如下
package main
import (
"fmt"
"github.com/nsqio/go-nsq"
"log"
"math"
"time"
)
// nsq demo
var topicName = "nsq_test"
func main() {
host := "localhost:4150"
discoverAddr := "localhost:4161"
go producer(host)
go consumer_nsqd(discoverAddr)
go consumer_nsqd2(host)
for{}
}
func producer(addr string){
// 1:1 nsqd:producer 一比一的关系
producer,err := nsq.NewProducer(addr,nsq.NewConfig())
if err != nil {
log.Fatal(err)
}
for i := 1; i< math.MaxInt64 ; i++ {
// 直接发布消息
_ = producer.Publish(topicName,[]byte(fmt.Sprintf("%s%d",topicName,i+1)))
// 发布延时消息
//_ = producer.DeferredPublish()
// 发送消息数组
//producer.MultiPublish()
// 还有异步发送, doneChan通知
//producer.DeferredPublishAsync()
time.Sleep(time.Second*10)
}
}
// 通过服务发现, 根据topicName,Channel作为key去寻找对应的nsqd去连接
func consumer_nsqd(addr string){
consumer,err := nsq.NewConsumer(topicName,"default",nsq.NewConfig())
if err != nil {
log.Println(err)
}
consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error{
log.Println("nsqd1",message.Timestamp,message.NSQDAddress,string(message.Body))
return nil
}))
err = consumer.ConnectToNSQLookupd(addr)
if err != nil {
log.Println(err)
}
}
// 直连方式
func consumer_nsqd2(addr string){
consumer,err := nsq.NewConsumer(topicName,"spec",nsq.NewConfig())
if err != nil {
log.Fatal(err)
}
consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error{
log.Println("nsqd2",message.Timestamp,message.NSQDAddress,string(message.Body))
return nil
}))
err = consumer.ConnectToNSQD(addr)
if err != nil {
log.Fatal(err)
}
}
输出结果:
2019/06/26 23:16:40 nsqd2 1561562080041909815 localhost:4150 nsq_test481
2019/06/26 23:16:40 nsqd2 1561562090046390841 localhost:4150 nsq_test482
2019/06/26 23:16:40 nsqd2 1561562100049180666 localhost:4150 nsq_test483
2019/06/26 23:16:40 nsqd2 1561562110055842132 localhost:4150 nsq_test484
2019/06/26 23:16:40 nsqd2 1561562120060210535 localhost:4150 nsq_test485
2019/06/26 23:16:40 nsqd2 1561562130066839086 localhost:4150 nsq_test486
2019/06/26 23:16:40 nsqd2 1561562140073121280 localhost:4150 nsq_test487
2019/06/26 23:16:40 nsqd2 1561562150076661111 localhost:4150 nsq_test488
2019/06/26 23:16:40 nsqd2 1561562160078242317 localhost:4150 nsq_test489
2019/06/26 23:16:40 nsqd2 1561562170081163554 localhost:4150 nsq_test490
2019/06/26 23:16:40 nsqd2 1561562180083818010 localhost:4150 nsq_test491
2019/06/26 23:16:40 nsqd2 1561562190088318866 localhost:4150 nsq_test492
2019/06/26 23:16:40 nsqd2 1561562200159782512 localhost:4150 nsq_test2