MQTT深入浅出系列(一)
mqtt介绍与使用
mqtt协议是轻量级的消息订阅和发布(publish/subscribe)协议,建立在TCP/IP协议之上,物联网、消息推送中用的很多。
如何使用请参考此篇博客,写的通俗易懂。
本人的github地址:
https://github.com/fighter-lee/EasyMqtt
使用
获取源码文件
github上是没有mqtt源码的,虽然我们能从远程依赖上拿到jar包,但是因为java文件编译后成class文件后注释、常量、以及部分方法都发生了一些变化,可读性很差,那我们如何拿源码呢,当然是jcenter了,如图:
在jcenter官网上搜索关键字:eclipse.paho.client.mqttv3
点进去后,找一下以下文件,如图的那个就是java文件(下载下来后改成zip格式,然后解压就好啦)
在此源码上,对mqtt进行封装,便于调用,具体实现请访问我的github:
https://github.com/fighter-lee/EasyMqtt
如何使用
Androidstudio添加如下依赖:
compile 'top.fighter-lee:mqttlibs:1.0.1'
搭建服务器
如果有自己的mqtt服务器的话,请跳过此步骤。
-
请点击,下载Apollo服务器,安装。
-
命令行进入安装目录bin目录下。
D: cd D:\develop\tools\apache-apollo-1.7.1\bin
-
输入apollo create XXX(xxx为创建的服务器实例名称,例:apollo create mybroker),之后会在bin目录下创建名称为XXX的文件夹。XXX文件夹下etc\apollo.xml文件下是配置服务器信息的文件。etc\users.properties文件包含连接MQTT服务器时用到的用户名和密码,默认为admin=password,即账号为admin,密码为password,可自行更改。
-
进入XXX/bin目录,输入apollo-broker.cmd run开启服务器,看到如下界面代表搭建完成
-
在浏览器输入http://127.0.0.1:61680/,查看是否安装成功。
客户端编码
连接
先介绍API吧。
ConnectCommand为连接操作类,可以设置相应属性。
-
setClientId()
设置客户身份唯一标识
-
setServer()
设置建立连接的域名或者服务器ip
-
setPort
设置端口号
-
setUserNameAndPassword
设置连接认证的用户名和密码
-
setKeepAlive
设置保持长连接ping的频率,单位为秒,建议100
-
setTimeout
设置操作超时时间。
-
setCleanSession
设置cleansession,若为true,当 disconnect 时,会移除这个 client 所有的 subscriptions.
-
setSsl
建立ssl长连接,若没有设置的话,默认为tcp长连接。
-
setLastWill
设置遗愿消息,即当设备断开连接时会主动pub的消息。
-
setTraceEnabled
是否打印日志,默认false
-
setTraceCallback
监听日志回调,需要setTraceEnabled(true)
MqttManager.getInstance()
.connect(new ConnectCommand()
.setClientId(getClientId())
.setServer("172.17.3.35")
.setPort(61613)
.setUserNameAndPassword("admin", "password")
.setKeepAlive(30)
.setTimeout(10)
.setCleanSession(false)
, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Trace.d(TAG, "onSuccess() ");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Trace.d(TAG, "onFailure() ");
Trace.e(TAG, exception);
}
});
连接成功后在页面上显示如图:
setUserNameAndPassword,若使用的是Apollo服务器,则默认的用户名是admin,密码是:password。
发送消息
API
-
setMessage
设置消息内容
-
setQos
设置qos,决定消息到达次数。
-
setTopic
设置消息主题
-
setRetained
服务器是否保存消息
MqttManager.getInstance().pub(new PubCommand() .setMessage("哈哈哈,我来了") .setQos(1) .setTopic("/fighter-lee.top/mqttlibs") .setRetained(false), new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { Trace.d(TAG, "onSuccess() "); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { Trace.e(TAG, exception); } });
订阅消息主题
MqttManager.getInstance().sub(new SubCommand()
.setQos(1)
.setTopic("/fighter-lee.top/mqttlibs"), new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Trace.d(TAG, "onSuccess() ");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Trace.e(TAG, exception);
}
});
取消订阅消息主题
MqttManager.getInstance().unSub(new UnsubCommand()
.setTopic("/fighter-lee.top/mqttlibs"), new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Trace.d(TAG, "onSuccess() ");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Trace.e(TAG, exception);
}
});
接收消息
MqttManager.getInstance().registerMessageListener(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
Trace.e(TAG, cause);
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Trace.d(TAG, "messageArrived() topic:"+topic);
Trace.d(TAG, "messageArrived() message:"+message);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});