mqtt发布端和服务端分别用 python和go两种语言实现

2023-10-21  本文已影响0人  yichen_china

aclfile权限配置如下

aclfile.conf文件内容如下

user1 和user2 是用户名
user1 设置为订阅权限,并且只能访问的主题为"root/topic/#"
user2 设置为发布权限,并且只能访问的主题为"root/topic/#"
user3 设置超级为发布+订阅权限
test 订阅主题
使用中订阅 topic\test

user user1
topic read root/topic/#

user user2
topic write root/topic/#

user use3
topic write $SYS/#
topic read $SYS/#

pwfile 用户名密码配置内容如下

这个是用命令生成的
mosquitto_passwd -b pwdfile user1 123456 (user1 是账号,123456是密码)

user1:$7$101$XkqkC2wVNfJY8jbl$oFhDmilRTROUDIGy4DtQQluNa32GNxS4iZEaGNXUF3hpynXCwxISbU3mPVSJu0HUtGUDlPtWcQoHrx3wBBDpeg==
user2:$7$101$NR8smdq5yB4ONq9I$8CYxS/WqBTuDDnXIbHpmX5kbfokeqU52Cp2h5E8S3PrKk+uJYPtL+/+m4cT4iu9VBXjgI9h3wHm9sgSiIkirZQ==

一、 python实现

利用python paho编写 mqtt发布端和服务端
PYTHON服务端程序 sub.py

import paho.mqtt.client as mqtt


broker = '127.0.0.1'
port = 1883
topic = "root/topic"

# 连接的回调函数
def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")
    client.subscribe(topic)

# 收到消息的回调函数
def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(broker, port, 60)
client.loop_forever()

PYTHON发布端程序 pub.py

import paho.mqtt.client as mqtt
import time

broker = '127.0.0.1'
port = 1883
topic = "root/topic"

def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")

client = mqtt.Client()
client.on_connect = on_connect
client.connect(broker, port, 60)
for i in range(60):
    client.publish(topic, payload=i, qos=0, retain=False)
    print(f"send {i} to a/b{topic}")
    time.sleep(1)

client.loop_forever()

启动 服务端程序
python sub.py
启动 发布端程序
python pub.py

二、 go实现

在Golang中,我们可以使用第三方库实现MQTT功能。下面以Eclipse Paho MQTT库为例

首先,需要安装Paho MQTT库。可以使用以下命令进行安装:

go get github.com/eclipse/paho.mqtt.golang

#common.go
package main

import (
   "fmt"
   mqtt "github.com/eclipse/paho.mqtt.golang"
)

var broker = "127.0.0.1"
var port = 1883
var userName1 = "user1"
var passwd1 = "123456"
var userName2 = "user2"
var passwd2 = "123456"

var topic = "root/topic"

func sub(client mqtt.Client, producer bool) {
   token := client.Subscribe(topic, 1, nil)
   token.Wait()
   if producer {
       fmt.Printf("Producer subscribed to topic %s", topic)
   } else {
       fmt.Printf("Consumer subscribed to topic %s", topic)
   }
}

MQTT发布者

以下代码实现一个简单的MQTT发布者

#producer.go
package main

import (
    "fmt"
    mqtt "github.com/eclipse/paho.mqtt.golang"
    "time"
)

var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    fmt.Printf("Producer Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
    fmt.Println("Connected")
}

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
    fmt.Printf("Connect lost: %v", err)
}

func producerPoint() {
    opts := mqtt.NewClientOptions()
    opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
    opts.SetClientID("go_mqtt_producer")
    opts.SetUsername(userName2)
    opts.SetPassword(passwd2)
    opts.SetKeepAlive(8 * time.Second)
    opts.SetDefaultPublishHandler(messagePubHandler)
    opts.OnConnect = connectHandler
    opts.OnConnectionLost = connectLostHandler
    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    sub(client, true)
    publish(client)

    time.Sleep(30 * time.Second)
    client.Disconnect(250)
}

func publish(client mqtt.Client) {
    num := 10
    for i := 0; i < num; i++ {
        text := fmt.Sprintf("Message %d", i)
        token := client.Publish(topic, 0, false, text)
        token.Wait()
        time.Sleep(time.Second)
    }
}

以上代码创建了一个MQTT客户端,连接到本地代理服务器(地址为localhost:1883)。然后,指定要发布的主题和消息,并通过client.Publish方法将消息发布到主题上。

下面是一个简单的MQTT订阅者的示例:

MQTT订阅者

# consumer.go
package main

import (
    "fmt"
    mqtt "github.com/eclipse/paho.mqtt.golang"
    "time"
)

var messageRecHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    fmt.Printf("Clenit Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}

// 其实consumer既可以收到消息,也可以发送消息
// 作为互联网硬件收集器,采集的环境信息数据(温度、湿度等)发送到broker
// 作为互联网硬件执行器,可以接受broker的消息(执行指令信息,如显示文字、声音等),并根据消息执行硬件行为

func consumerPoint() {
    opts := mqtt.NewClientOptions()
    opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
    opts.SetClientID("go_mqtt_consumer")
    opts.SetUsername(userName1)
    opts.SetPassword(passwd1)
    opts.SetKeepAlive(8 * time.Second)
    opts.SetDefaultPublishHandler(messageRecHandler)
    opts.OnConnect = connectHandler
    opts.OnConnectionLost = connectLostHandler
    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    sub(client, false)
    time.Sleep(30 * time.Second)
    client.Disconnect(250)
}

使用

package main

import "time"

func main() {
    go consumerPoint()
    go producerPoint()
    time.Sleep(30 * time.Second)
}

以上代码创建了一个MQTT客户端,连接到本地代理服务器。然后,通过client.Subscribe方法订阅指定的主题,并在回调函数中处理接收到的消息。

结论:
MQTT是一种在物联网中广泛使用的消息传输协议,具有简单、轻量级和可靠性的特点。在Golang中,可以使用第三方库(如Eclipse Paho MQTT库)来实现MQTT功能。通过发布者和订阅者的示例代码,我们可以看到在Golang中实现MQTT功能是相对简单且易于理解的。

上一篇 下一篇

猜你喜欢

热点阅读