MQTT简介
早在1999年,IBM的Andy Stanford-Clark博士以及Arcom公司ArlenNipper博士发明了MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议。
通过MQTT协议,目前已经扩展出了数十个MQTT服务器端程序,可以通过PHP,JAVA,Python,C,C#等系统语言来向MQTT发送相关消息。
此外,国内很多企业都广泛使用MQTT作为Android手机客户端与服务器端推送消息的协议。其中Sohu,Cmstop手机客户端中均有使用到MQTT作为消息推送消息。随着移动互联网的发展,MQTT由于开放源代码,耗电量小等特点,将会在移动消息推送领域会有更多的贡献,在物联网领域,传感器与服务器的通信,信息的收集,MQTT都可以作为考虑的方案之一。在未来MQTT会进入到我们生活的各各方面。
如果需要下载MQTT服务器端,可以直接去MQTT官方网站点击software进行下载MQTT协议衍生出来的各个不同版本。
mqtt-and-iotMQTT特点
MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:
- 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
- 对负载内容屏蔽的消息传输;
- 使用 TCP/IP 提供网络连接;
- 有三种消息发布服务质量:
- “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
- “至少一次”,确保消息到达,但消息重复可能会发生。
- “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
消息与主题 Topic
MQTT发布者与订阅者之间通过”主题”(Topic)进行消息推送,主题(Topic)格式类似URL或文件路径,通过”/“分隔层级,
例如:
sensor/1/temperature
chat/room/subject
presence/user/feng
sensor/1/#
sensor/+/temperature
uber/drivers/joe/inbox
MQTT主题(Topic)支持’+’, ‘#’的通配符,’+’通配一个层级,’#’通配多个层级(必须在末尾)。
MQTT消息发布者(Publisher)只能向特定’名称主题’(不支持通配符)发布消息,订阅者(Subscriber)
通过订阅’过滤主题’(支持通配符)来匹配消息。
注意:必须要转义’+’, ‘#’。
MQTT支持推送消息的QoS(服务质量)。在MQTT 中有三个等级的 QoS:
- QoS 0: 该等级表示“最多一次”交付(最佳状况)。 消息不会得到确认,因而,这是一种一劳永逸的方法。
- QoS 1: 该等级表示“至少一次”交付。 用户可能不止一次获得消息,但是允许收到的人确认已经收到。
- QoS 2: 该等级表示“刚好一次”交付。最慢但是最有保障的服务质量等级。确保用户只收到一次消息,
并包含四个阶段的交付握手。该等级最慢,但是最安全。
MQTT遗愿消息(Last Will)
MQTT客户端向服务器端CONNECT请求时,可以设置是否发送遗愿消息(Will Message)标志,和遗愿消息主题(Topic)与内容(Payload)。
MQTT客户端异常下线时(客户端断开前未向服务器发送DISCONNECT消息),MQTT消息服务器会发布遗愿消息。
MQTT保留消息(Retained Message)
MQTT客户端向服务器发布(PUBLISH)消息时,可以设置保留消息(Retained Message)标志。保留消息(Retained Message)会驻留在消息服务器,后来的订阅者订阅主题时仍可以接收该消息。
保留消息(Retained Message)有两种清除方式:
- 客户端向有保留消息的主题发布一个空消息。
- 消息服务器设置保留消息的超期时间。
共享订阅(Shared Subscription)
共享订阅是非标准MQTT约定,不过大多数开源MQTT服务器已经实现。
共享订阅(Shared Subscription)支持在多订阅者间采用分组负载平衡方式派发消息:
也就是订阅相同共享订阅(Shared Subscription)的客户根据负载平衡方式只有1人收到。
这个场景可以用于:集群执行分布式任务(1个人接了活,其他人就不要接了)。
---------
| | --Msg1--> Subscriber1
Publisher--Msg1,Msg2,Msg3-->| EMQ | --Msg2--> Subscriber2
| | --Msg3--> Subscriber3
---------
共享订阅支持两种使用方式:
订阅前缀 | 使用示例 |
---|---|
$queue/ |
mosquitto_sub -t ‘$queue/topic’ |
$share/<group>/ |
mosquitto_sub -t ‘$share/group/topic’ |
本地订阅(Local Subscription)
本地订阅也是非标准MQTT约定。
本地订阅(Local Subscription)只在本节点创建订阅与路由表,不会在集群节点间广播全局路由,非常适合物联网数据采集应用:
mosquitto_sub -t '$local/topic'
mosquitto_pub -t 'topic'
使用方式: 订阅者在主题(Topic)前增加’$local/’前缀。
MQTT会话(Clean Session)
MQTT支持Session(会话),当MQTT客户端向服务器发起CONNECT请求时,可以通过’Clean Session’标志设置会话。
‘Clean Session’设置为0,表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。
‘Clean Session’设置为1,表示创建一个新的临时会话,在客户端断开时,会话自动清除销毁。
MQTT 方法定义很简单:
- 连接 - 建立与 MQTT 经纪人(Broker)之间的连接。
- 断开连接 - 断开与 MQTT 经纪人(Broker)之间的连接。
- 发布 - 在 MQTT 经纪人(Broker)上发布主题。
- 订阅 - 从 MQTT 经纪人(Broker)上订阅主题。
- 退订 - 从 MQTT 经纪人(Broker)上退订主题。
MQTT 服务
要尝试使用MQTT那么首先是需要一台MQTT服务器,两个途径:
- 自建MQTT服务,可以在自己的Web服务器上部署一个Eclipse Mosquitto™。我在国内找到一个叫EMQ的服务器貌似讲得很强大也不妨下载试试。再或者去GitHub搜索一下也会找到一大堆的MQTT服务器的源代码。
- 使用现成的MQTT服务,我个人比较推荐采取此种方式,入手快而且省不少时间商用时也只是给点钱就完事了。国内在百度云的IoT接入产品中就有。如果只是想测试一下MQTT的话推荐使用 Eclipse 的 MQTT 沙盒。
ESP8266 的MQTT客户端实现
在开始之前你要先在Arduino的库管理中先安装Adafruit MQTT Library
Adafruit MQTT Library当然不一定非要使用这个库,包括亚马逊和微软等的大公司都提供了一些现成的库直接访问他们的MQTT云服务。
#include <ESP8266WiFi.h>
#include "Adafruit_MQTT.h"
#include "Adafruit_MQTT_Client.h"
/************************* WiFi Access Point *********************************/
#define WLAN_SSID "...your SSID..."
#define WLAN_PASS "...your password..."
/************************* Adafruit.io Setup *********************************/
#define AIO_SERVER "io.adafruit.com"
#define AIO_SERVERPORT 1883 // use 8883 for SSL
#define AIO_USERNAME "...your AIO username (see https://accounts.adafruit.com)..."
#define AIO_KEY "...your AIO key..."
/************ Global State (you don't need to change this!) ******************/
// Create an ESP8266 WiFiClient class to connect to the MQTT server.
WiFiClient client;
// or... use WiFiFlientSecure for SSL
//WiFiClientSecure client;
// Setup the MQTT client class by passing in the WiFi client and MQTT server and login details.
Adafruit_MQTT_Client mqtt(&client, AIO_SERVER, AIO_SERVERPORT, AIO_USERNAME, AIO_KEY);
/****************************** Feeds ***************************************/
// Setup a feed called 'photocell' for publishing.
// Notice MQTT paths for AIO follow the form: <username>/feeds/<feedname>
Adafruit_MQTT_Publish photocell = Adafruit_MQTT_Publish(&mqtt, AIO_USERNAME "/feeds/photocell");
// Setup a feed called 'onoff' for subscribing to changes.
Adafruit_MQTT_Subscribe onoffbutton = Adafruit_MQTT_Subscribe(&mqtt, AIO_USERNAME "/feeds/onoff");
/*************************** Sketch Code ************************************/
// Bug workaround for Arduino 1.6.6, it seems to need a function declaration
// for some reason (only affects ESP8266, likely an arduino-builder bug).
void MQTT_connect();
void setup() {
Serial.begin(115200);
delay(10);
Serial.println(F("Adafruit MQTT demo"));
// Connect to WiFi access point.
Serial.println(); Serial.println();
Serial.print("Connecting to ");
Serial.println(WLAN_SSID);
WiFi.begin(WLAN_SSID, WLAN_PASS);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println();
Serial.println("WiFi connected");
Serial.println("IP address: "); Serial.println(WiFi.localIP());
// Setup MQTT subscription for onoff feed.
mqtt.subscribe(&onoffbutton);
}
uint32_t x=0;
void loop() {
// Ensure the connection to the MQTT server is alive (this will make the first
// connection and automatically reconnect when disconnected). See the MQTT_connect
// function definition further below.
MQTT_connect();
// this is our 'wait for incoming subscription packets' busy subloop
// try to spend your time here
Adafruit_MQTT_Subscribe *subscription;
while ((subscription = mqtt.readSubscription(5000))) {
if (subscription == &onoffbutton) {
Serial.print(F("Got: "));
Serial.println((char *)onoffbutton.lastread);
}
}
// Now we can publish stuff!
Serial.print(F("\nSending photocell val "));
Serial.print(x);
Serial.print("...");
if (! photocell.publish(x++)) {
Serial.println(F("Failed"));
} else {
Serial.println(F("OK!"));
}
// ping the server to keep the mqtt connection alive
// NOT required if you are publishing once every KEEPALIVE seconds
/*
if(! mqtt.ping()) {
mqtt.disconnect();
}
*/
}
// Function to connect and reconnect as necessary to the MQTT server.
// Should be called in the loop function and it will take care if connecting.
void MQTT_connect() {
int8_t ret;
// Stop if already connected.
if (mqtt.connected()) {
return;
}
Serial.print("Connecting to MQTT... ");
uint8_t retries = 3;
while ((ret = mqtt.connect()) != 0) { // connect will return 0 for connected
Serial.println(mqtt.connectErrorString(ret));
Serial.println("Retrying MQTT connection in 5 seconds...");
mqtt.disconnect();
delay(5000); // wait 5 seconds
retries--;
if (retries == 0) {
// basically die and wait for WDT to reset me
while (1);
}
}
Serial.println("MQTT Connected!");
}
接下来的这个示例是订阅MQTT上的指定信道上的新消息并执行ESP8266内的回调函数:
#include <ESP8266WiFi.h>
#include "Adafruit_MQTT.h"
#include "Adafruit_MQTT_Client.h"
/************************* WiFi Access Point *********************************/
#define WLAN_SSID "network"
#define WLAN_PASS "password"
/************************* Adafruit.io Setup *********************************/
#define AIO_SERVER "io.adafruit.com"
#define AIO_SERVERPORT 1883
#define AIO_USERNAME "user"
#define AIO_KEY "key"
/************ Global State (you don't need to change this!) ******************/
// Create an ESP8266 WiFiClient class to connect to the MQTT server.
WiFiClient client;
// Setup the MQTT client class by passing in the WiFi client and MQTT server and login details.
Adafruit_MQTT_Client mqtt(&client, AIO_SERVER, AIO_SERVERPORT, AIO_USERNAME, AIO_USERNAME, AIO_KEY);
/****************************** Feeds ***************************************/
// Setup a feed called 'time' for subscribing to current time
Adafruit_MQTT_Subscribe timefeed = Adafruit_MQTT_Subscribe(&mqtt, "time/seconds");
// Setup a feed called 'slider' for subscribing to changes on the slider
Adafruit_MQTT_Subscribe slider = Adafruit_MQTT_Subscribe(&mqtt, AIO_USERNAME "/feeds/slider", MQTT_QOS_1);
// Setup a feed called 'onoff' for subscribing to changes to the button
Adafruit_MQTT_Subscribe onoffbutton = Adafruit_MQTT_Subscribe(&mqtt, AIO_USERNAME "/feeds/onoff", MQTT_QOS_1);
/*************************** Sketch Code ************************************/
int sec;
int min;
int hour;
int timeZone = -4; // utc-4 eastern daylight time (nyc)
void timecallback(uint32_t current) {
// adjust to local time zone
current += (timeZone * 60 * 60);
// calculate current time
sec = current % 60;
current /= 60;
min = current % 60;
current /= 60;
hour = current % 24;
// print hour
if(hour == 0 || hour == 12)
Serial.print("12");
if(hour < 12)
Serial.print(hour);
else
Serial.print(hour - 12);
// print mins
Serial.print(":");
if(min < 10) Serial.print("0");
Serial.print(min);
// print seconds
Serial.print(":");
if(sec < 10) Serial.print("0");
Serial.print(sec);
if(hour < 12)
Serial.println(" am");
else
Serial.println(" pm");
}
void slidercallback(double x) {
Serial.print("Hey we're in a slider callback, the slider value is: ");
Serial.println(x);
}
void onoffcallback(char *data, uint16_t len) {
Serial.print("Hey we're in a onoff callback, the button value is: ");
Serial.println(data);
}
void setup() {
Serial.begin(115200);
delay(10);
Serial.println(F("Adafruit MQTT demo"));
// Connect to WiFi access point.
Serial.println(); Serial.println();
Serial.print("Connecting to ");
Serial.println(WLAN_SSID);
WiFi.begin(WLAN_SSID, WLAN_PASS);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println();
Serial.println("WiFi connected");
Serial.println("IP address: "); Serial.println(WiFi.localIP());
timefeed.setCallback(timecallback);
slider.setCallback(slidercallback);
onoffbutton.setCallback(onoffcallback);
// Setup MQTT subscription for time feed.
mqtt.subscribe(&timefeed);
mqtt.subscribe(&slider);
mqtt.subscribe(&onoffbutton);
}
uint32_t x=0;
void loop() {
// Ensure the connection to the MQTT server is alive (this will make the first
// connection and automatically reconnect when disconnected). See the MQTT_connect
// function definition further below.
MQTT_connect();
// this is our 'wait for incoming subscription packets and callback em' busy subloop
// try to spend your time here:
mqtt.processPackets(10000);
// ping the server to keep the mqtt connection alive
// NOT required if you are publishing once every KEEPALIVE seconds
if(! mqtt.ping()) {
mqtt.disconnect();
}
}
// Function to connect and reconnect as necessary to the MQTT server.
// Should be called in the loop function and it will take care if connecting.
void MQTT_connect() {
int8_t ret;
// Stop if already connected.
if (mqtt.connected()) {
return;
}
Serial.print("Connecting to MQTT... ");
uint8_t retries = 3;
while ((ret = mqtt.connect()) != 0) { // connect will return 0 for connected
Serial.println(mqtt.connectErrorString(ret));
Serial.println("Retrying MQTT connection in 10 seconds...");
mqtt.disconnect();
delay(10000); // wait 10 seconds
retries--;
if (retries == 0) {
// basically die and wait for WDT to reset me
while (1);
}
}
Serial.println("MQTT Connected!");
}
附: MQTT协议中文版