Android常用功能

Android——MQTT推送

2018-05-11  本文已影响17人  海晨忆

个人博客:haichenyi.com。感谢关注

整体介绍

  最近公司用到的推送MQTT。不想过多的介绍背景什么的,我就直接讲怎么实现这个功能。

  他这个原理长连接,这个不用多讲,用法类似于EventBus,需要先订阅,然后通过topic再发送消息。topic是什么呢?我先来讲讲整体流程:

  1. 先连接服务器,要先建立长连接

  2. 然后需要订阅topic,连接之后才能订阅topic

  3. 最后就是通过topic推送消息,接收消息

一步一步讲:

第一步,与服务器建立连接

  先丢代码,然后看注释:

private void initPush() {
        // 服务器地址(协议+地址+端口号)
        String uri = host;
        client = new MqttAndroidClient(this, uri, clientId);
        // 设置MQTT监听并且接受消息
        client.setCallback(mqttCallback);
        //Mqtt的一些设置
        conOpt = new MqttConnectOptions();
        conOpt.setAutomaticReconnect(true);
        // 清除缓存
        conOpt.setCleanSession(true);
        // 设置超时时间,单位:秒
        conOpt.setConnectionTimeout(10);
        // 心跳包发送间隔,单位:秒
        conOpt.setKeepAliveInterval(20);
        myTopic = String.format(TOPIC_SUB, mDeviceId);
        Log.e(TAG,"myTopic_________"+myTopic);
        doClientConnection();
    }

  上面的这些参数,我碰到了两个问题。

  上面的这些参数,我碰到了两个问题。

  上面的这些参数,我碰到了两个问题。

  1. 第一个问题,与服务器建立连接,你得先有一个服务器吧?我根据网上的步骤,创建了一个apache-apollo服务器,并且启动了,也启动成功了,我建立连接的时候,总是失败。然后,找啊找,找啊找。问题没有解决,但是,我找到了一个可以用的服务器,也就是这里的uri,不要设置MqttConnectOptions的用户名和密码,设置了他会拒绝
private String host = "tcp://test.mosquitto.org:1883";
  1. 第二个问题,我连接成功之后,不一会,他就会自动断开连接,或者,推送完消息之后,他就会断开连接。然后,网上搜原因,找啊找,诶,我找到了。MqttAndroidClient的构造方法:
/**
     * Constructor - create an MqttAndroidClient that can be used to communicate with an MQTT server on android
     * 
     * @param context 
     *            object used to pass context to the callback. 
     * @param serverURI
     *            specifies the protocol, host name and port to be used to
     *            connect to an MQTT server
     * @param clientId
     *            specifies the name by which this connection should be
     *            identified to the server
     */
    public MqttAndroidClient(Context context, String serverURI,
            String clientId) {
        this(context, serverURI, clientId, null, Ack.AUTO_ACK);
    }

看第三个参数,clientId,指定一个名字,用来连接服务器的身份标识。就是说,你设置的这个值,是你在服务器的唯一标识,不能跟其他用户的相同。我把这个clientId直接用uuid生成,就没问题了。

第二步,订阅topic

  回到上面,接着往下面走,

/**
     * 连接MQTT服务器
     */
    private void doClientConnection() {
        if (!client.isConnected() && isConnectIsNormal()) {
            try {
                client.connect(conOpt, null, iMqttActionListener);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

    }
    
    /**
     * 判断网络是否连接
     */
    private boolean isConnectIsNormal() {
        ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext()
                .getSystemService(Context.CONNECTIVITY_SERVICE);
        if (connectivityManager != null) {
            NetworkInfo info = connectivityManager.getActiveNetworkInfo();
            if (info != null && info.isAvailable()) {
                String name = info.getTypeName();
                Log.e(TAG, "MQTT当前网络名称:" + name);
                return true;
            } else {
                Log.e(TAG, "MQTT 没有可用网络");
                return false;
            }
        } else {
            return false;
        }
    }

  这个方法就是用来连接服务器的,首先判断是否正在连接,后面那个是判断当前有没有网络。再就是这个iMqttActionListener监听了

// MQTT是否连接成功
    private IMqttActionListener iMqttActionListener = new IMqttActionListener() {

        @Override
        public void onSuccess(IMqttToken arg0) {
            Log.e(TAG, "连接成功 ");
            try {
                // 订阅myTopic话题
                client.subscribe(myTopic, 0);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void onFailure(IMqttToken arg0, Throwable arg1) {
            Log.e(TAG, "连接失败");
            arg1.printStackTrace();
            // 连接失败,重连
        }
    };

  讷,就是这里,你如果服务器有问题,他一直走onFailure方法。服务器连接成功之后,就是订阅topic。我来说说这个

client.subscribe(myTopic, 0);

  首先,这个主题,是你自己跟服务器商量好的,随便什么都可以。为什么要订阅主题呢?我提前给你瞅瞅推送消息是怎么推送的

  第二个参数,消息的类型qos,有三种:0、1、2

  1. 0代表“至多一次”,消息发布完全依赖底层 TCP/IP 协议。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送
  2. 1代表“至少一次”,确保消息到达,但消息重复可能会发生
  3. 2代表“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。

  简单说明下,如果发送的是临时的消息,例如给某topic所有在线的设备发送一条消息,丢失的话也无所谓,0就可以了。如果需要客户端保证能接收消息,需要指定QoS为1

client.publish(topic, new MqttMessage(msg.getBytes()));

  讷,推送消息,是根据topic推送的,第二个参数,就是你要推送的具体消息。我个人认为,你可以理解成就类似于键值对的形式,

不同的用户可以订阅相同的主题

不同的用户可以订阅相同的主题

不同的用户可以订阅相同的主题

  这个就是跟其他长连接不同的地方,底层,其实都一样,虽然我没有看底层的代码。想也想的到,服务器肯定是根据这个主题,去找对应的用户,然后推送消息。而其他的长连接就是直接指定用户。跑题了,跑题了。

第三步,推送、接收消息

  当你连接服务器成功之后,就要推送消息了,我用的EventBus发的

private void publishData(String msg) {
        String topic = myTopic;
        try {
            Log.e(TAG,"给__"+topic+"__topic发送的消息为:"+msg);
            client.publish(topic, new MqttMessage(msg.getBytes()));
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    
    // MQTT监听并且接受消息
    private MqttCallback mqttCallback = new MqttCallback() {

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            Log.e(TAG,"接受到__"+topic+"__topic的消息为:"+new String(message.getPayload()));
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken arg0) {
            Log.e(TAG,"deliveryComplete");
        }

        @Override
        public void connectionLost(Throwable arg0) {
            // 失去连接,重连
            Log.e(TAG,"失去连接");
        }
    };

  当你的clientId重复的时候,他就会一直走connectionLost方法。到这里,基本上就讲完了,要注意的是,退出的时候,记得要释放资源

@Override
    public void onDestroy() {
        try {
            if (client != null && client.isConnected()) {
                client.disconnect();
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
        EventBus.getDefault().unregister(this);
        super.onDestroy();
    }

网上很多都是直接讲整体流程,重来不讲中间碰到的问题。难受

项目链接

上一篇下一篇

猜你喜欢

热点阅读