android 集结号移动互联网

基于 JMS 标准的 Android MQTT 客户端

2017-07-11  本文已影响636人  KtYY

关键词

JMS、ActiveMQ(ActivityMQ)、Apollo、MQTT、Android

摘要

由于项目开发需要,涉及到 Android 客户端接收来自 JMS 中间件的消息推送,本文以学习过程为线索,进行记录。

目录

一、简述 JMS (引出 ActiveMQ、Apollo)
二、MQTT 协议
三、MQTT 服务器搭建
四、 MQTT Android 客户端具体实现
—— 1、添加依赖
—— 2、添加权限
—— 3、基本字段说明
—— 4、注册Service
—— 5、Android 端具体实现

正文

一、简述 JMS (引出 ActiveQM、Apollo)

企业消息系统(Java Message Service)
JMS通讯示意图
JMS两种消息模型:

发送消息的客户端:生产者, 接收消息的客户端:消费者, MOM :消息传递系统

点到点(P2P)消息传递模型
发布/订阅(Pub/Sub)消息传递模型

关于 JMS 更详细的介绍,推荐阅读下面的贴,更加深入浅出,通俗易懂
深入浅出JMS(一)——JMS简介
深入浅出JMS(二)——JMS的组成

ActiveMQ (或ActivityMQ)

由 Apache 贡献的 ActiveMQ 正是 MOM 中优秀的一员。它是一个非常流行、强大、开源的消息和集成模式服务器,速度快、支持多种跨语言客户端和协议,易于使用企业集成模式,拥有许多先进的特性,完全支持 JMS 1.1和 J2EE 1.4 规范。

Apollo

是以 ActiveMQ5.x 为基础,采用全新的线程和消息调度架构重新实现的 MOM,针对多核处理器进行了优化处理,它的速度更快、更可靠、更易于维护。Apollo 与 ActiveMQ 一样支持多协议:STOMP、AMQP、MQTT、Openwire、 SSL、WebSockets,本文只介绍 MQTT 协议的 Android 客户端使用。

二、MQTT 协议

1、 Android 端实现消息推送的几种方式
2、 MQTT 协议简介

MQTT官网:http://mqtt.org/
MQTT介绍:http://www.ibm.com
MQTT Android github:https://github.com/eclipse/paho.mqtt.android
MQTT API:http://www.eclipse.org/paho/files/javadoc/index.html
MQTT Android API: http://www.eclipse.org/paho/files/android-javadoc/index.html

3、 Apollo + MQTT 协议的消息推送特征

三、MQTT 服务器搭建

1、下载Apollo服务器,解压(免安装的)。
2、进入解压后文件夹的 bin 目录下(例: E:\MQTT\apache-apollo-1.7.1\bin),按住 Shift 键,点击鼠标右键选择 "在此处打开命令窗口";
3、在命令窗口,输入 apollo create 服务器实例名称(例:apollo create mybroker),之后会在 bin 目录下创建该名称的文件夹。该文件夹中, etc\apollo.xml 文件是配置服务器信息的文件。etc\users.properties 文件包含连接 MQTT 服务器时用到的用户名和密码,默认为 admin=password,即账号为admin,密码为 password,可自行更改。
4、在命令窗口,输入 cd xxx/bin, 进入该实例的bin目录下,执行 apollo-broker.cmd run 命令,开启服务器,看到如下界面代表搭建完成。

cmd 窗口
之后在浏览器输入 http://127.0.0.1:61680/,查看是否安装成功。

四、 MQTT Android 客户端具体实现

1、在 module 的 build.gradle 添加依赖

repositories {
    maven {
        url "https://repo.eclipse.org/content/repositories/paho-releases/"
    }
}

dependencies {
    compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
    compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.0'
}

2、添加权限

    <uses-permission android:name="android.permission.INTERNET" />
    <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
    <uses-permission android:name="android.permission.WAKE_LOCK" />

3、基本字段说明

topic:中文意思是“话题”。在 MQTT 中订阅了( subscribe )同一话题(topic)的客户端会同时收到消息推送。直接实现了“群聊”功能。
clientId:客户身份唯一标识。
qos:服务质量。
retained:要保留最后的断开连接信息。
MqttAndroidClient#subscribe():订阅某个话题。
MqttAndroidClient#publish(): 向某个话题发送消息,之后服务器会推送给所有订阅了此话题的客户。
userName:连接到MQTT服务器的用户名。
passWord :连接到MQTT服务器的密码。

4、注册Service

 <!-- Mqtt Service -->
        <service android:name="org.eclipse.paho.android.service.MqttService" />
        <service android:name="com.mqtt.demo.MQTTService"/>

5、Android 端具体实现

public class MQTTService extends Service {
    public static final String TAG = MQTTService.class.getSimpleName();
    private static MQTTService instance;
    private static MqttAndroidClient client;

