JAVA文集我用 Linux程序员

MQTT,RabbitMQ初步使用 - Ubuntu

2017-05-05  本文已影响241人  single430

MQTT

使用源码安装

下载 http://mosquitto.org/files/source/mosquitto-1.4.11.tar.gz
# 解压
tar -zxfv mosquitto-1.4.11.tar.gz
# 进入目录
cd mosquitto-1.4.11
# 编译
make
# 安装
sudo make install

其中会需要一些依赖(编译过程找不到)

* openssl/ssl.h
  sudo apt-get install libssl-dev
* ares.h
  sudo apt-get install libc-ares-dev
* uuid/uuid.h
  sudo apt-get install uuid-dev
* 找不到libmosquitto.so.1文件
  sudo vi /etc/ld.so.conf.d/libc.conf or liblocal.conf
  添加 /usr/local/lib64
  保存,刷新 ldconfig
* 最后我们安装 paho 用于连接MQTT服务
  sudo pip3 install paho-mqtt

接下来就可以进行测试了

* 首先输入 mosquitto -v 来启动MQTT服务```

! /usr/bin/env python3

-- coding: utf-8 --

import json
import time
import random
from datetime import datetime

import paho.mqtt.client as mqtt

HOST = "127.0.0.1"
PORT = 1883
CLIENT = mqtt.Client()

def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc), userdata)
client.subscribe("C0001/#", 1) # 订阅top

def on_message(client, userdata, msg):
if msg:
body = str(msg.payload, encoding="utf-8")
print("数据为: {0}".format(body))
else:
print("数据为空")

def on_disconnect(client, userdata, rc):
if rc != 0:
print("Unexpected disconnection.")

if name == "main":
"""mqtt_test"""

CLIENT.on_connect = on_connect
CLIENT.on_message = on_message
CLIENT.on_disconnect = on_disconnect
try:
    CLIENT.connect(HOST, PORT, 60)
    CLIENT.loop_forever()
except KeyboardInterrupt:
    print("Interrupt received")
    CLIENT.disconnect()
except Exception as error:
    print("error: ", error)
    CLIENT.disconnect()

! /usr/bin/env python3

-- coding: utf-8 --

import time
import random

import paho.mqtt.client as mqtt

HOST = "127.0.0.1"
PORT = 1883
CLIENT = mqtt.Client()

if name == "main":
""""""
while 1:
CLIENT.connect(HOST, PORT, 60)
data_json = {
'id': 'SW0021',
'pid': 'W0003',
}
CLIENT.publish('C0001/{0}/{1}'.format(data_json['pid'], data_json['id']), str(data_json), 1)
CLIENT.disconnect()
time.sleep(3)

之后分别运行两个py文件就可以收到数据了

###RabbitMQ
安装RabbitMQ服务
```sudo apt-get install rabbitmq-server```
安装pika 用于连接RabbitMQ服务
```sudo pip3 install pika```

touch rabbitmq_s.py

! /usr/bin/env python3

-- coding: utf-8 --

import pika

if name == "main":
# 建立一个实例
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost') # 默认端口5672,可不写
)
# 声明一个管道,在管道里发消息
channel = connection.channel()
# 在管道里声明queue
channel.queue_declare(queue='sensor_data', durable=True)
# RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
send_data = "OK"
channel.basic_publish(exchange='',
routing_key='sensor_data', # queue名字
body=send_data,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
)) # 消息内容

print(" [x] Sent: " + send_data)
connection.close()  # 队列关闭

touch rabbimqt_r.py

! /usr/bin/env python3

-- coding: utf-8 --

from datetime import datetime
import pika

def callback(ch, method, properties, body): # 四个参数为标准格式
body = str(body, encoding='utf-8')
print("RabbitMQ: [{0}] Received: {1}".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), body))
ch.basic_ack(delivery_tag=method.delivery_tag) # 告诉生成者,消息处理完成
print("Rabbitmq 读取OK")

if name == "main":
# 建立实例
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
# 声明管道
channel = connection.channel()

# 为什么又声明了一个‘sensor_data’队列?
# 如果确定已经声明了,可以不声明。但是你不知道那个机器先运行,所以要声明两次。
channel.queue_declare(queue='sensor_data', durable=True)

# RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message, 需要添加更多的Consumer,或者创建更多的virtualHost来细化你的设计。
# channel.basic_qos(prefetch_count=1)

channel.basic_consume(  # 消费消息
    callback,  # 如果收到消息,就调用callback函数来处理消息
    queue='sensor_data',  # 你要从那个队列里收消息
    # no_ack=True  # 写的话,如果接收消息,机器宕机消息就丢了
    # 一般不写。宕机则生产者检测到发给其他消费者
)

print(' [*]RabbotMQ Wait Msg. To exit press CTRL+C')
channel.start_consuming()  # 开始消费消息
之后分别运行两个py文件就可以看到效果了,rabbitmq服务是自动运行的,可以使用 ***service rabbitmq-server xxx ***命令进行控制


文笔不好,还望见谅,本就是个人使用笔记的简单记录

***By: single430***
上一篇下一篇

猜你喜欢

热点阅读