PHP笔记PHP架构Golang

消息队列NSQ使用

2019-03-13  本文已影响0人  零一间

NSQ是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,代码托管在GitHub。NSQ可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。NSQ具有分布式、去中心化的拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。NSQ非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。另外,官方还提供了拆箱即用Go和Python库。

部署

官网下载地址

安装步骤

# 下载
wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-1.1.0.linux-amd64.go1.10.3.tar.gz
# 解压
tar -zxvf nsq-1.1.0.linux-amd64.go1.10.3.tar.gz
# 启动服务
cd nsq-1.1.0.linux-amd64.go1.10.3/bin/
nohup ./nsqlookupd > /dev/null 2>&1 &
nohup ./nsqd --lookupd-tcp-address=127.0.0.1:4160 > /dev/null 2>&1 &
nohup ./nsqadmin --lookupd-http-address=127.0.0.1:4161 > /dev/null 2>&1 &

使用

1、创建一个test主题,并发送一个hello world消息

curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test'

2、浏览器访问NSQ的管理界面: http://127.0.0.1:4171/

image.png

3 消费test主题的消息

$ ./nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
2019/03/13 11:09:49 INF    1 [test/nsq_to_file] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2019/03/13 11:09:49 INF    1 [test/nsq_to_file] (jinchunguang-TM1701:4150) connecting to nsqd
2019/03/13 11:09:49 INFO: opening /tmp/test.jinchunguang-TM1701.2019-03-13_11.log
2019/03/13 11:09:49 syncing 1 records to disk

$ cat /tmp/test.jinchunguang-TM1701.2019-03-13_11.log
hello world

客户端

生产者用php实现,消费者用go实现

生产者 采用CURL

<?php

$msg="我是世界最好的语言,谁赞成,谁反对!";
$url= "http://127.0.0.1:4151/pub?topic=test";
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "POST");
curl_setopt($ch, CURLOPT_POSTFIELDS, $msg);
curl_setopt($ch, CURLOPT_HTTPHEADER, array(
        'Content-Type: text/html; charset=utf-8',
        'Content-Length: ' . strlen($msg))
);
$output = curl_exec($ch);
if($output === FALSE ){
    echo "CURL Error:".curl_error($ch);
}

执行脚本

$ php producer.php 
OK

消费者 采用go-nsq库实现

下载你所需要的库

go get github.com/nsqio/go-nsq

代码实现

package main

import (
    "fmt"
    "github.com/nsqio/go-nsq"
    "sync"
)

func main() {
    testNSQ()
}

type NSQHandler struct {
}

func (this *NSQHandler) HandleMessage(msg *nsq.Message) error {
    fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
    return nil
}

func testNSQ() {
    url := "127.0.0.1:4150"

    waiter := sync.WaitGroup{}
    waiter.Add(1)

    go func() {
        defer waiter.Done()
        config:=nsq.NewConfig()
        config.MaxInFlight=9

        for i := 0; i<10; i++ {
            consumer, err := nsq.NewConsumer("test", "struggle", config)
            if nil != err {
                fmt.Println("err", err)
                return
            }

            consumer.AddHandler(&NSQHandler{})
            err = consumer.ConnectToNSQD(url)
            if nil != err {
                fmt.Println("err", err)
                return
            }
        }
        select{}
    }()

    waiter.Wait()
}

执行代码

$ go run consumer.go 
2019/03/13 13:24:43 INF    1 [test/struggle] (127.0.0.1:4150) connecting to nsqd
2019/03/13 13:24:43 INF    2 [test/struggle] (127.0.0.1:4150) connecting to nsqd
2019/03/13 13:24:43 INF    3 [test/struggle] (127.0.0.1:4150) connecting to nsqd
2019/03/13 13:24:43 INF    4 [test/struggle] (127.0.0.1:4150) connecting to nsqd
2019/03/13 13:24:43 INF    5 [test/struggle] (127.0.0.1:4150) connecting to nsqd
2019/03/13 13:24:43 INF    6 [test/struggle] (127.0.0.1:4150) connecting to nsqd
2019/03/13 13:24:43 INF    7 [test/struggle] (127.0.0.1:4150) connecting to nsqd
2019/03/13 13:24:43 INF    8 [test/struggle] (127.0.0.1:4150) connecting to nsqd
2019/03/13 13:24:43 INF    9 [test/struggle] (127.0.0.1:4150) connecting to nsqd
2019/03/13 13:24:43 INF   10 [test/struggle] (127.0.0.1:4150) connecting to nsqd
receive 127.0.0.1:4150 message: 我是世界最好的语言,谁赞成,谁反对!

Nsql官网
快速入门
高性能消息队列NSQ
我们是如何使用NSQ处理7500亿消息的
消息中间件NSQ深入与实践

上一篇下一篇

猜你喜欢

热点阅读