MQTT example
导语
翻译自iot in five days 8.2
Example MQTT TopologyMQTT是什么?
MQTT(formerly MQ Telemetry Transport)是一个基于publish-subscribe的通知协议,位于TCP/IP协议之上。它被设计用来连接远端的“small code footprint”或是带宽受限的网络。
publish-subscribe通知模型要求一个消息中间人broker。broker负责散布消息给基于消息主题的感兴趣的客户端。
MQTT有三个级别的服务质量(QoS):
- QoS 0:broker/client只传送一次消息,没有确认(发送后不管了)
- QoS 1:broker/client至少发送一次消息,会要求确认
- QoS 3:broker/client会通过使用一个四次握手精确传送一次
查看更多关于MQTT的信息
MQTT API
MQTT的实现位于contiki/apps/mqtt文件夹。它利用了tcp-socket库。当前Contiki中实现的MQTT版本支持QoS 0和1。
下面这个函数初始化MQTT引擎,要在其他MQTT函数之前调用。
/*
conn:指向MOTT连接的指针
app_process:指向处理MQTT连接的应用进程
max_segment_size:为MQTT/TCP连接而使用的TCP段大小
*/
mqtt_status_t
mqtt_register(struct mqtt_connection *conn, struct process *app_process,
char *client_id, mqtt_event_callback_t event_callback,
uint16_t max_segment_size)
此函数连接到MQTT broker:
mqtt_status_t
mqtt_connect(struct mqtt_connection *conn, char *host, uint16_t port,
uint16_t keep_alive)
此函数与MQTT broker断开连接
void
mqtt_disconnect(struct mqtt_connection *conn)
此函数subscribe MQTT broker上的一个topic。
/*
mid:指向message id的指针
*/
mqtt_status_t
mqtt_subscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic,
mqtt_qos_level_t qos_level)
此函数取消subscribe MQTT broker上的一个topic
mqtt_status_t
mqtt_unsubscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic)
此函数publish一个MQTT broker topic
/*retain:如果RETAIN flag设置为1,在一个被client发送给server的PUBLISH包中,server一定要存储应用消息和它的QoS,这样才能被传给未来的订阅名与主题名相匹配的subscribers
*/
mqtt_status_t
mqtt_publish(struct mqtt_connection *conn, uint16_t *mid, char *topic,
uint8_t *payload, uint32_t payload_size,
mqtt_qos_level_t qos_level, mqtt_retain_t retain)
下面的函数在连接到MQTT brokers时设置client用户
名和密码。
void
mqtt_set_username_password(struct mqtt_connection *conn, char *username,
char *password)
下面的函数设置clients的Last Will topic和信息(payload)。如果Will Flag被设置为1(使用函数)就意味着,如果连接request被接收,一个Will message一定会被存储在server并与网络连接相关联。在网络连接按顺序关闭时,这个Will message一定会被publish。此功能可以在设备与broker断开时获得提醒。
void
mqtt_set_last_will(struct mqtt_connection *conn, char *topic, char *message,
mqtt_qos_level_t qos)
下面的函数可以用来维护MQTT连接状态,使用mqtt_connected检查节点是否连接到broker,使用mqtt_ready检查连接是否建立,buffer中还有没有空间来publish。
#define mqtt_connected(conn) \
((conn)->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER ? 1 : 0)
#define mqtt_ready(conn) \
(!(conn)->out_queue_full && mqtt_connected((conn)))
上手:MQTT和mosquitto
mosquitto的安装说明请看mosquitto website,本例中使用默认配置。关于边界路由的编译说明请参考之前的博客。
在examples/cc2538dk/mqtt-demo中有实验用例。通过较小的调整(比如移除和cc2538相关的代码)应该也可以在z1 mote上。本利主要基于IBM快速入门版本,更多信息可见README文件,或参考IBM MQTT doc。
在project-conf.h文件中,IPv6 broker地址定义如下:
#define MQTT_DEMO_BROKER_IP_ADDR "aaaa"::1
mosquitto broker默认绑定主机的IPv4/IPv6地址,如果使用tunslip6脚本的aaaa::1/64地址,它应该匹配MQTT_DEMO_BROKER_IP_ADDR的定义。
本例主要:
- 发布信息到一个MQTT broker
- 订阅一个topic并接收来自MQTT broker的命令。
注:PUBLISH_TRIGGER也被映射到用户按钮,可以触发一个publish事件。
MQTT客户端配置的数据结构如下:
/**
* \brief Data structure declaration for the MQTT client configuration
*/
typedef struct mqtt_client_config {
char org_id[CONFIG_ORG_ID_LEN];//唯一的组织ID
char type_id[CONFIG_TYPE_ID_LEN];//设备类型
char auth_token[CONFIG_AUTH_TOKEN_LEN];//授权代表(如果要求)
char event_type_id[CONFIG_EVENT_TYPE_ID_LEN];//默认事件类型
char broker_ip[CONFIG_IP_ADDR_STR_LEN];//broker IPv6地址
char cmd_type[CONFIG_CMD_TYPE_LEN];//默认的命令类型
clock_time_t pub_interval;//发布间隔周期
int def_rt_ping_interval;//周期性ping父节点和取RSSI值
uint16_t broker_port;//broker默认端口,默认是1883
} mqtt_client_config_t;
定义和初始化如下:
static mqtt_client_config_t conf;
static int
init_config()
{
/* Populate configuration with default values */
memset(&conf, 0, sizeof(mqtt_client_config_t));
memcpy(conf.org_id, DEFAULT_ORG_ID, strlen(DEFAULT_ORG_ID));
memcpy(conf.type_id, DEFAULT_TYPE_ID, strlen(DEFAULT_TYPE_ID));
memcpy(conf.auth_token, DEFAULT_AUTH_TOKEN, strlen(DEFAULT_AUTH_TOKEN));
memcpy(conf.event_type_id, DEFAULT_EVENT_TYPE_ID,
strlen(DEFAULT_EVENT_TYPE_ID));
memcpy(conf.broker_ip, broker_ip, strlen(broker_ip));
memcpy(conf.cmd_type, DEFAULT_SUBSCRIBE_CMD_TYPE, 1);
conf.broker_port = DEFAULT_BROKER_PORT;
conf.pub_interval = DEFAULT_PUBLISH_INTERVAL;
conf.def_rt_ping_interval = DEFAULT_RSSI_MEAS_INTERVAL;
return 1;
}
这个应用例程本身可以被理解为一个有限状态机,尽管看起来复杂但是很直接,mqtt_demo_process开启如下:
PROCESS_THREAD(mqtt_demo_process, ev, data)
{
PROCESS_BEGIN();
printf("MQTT Demo Process\n");
if(init_config() != 1) {//如前所述,不为1则初始化失败
PROCESS_EXIT();
}
update_config();//创建client ID,publish和subscribe topics.初始状态STATE_INIT被设置,publish_periodic_timer事件被安排
def_rt_rssi = 0x8000000;
uip_icmp6_echo_reply_callback_add(&echo_reply_notification, echo_reply_handler);//当一个ping事件发生时,注册事件回调
etimer_set(&echo_request_timer, conf.def_rt_ping_interval);//开启周期ping定时器
/* Main loop */
while(1) {
PROCESS_YIELD();
//通过用户按钮,尝试从STATE_ERROR中恢复
if(ev == sensors_event && data == PUBLISH_TRIGGER) {
if(state == STATE_ERROR) {
connect_attempt = 1;
state = STATE_REGISTERED;
}
}
if((ev == PROCESS_EVENT_TIMER && data == &publish_periodic_timer) ||
ev == PROCESS_EVENT_POLL ||
(ev == sensors_event && data == PUBLISH_TRIGGER)) {
//处理publish_periodic和button事件,这是应用实际开始的地方
state_machine();
}
if(ev == PROCESS_EVENT_TIMER && data == &echo_request_timer) {
//当周期ping定时器结束,ping the parent
ping_parent();
etimer_set(&echo_request_timer, conf.def_rt_ping_interval);
}
}
PROCESS_END();
}
当construct_client_id第一次被STATE_INIT调用,state_machine就被调用。
static void
state_machine(void)
{
switch(state) {
case STATE_INIT://进入点,注册mqtt连接,进入STATE_REGISTERED事件
/* If we have just been configured register MQTT connection */
mqtt_register(&conn, &mqtt_demo_process, client_id, mqtt_event,
MAX_TCP_SEGMENT_SIZE);
/*
* If we are not using the quickstart service (thus we are an IBM
* registered device), we need to provide user name and password
*/
if(strncasecmp(conf.org_id, QUICKSTART, strlen(conf.org_id)) != 0) {
if(strlen(conf.auth_token) == 0) {
printf("User name set, but empty auth token\n");
state = STATE_ERROR;
break;
} else {
mqtt_set_username_password(&conn, "use-token-auth",
conf.auth_token);
}
}
/* _register() will set auto_reconnect. We don't want that. */
conn.auto_reconnect = 0;
connect_attempt = 1;
state = STATE_REGISTERED;
DBG("Init\n");
/* Continue */
//企图连接broker,如果节点没有加入网络(没有一个有效的IPv6全局地址)它将稍后尝试。
//如果节点有有效地址那么调用mqtt_connect函数并设置状态为STATE_CONNECTING,
//然后设置publish_periodic_timer为更快的速度
case STATE_REGISTERED:
if(uip_ds6_get_global(ADDR_PREFERRED) != NULL) {
/* Registered and with a public IP. Connect */
DBG("Registered. Connect attempt %u\n", connect_attempt);
ping_parent();
connect_to_broker();
} else {
leds_on(STATUS_LED);
ctimer_set(&ct, NO_NET_LED_DURATION, publish_led_off, NULL);
}
etimer_set(&publish_periodic_timer, NET_CONNECT_PERIODIC);
return;
break;
//这个事件通知用户连接企图。当MQTT到broker的连接建立后,MQTT_EVENT_CONNECTED
//在mqtt_event回调函数中被触发
case STATE_CONNECTING:
leds_on(STATUS_LED);
ctimer_set(&ct, CONNECTING_LED_DURATION, publish_led_off, NULL);
/* Not connected yet. Wait */
DBG("Connecting (%u)\n", connect_attempt);
break;
//当连接建立后,开始发布,如果不用IBM的quickstart,可以跳过改变状态
//到STATE_PUBLUSHING
case STATE_CONNECTED:
/* Don't subscribe unless we are a registered device */
if(strncasecmp(conf.org_id, QUICKSTART, strlen(conf.org_id)) == 0) {
DBG("Using 'quickstart': Skipping subscribe\n");
state = STATE_PUBLISHING;
}
/* Continue */
//检查在mqtt_ready中MQTT连接是否OK,然后subscribe和publish
case STATE_PUBLISHING:
/* If the timer expired, the connection is stable. */
if(timer_expired(&connection_life)) {
/*
* Intentionally using 0 here instead of 1: We want RECONNECT_ATTEMPTS
* attempts if we disconnect after a successful connect
*/
connect_attempt = 0;
}
if(mqtt_ready(&conn) && conn.out_buffer_sent) {
/* Connected. Publish */
if(state == STATE_CONNECTED) {
subscribe();
state = STATE_PUBLISHING;
} else {
leds_on(STATUS_LED);
ctimer_set(&ct, PUBLISH_LED_ON_DURATION, publish_led_off, NULL);
publish();
}
etimer_set(&publish_periodic_timer, conf.pub_interval);
DBG("Publishing\n");
/* Return here so we don't end up rescheduling the timer */
return;
} else {
/*
* Our publish timer fired, but some MQTT packet is already in flight
* (either not sent at all, or sent but not fully ACKd).
*
* This can mean that we have lost connectivity to our broker or that
* simply there is some network delay. In both cases, we refuse to
* trigger a new message and we wait for TCP to either ACK the entire
* packet after retries, or to timeout and notify us.
*/
DBG("Publishing... (MQTT state=%d, q=%u)\n", conn.state,
conn.out_queue_full);
}
break;
//由MQTT_EVENT_DISCONNECTED触发,处理任意取消连接事件
case STATE_DISCONNECTED:
DBG("Disconnected\n");
if(connect_attempt < RECONNECT_ATTEMPTS ||
RECONNECT_ATTEMPTS == RETRY_FOREVER) {
/* Disconnect and backoff */
clock_time_t interval;
mqtt_disconnect(&conn);
connect_attempt++;
interval = connect_attempt < 3 ? RECONNECT_INTERVAL << connect_attempt :
RECONNECT_INTERVAL << 3;
DBG("Disconnected. Attempt %u in %lu ticks\n", connect_attempt, interval);
etimer_set(&publish_periodic_timer, interval);
state = STATE_REGISTERED;
return;
} else {
/* Max reconnect attempts reached. Enter error state */
state = STATE_ERROR;
DBG("Aborting connection after %u attempts\n", connect_attempt - 1);
}
break;
//停止应用,只允许有效配置的值
case STATE_CONFIG_ERROR:
/* Idle away. The only way out is a new config */
printf("Bad configuration.\n");
return;
//默认事件处理,停止定时器,do nothing
case STATE_ERROR:
default:
leds_on(STATUS_LED);
/*
* 'default' should never happen.
*
* If we enter here it's because of some error. Stop timers. The only thing
* that can bring us out is a new config event
*/
printf("Default case: State=0x%02x\n", state);
return;
}
/* If we didn't return so far, reschedule ourselves */
etimer_set(&publish_periodic_timer, STATE_MACHINE_PERIODIC);
}
publish函数创造了要被published的字符串数据。下面函数片段中高亮的是最重要的部分,例程周期性地发布下面的信息:
- 设备名
- 一个增长的序列号
- 设备运行时间(以s为单位)
- 片上温度
- 电量
mqtt_publish更新了MQTT broker的值,发布到特定的pub_topic。默认的topic是construct_pub_topic中完成的iot-2/evt/status/fmt/json。这个topic依照IBM的格式,也可以按其他格式改变。
当从subscribe的topic中接收事件时,MQTT_EVENT_PUBLISH事件被触发,pub_handler被调用。这个例子允许我们打开或关闭红色LED。
例程subscribe的默认topic是iot-2/cmd/+/fmt/json,明确地改变LED状态我们需要publish到iot-2/cmd/leds/fmt/json topic,值1代表打开灯,值0代表关闭。
static void
pub_handler(const char *topic, uint16_t topic_len, const uint8_t *chunk,
uint16_t chunk_len)
{
DBG("Pub Handler: topic='%s' (len=%u), chunk_len=%u\n", topic, topic_len,
chunk_len);
/* If we don't like the length, ignore */
if(topic_len != 23 || chunk_len != 1) {
printf("Incorrect topic or chunk len. Ignored\n");
return;
}
/* If the format != json, ignore */
if(strncmp(&topic[topic_len - 4], "json", 4) != 0) {
printf("Incorrect format\n");
}
if(strncmp(&topic[10], "leds", 4) == 0) {
if(chunk[0] == '1') {
leds_on(LEDS_RED);
} else if(chunk[0] == '0') {
leds_off(LEDS_RED);
}
return;
}
}