    private MqttConnectOptions conOpt;
    private String host = "tcp://192.168.7.31:61613";
    private String userName = "admin";
    private String passWord = "password";
    private static String myTopic = "topic";
    private String clientId = "test2";

    @Override
    public void onCreate() {
        super.onCreate();
        instance = this;
    }

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        init();
        return super.onStartCommand(intent, flags, startId);
    }

    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        return null;
    }

    @Override
    public void onDestroy() {
        instance = null;
        try {
            client.disconnect();
            client.unregisterResources();

        } catch (MqttException e) {
            e.printStackTrace();
        }
        super.onDestroy();
    }

    public static void publish(String msg){
        String topic = myTopic;
        Integer qos = 0;
        Boolean retained = false;
        try {
            client.publish(topic, msg.getBytes(), qos.intValue(), retained.booleanValue());
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }


    // MQTT监听连接情况
    private IMqttActionListener iMqttActionListener = new ActionListener();

    // MQTT监听并且接受消息
    private MqttCallback mqttCallback = new CallBack();

    private void init() {
        clientId = MacAddressUtil.getLocalMacAddress(this);

        // 服务器地址(协议+地址+端口号)
        String uri = host;
        client = new MqttAndroidClient(this, uri, clientId);
        // 设置MQTT监听并且接受消息
        client.setCallback(mqttCallback);

        conOpt = new MqttConnectOptions();
        // 清除缓存
        conOpt.setCleanSession(true);
        // 设置超时时间,单位:秒
        conOpt.setConnectionTimeout(10);
        // 心跳包发送间隔,单位:秒
        conOpt.setKeepAliveInterval(20);
        // 用户名
        conOpt.setUserName(userName);
        // 密码
        conOpt.setPassword(passWord.toCharArray());

        // last will message
        boolean doConnect = true;
        String message = "{\"terminal_uid\":\"" + clientId + "\"}";
        String topic = myTopic;
        Integer qos = 0;
        Boolean retained = false;
        if (TextUtils.isEmpty(message) || TextUtils.isEmpty(topic)) {
            // 最后发送的消息
            try {
                conOpt.setWill(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
            } catch (Exception e) {
                Log.i(TAG, "Exception Occured", e);
                doConnect = false;
                iMqttActionListener.onFailure(null, e);
            }
        }

        if (doConnect) {
            doClientConnection();
        }

    }

    /** 连接MQTT服务器 */
    private void doClientConnection() {
        if (!client.isConnected() && isConnectIsNomarl()) {
            try {
                client.connect(conOpt, null, iMqttActionListener);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }

    /** 判断网络是否连接 */
    private boolean isConnectIsNomarl() {
        ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE);
        NetworkInfo info = connectivityManager.getActiveNetworkInfo();
        if (info != null && info.isAvailable()) {
            String name = info.getTypeName();
            Log.i(TAG, "MQTT当前网络名称:" + name);
            return true;
        } else {
            Log.i(TAG, "MQTT 没有可用网络");
            return false;
        }
    }

    public static boolean isConnect(){
        if (instance != null && client != null){
            return instance.client.isConnected();
        }
        return false;
    }

    public static void connect(){
        if (instance != null && client != null){
            instance.doClientConnection();
        }
    }

    public static void disconnect(){
        if (instance != null){
            try {
                instance.client.disconnect();
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }

    public static class ActionListener implements IMqttActionListener {

        @Override
        public void onSuccess(IMqttToken arg0) {
            Log.i(TAG, "连接成功 ");
            try {
                // 订阅myTopic话题
                client.subscribe(myTopic,1);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void onFailure(IMqttToken arg0, Throwable arg1) {
            arg1.printStackTrace();
            // 连接失败,重连
            Log.i("mtqq", "onFailure");
        }
    }


    public static class CallBack implements MqttCallback{

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            String str1 = new String(message.getPayload());
            MQTTMessage msg = new MQTTMessage();
            msg.setMessage(str1);
            EventBus.getDefault().post(msg);
            String str2 = "topic:" + topic + ", qos:" + message.getQos() + ", retained:" + message.isRetained();
            Log.i(TAG, "messageArrived:" + str1);
            Log.i(TAG, str2);
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken arg0) {
            Log.i("mtqq", "deliveryComplete");
        }

        @Override
        public void connectionLost(Throwable arg0) {
            // 失去连接,重连
            String message = "null";
            if (arg0 != null) {
                message = arg0.getMessage();
            }
            Log.i("mtqq", "connectionLost:"+message);
        }
    }
}

5、手机显示

Paste_Image.png

6、服务端显示

打开服务端http://127.0.0.1:61680/,看到的是这个样子

serverPic

7、例子代码下载

https://github.com/jkjk66/AndroidMQTT.git

注:本文是通过搜集网络资源,项目实践后,整合撰写,转载请备注出处。

上一篇下一篇

猜你喜欢

热点阅读