物联网相关技术研究Android开发经验谈

# Paho MQTT Android 源码分析 — MqttA

2021-11-07  本文已影响0人  braincs

客户端接口API

客户端有两种接口:

  1. IMqttClient

    提供阻塞方法,结束操作后返回调用方,是异步Client的使用到的较”薄“的一层设计,为早期版本的MQTT客户端使用。

  2. IMqttAsyncClient

    提供异步方法,调用方可通过获取返回值token的,使用waitForCompletion() 方式转变为阻塞同步方式

主要区别:1 为同步接口,2 为异步接口。

IMqttAsyncClient

异步接口,提供非阻塞式的方法,后台处理任务,以连接为例,连接到MQTT server是一个耗时操作,非阻塞方式在后台进行连接的时候,可以通知调用方,连接busy的状态。

非阻塞方式在时间驱动型程序和图形界面程序中较为多用,不会影响UI线程的绘制。

举例:连接,同步方式

// 方式1
IMqttToken conToken;
conToken = asyncClient.client.connect(conToken);
//... do some work...
conToken.waitForCompletion();

// 方式2
IMqttToken token;
token = asyncClient.method(parms).waitForCompletion();


连接,异步方式

IMqttToken conToken;
conToken = asyncClient.connect("some context",new new MqttAsyncActionListener() {
    public void onSuccess(IMqttToken asyncActionToken) {
        log("Connected");
    }

    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
        log ("connect failed" +exception);
    }
});

异步回调中如果需要使用context上下文,可以通过异步方法传入,最终可在回调中,返回给调用方。

关键方法

方法名 描述
connect 创建连接
disconnect 断开连接
disconnectForcibly 强制断开连接(用于disconnect失败后)
isConnected 判断是否连接
getClientId 获取Client ID
getServerURI 获取server的URI
publish 消息发布
subscribe 消息订阅
unsubscribe 取消消息订阅
setCallback 设置异步回调,监听
接收的消息
连接状态
消息发送结果
getPendingDeliveryTokens 获取为发送的消息的token,需cleanSession false才有效
setManualAcks 是否手动返回ACK消息
messageArrivedComplete 消息已成功送达,触发发送ack消息给server
close 释放所有client的资源,client被close后无法重复使用

IMqttClient

同步阻塞方法

关键方法

与异步client相似,~~表示同上

方法 描述
connect 建立连接,无返回
connectWithResult 建立连接,返回token
disconnect 同上~~
disconnectForcibly ~~
subscribe ~~,支持wildcard的 topicFilter
subscribeWithResponse 有返回的订阅,wildcard
unsubscribe ~~
publish ~~
setCallback ~~
getTopic 获取用于publish的主题
isConnected ~~
getServerURI ~~
getPendingDeliveryTokens ~~
setManualAcks ~~
messageArrivedComplete ~~
close ~~

MqttAndroidClient

Android client的主要实现类,extends BroadcastReceiver,实现IMqttAsyncClient

通过 Android的service服务于 MQTT服务进行通信。提供了包含 一下方法的简单易用的MQTT 客户端:

connect
publish
subscribe
unsubscribe
disconnect

连接 connect

主要进行的操作:

  1. 在没有service时,创建mqttService
  2. 注册广播监听
  3. 有service,则直接doConnect,无service,在连接后,进行doConnect
