Android(安卓)连接MQTT Broker(服务器)
2018-07-21 本文已影响141人
EMQ
准备工作
- 关于 MQTT 协议的基本介绍读者可以阅读EMQ的这篇文章。
- 在使用 MQTT 之前,需要搭建 MQTT 服务器,关于 MQTT 服务器的文章可以EMQ君写的常见 MQTT 服务器的搭建与使用。
添加依赖
MQTT 客户端我们使用官方实现,Eclipse Paho Android Service
Maven
<project ...>
<repositories>
<repository>
<id>Eclipse Paho Repo</id>
<url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
</repository>
</repositories>
...
<dependencies>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.android.service</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>
</project>
Gradle
dependencies {
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}
配置 AndroidManifest.xml
修改 AndroidManifest.xml
,添加网络权限以及添加后台 service
<?xml version="1.0" encoding="utf-8"?>
<manifest xmlns:android="http://schemas.android.com/apk/res/android">
<!-- Permissions the Application Requires -->
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<uses-permission android:name="android.permission.READ_EXTERNAL_STORAGE" />
<uses-permission android:name="android.permission.INTERNET" />
<application
android:icon="@mipmap/ic_launcher"
android:label="@string/app_name"
android:supportsRtl="true"
android:theme="@style/AppTheme">
<activity android:name=".MainActivity">
<intent-filter>
<action android:name="android.intent.action.MAIN" />
<category android:name="android.intent.category.LAUNCHER" />
</intent-filter>
</activity>
<!-- Mqtt Service -->
<service android:name="org.eclipse.paho.android.service.MqttService">
</service>
</application>
</manifest>
开始使用
初始化
private void initClient() {
String serverURI = "tcp://127.0.0.1:1883";
String clientId = "your client id";
mClient = new MqttAndroidClient(getApplicationContext(), serverURI, clientId);
mClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
}
@Override
public void connectionLost(Throwable cause) {
}
@Override
public void messageArrived(String topic, MqttMessage message) {
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
}
回调方法说明
public void connectComplete(boolean reconnect, String serverURI)
连接成功
public void connectionLost(Throwable cause)
与服务断开连接
public void messageArrived(String topic, MqttMessage message)
接收到消息,可在该回调方法中进行消息的处理
public void deliveryComplete(IMqttDeliveryToken token)
发布消息成功
配置
配置连接
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName("username");
options.setPassword("password".toCharArray());
options.setAutomaticReconnect(true);
可以设置用户名,密码,是否重连,设置遗愿等,更多配置可以查看 MqttConnectOptions
类
配置客户端离线或者断开连接的选项
DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
disconnectedBufferOptions.setBufferEnabled(true);
disconnectedBufferOptions.setBufferSize(5000);
disconnectedBufferOptions.setDeleteOldestMessages(true);
disconnectedBufferOptions.setPersistBuffer(true);
mClient.setBufferOpts(disconnectedBufferOptions);
配置是否缓存消息,缓存大小,缓存满的时候是否删除旧消息,是否持久化消息
连接
private void connect(MqttConnectOptions options) throws MqttException {
mClient.connect(options, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
}
});
}
订阅
private void subscribe() throws MqttException {
mClient.subscribe("topic", 1, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
}
});
}
发布消息
private void publish() throws MqttException {
mClient.publish("topic", "payload".getBytes(), 1, false, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
}
});
}
取消订阅
private void unsubscribe() throws MqttException {
mClient.unsubscribe("topic", null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
}
});
}
断开连接
private void disconnect() throws MqttException {
mClient.disconnect(null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
}
});
}
配置 SSL 连接
- 以 emqtt 为例子,复制
emqttd/etc/certs/cacert.pem
到自定义目录之下 - Android 仅支持 BKS 的证书,需要使用 keytool 转换为 BKS 格式。下载 bcprov-jdk16-146.jar(下载地址),放到
jdk/Contents/Home/jre/lib/ext
目录下。 - 进入自定义目录,执行以下命令:
keytool -importcert -keystore key.bks -file cacert.pem -storetype BKS -provider org.bouncycastle.jce.provider.BouncyCastleProvider
按照提示输入密码,然后就在该目录下生成了 key.bks 证书
- 将证书复制到 Android 项目的
res/raw
目录下面,通过以下代码进行使用:
SSLSocketFactory sslSocketFactory=
mClient.getSSLSocketFactory(getResources().openRawResource(R.raw.key), "your password");
MqttConnectOptions options = new MqttConnectOptions();
options.setSocketFactory(sslSocketFactory);
// more options
-
serverURI
记得修改为ssl://
和对应的8883
端口
结语
- 以上为简单代码实例,更多用法,可参考 MqttAndroidClient.java
- 因为 Android 平台的特殊性,需要通过 service 保持连接,该库是 Paho Java Client 在 Android 平台的接口实现,用法很相似。
- 可以在消息回调中考虑使用 EventBus 或者 RxBus 进行消息分发。
更多关于MQTT的文章,请关注EMQ君的博客。