基于 JMS 标准的 Android MQTT 客户端
关键词
JMS、ActiveMQ(ActivityMQ)、Apollo、MQTT、Android
摘要
由于项目开发需要,涉及到 Android 客户端接收来自 JMS 中间件的消息推送,本文以学习过程为线索,进行记录。
目录
一、简述 JMS (引出 ActiveMQ、Apollo)
二、MQTT 协议
三、MQTT 服务器搭建
四、 MQTT Android 客户端具体实现
—— 1、添加依赖
—— 2、添加权限
—— 3、基本字段说明
—— 4、注册Service
—— 5、Android 端具体实现
正文
一、简述 JMS (引出 ActiveQM、Apollo)
企业消息系统(Java Message Service)
-
又称之为面向消息的中间件(MOM),提供了应用程序之间,异步数据的存储和转发,即应用程序彼此不直接通信,而是与作为中介的 MOM 通信。应用程序开发人员无需了解消息的发送和协议的细节。
-
过程如下图,应用程序 A 只需要将 Message 发送到服务器上,然后应用程序 B 即可从服务器中接收到 A 发来的消息,由此可知 JMS 具有提供消息灵活性,松散耦合等优点。
- JMS 由 SUN 提出,是一系列的接口及相关语义的集合,通过这些接口和和其中的方法,JMS 客户端可以访问消息系统,完成消息的创建、发送、接收及读取。
- JMS 通过 MOM 为 Java 程序提供了一个发送和接收消息的标准。根据 JMS 编写的客户端程序可以访问任何实现 JMS 标准的 MOM。
JMS两种消息模型:
发送消息的客户端:生产者, 接收消息的客户端:消费者, MOM :消息传递系统
点到点(P2P)消息传递模型
- 生产者发送消息到MOM的一个特定队列(Queue)中,而消费者从一个消息队列中得到消息。
- 每条消息只有一个消费者,如果一条消息被消息者接收,那么其他的消费者就不能得到这条消息了。
- 收到消息后消费者必须确认消息已被接收,否则消息传递系统会认为该消息没有被接收,那么这条消息仍然可以被其他人接收(程序可以自动进行确认,不需要人工干预)。
发布/订阅(Pub/Sub)消息传递模型
- 发布/订阅传递消息类型与主题(Topic)有关。生产者发布消息,而消费者订阅感兴趣的消息,生产者将消息和一个特定的主题(Topic)连在一起,消息传递系统根据消费者注册的兴趣,将消息传递给消费者。这种类型非常类似出版报纸、杂志的形式。
- 每个消息都可以有多个(0,1,……)订阅者。即每条消息可以有多个消费者,如果报纸和杂志一样。
- 订阅者只能消费他们订阅之后出版的消息。这就要求订阅者必须先订阅,再等待生产者的运行,发布。
- 订阅者必须保持为活动状态才能获得这些消息,即订阅者必须保持活动状态等待发布者发布的消息。
关于 JMS 更详细的介绍,推荐阅读下面的贴,更加深入浅出,通俗易懂
深入浅出JMS(一)——JMS简介
深入浅出JMS(二)——JMS的组成
ActiveMQ (或ActivityMQ)
由 Apache 贡献的 ActiveMQ 正是 MOM 中优秀的一员。它是一个非常流行、强大、开源的消息和集成模式服务器,速度快、支持多种跨语言客户端和协议,易于使用企业集成模式,拥有许多先进的特性,完全支持 JMS 1.1和 J2EE 1.4 规范。
Apollo
是以 ActiveMQ5.x 为基础,采用全新的线程和消息调度架构重新实现的 MOM,针对多核处理器进行了优化处理,它的速度更快、更可靠、更易于维护。Apollo 与 ActiveMQ 一样支持多协议:STOMP、AMQP、MQTT、Openwire、 SSL、WebSockets,本文只介绍 MQTT 协议的 Android 客户端使用。
二、MQTT 协议
1、 Android 端实现消息推送的几种方式
- 轮询:客户端定时向服务器请求数据,属于伪推送。缺点:费电,费流量。
- XMPP:是基于可扩展标记语言(XML)的协议。缺点:XML 格式的,冗余很大,花费流量。
- MQTT:由 IBM 开发的传输协议,它被设计用于轻量级的发布/订阅式消息传输,旨在为低带宽和不稳定的网络环境中的设备提供可靠的网络服务。相比于 XMPP 等传统协议,MQTT 是专门针对移动互联网开发的轻量级传输协议,这种传输协议连接稳定、心跳数据包小,所以具备耗电量低、耗流量低的优势。推送服务的最佳协议!
2、 MQTT 协议简介
MQTT官网:http://mqtt.org/
MQTT介绍:http://www.ibm.com
MQTT Android github:https://github.com/eclipse/paho.mqtt.android
MQTT API:http://www.eclipse.org/paho/files/javadoc/index.html
MQTT Android API: http://www.eclipse.org/paho/files/android-javadoc/index.html
3、 Apollo + MQTT 协议的消息推送特征
- Apollo 使用 MQTT 协议,加上 Android 上的 paho 包,即可简单实现消息通知功能,但是 MQTT 协议仅支持主题消息广播(Topic),即发布/订阅消息传递模式,而且不能用Selector,不能直接实现点对点的消息投递。
- 利用上述的特征,可将 Android 客户端划分为众多的部落,创建不同的主题, 针对特定订阅者群体进行消息传递。该特征特别适用于智能家居等物联网系统。
三、MQTT 服务器搭建
1、下载Apollo服务器,解压(免安装的)。
2、进入解压后文件夹的 bin 目录下(例: E:\MQTT\apache-apollo-1.7.1\bin),按住 Shift 键,点击鼠标右键选择 "在此处打开命令窗口";
3、在命令窗口,输入 apollo create 服务器实例名称(例:apollo create mybroker),之后会在 bin 目录下创建该名称的文件夹。该文件夹中, etc\apollo.xml 文件是配置服务器信息的文件。etc\users.properties 文件包含连接 MQTT 服务器时用到的用户名和密码,默认为 admin=password,即账号为admin,密码为 password,可自行更改。
4、在命令窗口,输入 cd xxx/bin, 进入该实例的bin目录下,执行 apollo-broker.cmd run 命令,开启服务器,看到如下界面代表搭建完成。
之后在浏览器输入 http://127.0.0.1:61680/,查看是否安装成功。
四、 MQTT Android 客户端具体实现
1、在 module 的 build.gradle 添加依赖
repositories {
maven {
url "https://repo.eclipse.org/content/repositories/paho-releases/"
}
}
dependencies {
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.0'
}
2、添加权限
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
3、基本字段说明
topic:中文意思是“话题”。在 MQTT 中订阅了( subscribe )同一话题(topic)的客户端会同时收到消息推送。直接实现了“群聊”功能。
clientId:客户身份唯一标识。
qos:服务质量。
retained:要保留最后的断开连接信息。
MqttAndroidClient#subscribe():订阅某个话题。
MqttAndroidClient#publish(): 向某个话题发送消息,之后服务器会推送给所有订阅了此话题的客户。
userName:连接到MQTT服务器的用户名。
passWord :连接到MQTT服务器的密码。
4、注册Service
<!-- Mqtt Service -->
<service android:name="org.eclipse.paho.android.service.MqttService" />
<service android:name="com.mqtt.demo.MQTTService"/>
5、Android 端具体实现
public class MQTTService extends Service {
public static final String TAG = MQTTService.class.getSimpleName();
private static MQTTService instance;
private static MqttAndroidClient client;
private MqttConnectOptions conOpt;
private String host = "tcp://192.168.7.31:61613";
private String userName = "admin";
private String passWord = "password";
private static String myTopic = "topic";
private String clientId = "test2";
@Override
public void onCreate() {
super.onCreate();
instance = this;
}
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
init();
return super.onStartCommand(intent, flags, startId);
}
@Nullable
@Override
public IBinder onBind(Intent intent) {
return null;
}
@Override
public void onDestroy() {
instance = null;
try {
client.disconnect();
client.unregisterResources();
} catch (MqttException e) {
e.printStackTrace();
}
super.onDestroy();
}
public static void publish(String msg){
String topic = myTopic;
Integer qos = 0;
Boolean retained = false;
try {
client.publish(topic, msg.getBytes(), qos.intValue(), retained.booleanValue());
} catch (MqttException e) {
e.printStackTrace();
}
}
// MQTT监听连接情况
private IMqttActionListener iMqttActionListener = new ActionListener();
// MQTT监听并且接受消息
private MqttCallback mqttCallback = new CallBack();
private void init() {
clientId = MacAddressUtil.getLocalMacAddress(this);
// 服务器地址(协议+地址+端口号)
String uri = host;
client = new MqttAndroidClient(this, uri, clientId);
// 设置MQTT监听并且接受消息
client.setCallback(mqttCallback);
conOpt = new MqttConnectOptions();
// 清除缓存
conOpt.setCleanSession(true);
// 设置超时时间,单位:秒
conOpt.setConnectionTimeout(10);
// 心跳包发送间隔,单位:秒
conOpt.setKeepAliveInterval(20);
// 用户名
conOpt.setUserName(userName);
// 密码
conOpt.setPassword(passWord.toCharArray());
// last will message
boolean doConnect = true;
String message = "{\"terminal_uid\":\"" + clientId + "\"}";
String topic = myTopic;
Integer qos = 0;
Boolean retained = false;
if (TextUtils.isEmpty(message) || TextUtils.isEmpty(topic)) {
// 最后发送的消息
try {
conOpt.setWill(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
} catch (Exception e) {
Log.i(TAG, "Exception Occured", e);
doConnect = false;
iMqttActionListener.onFailure(null, e);
}
}
if (doConnect) {
doClientConnection();
}
}
/** 连接MQTT服务器 */
private void doClientConnection() {
if (!client.isConnected() && isConnectIsNomarl()) {
try {
client.connect(conOpt, null, iMqttActionListener);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
/** 判断网络是否连接 */
private boolean isConnectIsNomarl() {
ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE);
NetworkInfo info = connectivityManager.getActiveNetworkInfo();
if (info != null && info.isAvailable()) {
String name = info.getTypeName();
Log.i(TAG, "MQTT当前网络名称:" + name);
return true;
} else {
Log.i(TAG, "MQTT 没有可用网络");
return false;
}
}
public static boolean isConnect(){
if (instance != null && client != null){
return instance.client.isConnected();
}
return false;
}
public static void connect(){
if (instance != null && client != null){
instance.doClientConnection();
}
}
public static void disconnect(){
if (instance != null){
try {
instance.client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
public static class ActionListener implements IMqttActionListener {
@Override
public void onSuccess(IMqttToken arg0) {
Log.i(TAG, "连接成功 ");
try {
// 订阅myTopic话题
client.subscribe(myTopic,1);
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void onFailure(IMqttToken arg0, Throwable arg1) {
arg1.printStackTrace();
// 连接失败,重连
Log.i("mtqq", "onFailure");
}
}
public static class CallBack implements MqttCallback{
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String str1 = new String(message.getPayload());
MQTTMessage msg = new MQTTMessage();
msg.setMessage(str1);
EventBus.getDefault().post(msg);
String str2 = "topic:" + topic + ", qos:" + message.getQos() + ", retained:" + message.isRetained();
Log.i(TAG, "messageArrived:" + str1);
Log.i(TAG, str2);
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
Log.i("mtqq", "deliveryComplete");
}
@Override
public void connectionLost(Throwable arg0) {
// 失去连接,重连
String message = "null";
if (arg0 != null) {
message = arg0.getMessage();
}
Log.i("mtqq", "connectionLost:"+message);
}
}
}
5、手机显示
Paste_Image.png6、服务端显示
打开服务端http://127.0.0.1:61680/,看到的是这个样子
7、例子代码下载
https://github.com/jkjk66/AndroidMQTT.git