public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback) {
    // 创建 MqttTokenAndroid
    IMqttToken token = new MqttTokenAndroid(this, userContext, callback);

    connectOptions = options;
    connectToken = token;

    /*
    * 实际是通过service进行真正连接的,这里创建并bind service,
    * 真正使用 service 需要等待 service 中的异步方法
    * onServiceConnected() 连接成功,
    * connection itself takes place in the onServiceConnected() method
    */
    if (mqttService == null) { // First time - must bind to the service
        Intent serviceStartIntent = new Intent();
        serviceStartIntent.setClassName(myContext, SERVICE_NAME);
        Object service = myContext.startService(serviceStartIntent);
        if (service == null) {
            IMqttActionListener listener = token.getActionCallback();
            if (listener != null) {
                listener.onFailure(token, new RuntimeException("cannot start service " + SERVICE_NAME));
            }
        }

        // 使用bind 方式启动service,需要注意声明周期管理,
        // startService 的启动最后要调用 stopService
        myContext.bindService(serviceStartIntent, serviceConnection, Context.BIND_AUTO_CREATE);

        // 注册广播监听
        if (!receiverRegistered) {
            registerReceiver(this);
        }
    } else {
        // 如已创建 service,则调用线程池进行连接,并注册广播监听
        pool.execute(new Runnable() {

            @Override
            public void run() {
                doConnect();

                //Register receiver to show shoulder tap.
                if (!receiverRegistered) {
                    registerReceiver(MqttAndroidClient.this);
                }
            }

        });
    }

    return token;
}

/**
 * ServiceConnection to process when we bind to our service
 */
private final class MyServiceConnection implements ServiceConnection {

    @Override
    public void onServiceConnected(ComponentName name, IBinder binder) {
        if (MqttServiceBinder.class.isAssignableFrom(binder.getClass())) {
            mqttService = ((MqttServiceBinder) binder).getService();
            bindedService = true;
            // now that we have the service available, we can actually
            // connect...
            doConnect();
        }
    }

    @Override
    public void onServiceDisconnected(ComponentName name) {
        mqttService = null;
    }
}

创建连接

  1. mqttService中获取Client句柄,
  2. 将token存入tokenMap
  3. 调用服务进行连接
/**
 * Actually do the mqtt connect operation
 */
private void doConnect() {
    // 从服务中获取client标识
    if (clientHandle == null) {
        clientHandle = mqttService.getClient(serverURI, clientId, myContext.getApplicationInfo().packageName, persistence);
    }
    // 配置服务是否trace
    mqttService.setTraceEnabled(traceEnabled);
    // 设置服务callback的 clientId
    mqttService.setTraceCallbackId(clientHandle);

    // 缓存token到 SparseArray
    String activityToken = storeToken(connectToken);
    try {
        // 调用服务建立连接,传入client标识和返回给客户端调用方的token缓存在tokenMap中的id
        mqttService.connect(clientHandle, connectOptions, activityToken);
    } catch (MqttException e) {
        IMqttActionListener listener = connectToken.getActionCallback();
        if (listener != null) {
            listener.onFailure(connectToken, e);
        }
    }
}

发布 publish

消息重发机制:在发送消息的时候,如果期间连接中断或client停止,消息会在满足所有以下条件且再次创建连接后,被已设定好的QoS被发送。

关于Topic

方式1:

public IMqttDeliveryToken publish(String topic, byte[] payload, int qos, boolean retained, Object userContext, IMqttActionListener callback) {

    // 根据 payload 构造MqttMessage
    MqttMessage message = new MqttMessage(payload);
    // 配置 QoS
    message.setQos(qos);
    // 配置是否在服务端进行保留
    message.setRetained(retained);
    // 创建token,注意区别于MqttTokenAndroid,继承于MqttTokenAndroid,拓展了MqttMessage字段
    MqttDeliveryTokenAndroid token = new MqttDeliveryTokenAndroid(this, userContext, callback, message);
    // 缓存在 tokenMap 中
    String activityToken = storeToken(token);
    // 调用Service 发布消息,获取service缓存的internalToken
    IMqttDeliveryToken internalToken = mqttService.publish(clientHandle, topic, payload, qos, retained, null, activityToken);
    // 设置代理,通过设置代理的方式,其实就是个set字段,将两个token关联到了一起,内部处理消息的关系
    token.setDelegate(internalToken);
    return token;
}

方式2:

public IMqttDeliveryToken publish(String topic, MqttMessage message, Object userContext, IMqttActionListener callback) {
    MqttDeliveryTokenAndroid token = new MqttDeliveryTokenAndroid(this, userContext, callback, message);
    String activityToken = storeToken(token);
    IMqttDeliveryToken internalToken = mqttService.publish(clientHandle, topic, message, null, activityToken);
    token.setDelegate(internalToken);
    return token;
}

省略了方式1的 MqttMessage的构造。

订阅 subscribe

注意:

机制:

订阅单一Topic

public IMqttToken subscribe(String topic, int qos, Object userContext, IMqttActionListener callback) {
    IMqttToken token = new MqttTokenAndroid(this, userContext, callback, new String[]{topic});
    String activityToken = storeToken(token);
    // 通过Service 订阅消息
    mqttService.subscribe(clientHandle, topic, qos, null, activityToken);
    return token;
}

订阅多个Topic

优势:优化比逐一订阅

@Override
public IMqttToken subscribe(String[] topic, int[] qos, Object userContext, IMqttActionListener callback) {
    IMqttToken token = new MqttTokenAndroid(this, userContext, callback, topic);
    String activityToken = storeToken(token);
    mqttService.subscribe(clientHandle, topic, qos, null, activityToken);
    return token;
}

取消订阅 unsubscribe

取消订阅与订阅是相反的,取消订阅需要在服务端收到取消后,查询是否有match的订阅,然后移除。

取消订阅

public IMqttToken unsubscribe(String topic, Object userContext, IMqttActionListener callback) {
    IMqttToken token = new MqttTokenAndroid(this, userContext, callback);
    String activityToken = storeToken(token);
    mqttService.unsubscribe(clientHandle, topic, null, activityToken);
    return token;
}

取消多个订阅

@Override
public IMqttToken unsubscribe(String[] topic, Object userContext, IMqttActionListener callback) {
    IMqttToken token = new MqttTokenAndroid(this, userContext, callback);
    String activityToken = storeToken(token);
    mqttService.unsubscribe(clientHandle, topic, null, activityToken);
    return token;
}

断开连接 disconnect

注意:

断线时候消息发送的机制:

方式1:

public IMqttToken disconnect() {
    IMqttToken token = new MqttTokenAndroid(this, null, null);
    String activityToken = storeToken(token);
    // 调用service断开连接
    mqttService.disconnect(clientHandle, null, activityToken);
    return token;
}

方式2:带超时机制

public IMqttToken disconnect(long quiesceTimeout) {
    IMqttToken token = new MqttTokenAndroid(this, null, null);
    String activityToken = storeToken(token);
    mqttService.disconnect(clientHandle, quiesceTimeout, null, activityToken);
    return token;
}

方式3:带回调

public IMqttToken disconnect(long quiesceTimeout, Object userContext, IMqttActionListener callback) {
    IMqttToken token = new MqttTokenAndroid(this, userContext, callback);
    String activityToken = storeToken(token);
    mqttService.disconnect(clientHandle, quiesceTimeout, null, activityToken);
    return token;
}

其他API

获取挂起的消息token

在客户端stop的时候,可能还有未发送的message,这种情况下,可以通过 getPendingDeliveryTokens 方法在客户端重启后获取为发送消息(in-flight message)的token,从而追踪这些消息的状态。

替代方法:MqttCallback中的deliveryComplete 也可以获取到消息送达的状态。

注意:

public IMqttDeliveryToken[] getPendingDeliveryTokens() {
    // 从service中根据client标识获取
    return mqttService.getPendingDeliveryTokens(clientHandle);
}

辅助debug功能

设置MqttTraceHandler,和使能在service中trace的功能

// 设置traceCallback,将从Service广播的trace action 透传给调用方
public void setTraceCallback(MqttTraceHandler traceCallback) {
    this.traceCallback = traceCallback;
}

/**
 * turn tracing on and off
 */
public void setTraceEnabled(boolean traceEnabled) {
    this.traceEnabled = traceEnabled;
    if (mqttService != null) {
        mqttService.setTraceEnabled(traceEnabled);
    }
}

/**
* Process trace action - pass trace data back to the callback
*/
private void traceAction(Bundle data) {

    if (traceCallback != null) {
        String severity = data.getString(MqttServiceConstants.CALLBACK_TRACE_SEVERITY);
        String message = data.getString(MqttServiceConstants.CALLBACK_ERROR_MESSAGE);
        String tag = data.getString(MqttServiceConstants.CALLBACK_TRACE_TAG);
        if (MqttServiceConstants.TRACE_DEBUG.equals(severity)) {
            traceCallback.traceDebug(tag, message);
        } else if (MqttServiceConstants.TRACE_ERROR.equals(severity)) {
            traceCallback.traceError(tag, message);
        } else {
            Exception e = (Exception) data.getSerializable(MqttServiceConstants.CALLBACK_EXCEPTION);
            traceCallback.traceException(tag, message, e);
        }
    }
}

MqttTraceCallback 是最简单的 MqttTraceHandler 的实现,直接输出到Android的Logcat

接收从MqttService返回

使用 BroadcastReceiver 的方式接收从 MqttService 的返回消息:

public void onReceive(Context context, Intent intent) {
    Bundle data = intent.getExtras();

    String handleFromIntent = data.getString(MqttServiceConstants.CALLBACK_CLIENT_HANDLE);

    if ((handleFromIntent == null) || (!handleFromIntent.equals(clientHandle))) {
        return;
    }

    String action = data.getString(MqttServiceConstants.CALLBACK_ACTION);

    if (MqttServiceConstants.CONNECT_ACTION.equals(action)) {
        connectAction(data);
    } else if (MqttServiceConstants.CONNECT_EXTENDED_ACTION.equals(action)) {
        connectExtendedAction(data);
    } else if (MqttServiceConstants.MESSAGE_ARRIVED_ACTION.equals(action)) {
        messageArrivedAction(data);
    } else if (MqttServiceConstants.SUBSCRIBE_ACTION.equals(action)) {
        subscribeAction(data);
    } else if (MqttServiceConstants.UNSUBSCRIBE_ACTION.equals(action)) {
        unSubscribeAction(data);
    } else if (MqttServiceConstants.SEND_ACTION.equals(action)) {
        sendAction(data);
    } else if (MqttServiceConstants.MESSAGE_DELIVERED_ACTION.equals(action)) {
        messageDeliveredAction(data);
    } else if (MqttServiceConstants.ON_CONNECTION_LOST_ACTION.equals(action)) {
        connectionLostAction(data);
    } else if (MqttServiceConstants.DISCONNECT_ACTION.equals(action)) {
        disconnected(data);
    } else if (MqttServiceConstants.TRACE_ACTION.equals(action)) {
        traceAction(data);
    } else {
        mqttService.traceError(MqttService.TAG, "Callback action doesn't exist.");
    }

}

具体以sub的消息接收为参考(messageArrivedAction):

private void messageArrivedAction(Bundle data) {
    if (callback != null) {
        // 获取消息的 messageId destinationName
        String messageId = data.getString(MqttServiceConstants.CALLBACK_MESSAGE_ID);
        String destinationName = data.getString(MqttServiceConstants.CALLBACK_DESTINATION_NAME);

        // 获取Parcelable的消息
        ParcelableMqttMessage message = data.getParcelable(MqttServiceConstants.CALLBACK_MESSAGE_PARCEL);
        try {
            if (messageAck == Ack.AUTO_ACK) {
                // 调用callback 方法中的 messageArrived
                callback.messageArrived(destinationName, message);
                // 自动acknowledge的话调用 service的接口
                mqttService.acknowledgeMessageArrival(clientHandle, messageId);
            } else {
                // 将messageId设置到message中
                message.messageId = messageId;
                // 调用callback 方法
                callback.messageArrived(destinationName, message);
            }

            // let the service discard the saved message details
        } catch (Exception e) {
            mqttService.traceError(MqttService.TAG, "messageArrivedAction failed: " + e);
        }
    }
}

移除token

从SparseArray的tokenMap中移除token:

private synchronized IMqttToken removeMqttToken(Bundle data) {

    String activityToken = data.getString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN);
    if (activityToken != null) {
        int tokenNumber = Integer.parseInt(activityToken);
        IMqttToken token = tokenMap.get(tokenNumber);
        tokenMap.delete(tokenNumber);
        return token;
    }
    return null;
}

注意:

上一篇 下一篇

猜你喜欢

热点阅读