物联网相关技术研究Android开发Android开发经验谈

Paho MQTT Android 源码分析 — MqttService

2021-11-07  本文已影响0人  braincs

Android的service方式的MQTT的客户端的实现

MqttService的API和IMqttAsyncClient 高度一致,只是在Android平台上进行了一定适配:

  1. invocationContext 字符串类型,用于标记application 上下文

  2. activityToken 字符串类型,用于标记Activity回调方法或上下文相关的数据

大部分代码为了实现:多客户端连接,client handle用于被high level APIs访问

使用

设计:

onCreate 创建

创建的时候,创建以下两个实例:

/**
 * Sets up the two collaborators the service creates at start-up: the binder
 * returned to binding callers and the store for arrived messages.
 */
@Override
public void onCreate() {
    super.onCreate();

    // Binder through which callers send commands to this service.
    this.mqttServiceBinder = new MqttServiceBinder(this);

    // Persists received messages until they have been delivered to the
    // service's caller.
    this.messageStore = new DatabaseMessageStore(this, this);
}

MqttServiceBinder:绑定服务时,Service 返回给 Activity 的 Binder 对象,其中保存了 service 引用以及与之关联的 String 类型的 activityToken。

// Binder subclass that carries a service reference together with an activity
// token (excerpt - the rest of the class is elided in this listing).
class MqttServiceBinder extends Binder {

 // The service instance exposed through this binder.
 private MqttService mqttService;
 // Token associated with the binding Activity; set via setActivityToken in onBind.
 private String activityToken;
 //...

绑定:

/**
 * Returns the service binder to a binding Activity.
 *
 * @param intent the binding intent; its CALLBACK_ACTIVITY_TOKEN string extra
 *               is copied onto the binder before it is returned
 * @return the binder created in onCreate
 */
@Override
public IBinder onBind(Intent intent) {
    // The Activity gets back a reference to this service plus the token it
    // supplied when binding.
    mqttServiceBinder.setActivityToken(
            intent.getStringExtra(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN));
    return mqttServiceBinder;
}

启动:

/**
 * Registers the broadcast receivers and asks the system to keep the service
 * running.
 *
 * @return START_STICKY, so the service runs until explicitly stopped and is
 *         restarted when the process is restarted
 */
@Override
public int onStartCommand(final Intent intent, int flags, final int startId) {
    // Start listening for the broadcasts the service reacts to.
    registerBroadcastReceivers();
    return START_STICKY;
}

注册广播监听

/**
 * Registers the connectivity receiver and, on SDK levels below 14, the
 * background-data preference receiver. Each receiver is only created and
 * registered when its field is still null.
 */
@SuppressWarnings("deprecation")
private void registerBroadcastReceivers() {
    // Network connectivity changes.
    if (networkConnectionMonitor == null) {
        networkConnectionMonitor = new NetworkConnectionIntentReceiver();
        IntentFilter connectivityFilter = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
        registerReceiver(networkConnectionMonitor, connectivityFilter);
    }

    // Everything below only applies before ICE_CREAM_SANDWICH (API 14).
    if (Build.VERSION.SDK_INT >= 14) {
        return;
    }

    // Support the old system for background data preferences.
    ConnectivityManager connectivity = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
    backgroundDataEnabled = connectivity.getBackgroundDataSetting();
    if (backgroundDataPreferenceMonitor == null) {
        backgroundDataPreferenceMonitor = new BackgroundDataPreferenceReceiver();
        IntentFilter backgroundDataFilter =
                new IntentFilter(ConnectivityManager.ACTION_BACKGROUND_DATA_SETTING_CHANGED);
        registerReceiver(backgroundDataPreferenceMonitor, backgroundDataFilter);
    }
}

网络连接的广播

/**
 * Receiver for connectivity-change broadcasts. When the device is online it
 * asks every connection to reconnect; otherwise it tells every connection
 * that the device is offline.
 */
private class NetworkConnectionIntentReceiver extends BroadcastReceiver {

    /**
     * Handles a connectivity broadcast while holding a partial wake lock.
     *
     * @param context the context the receiver is running in (unused)
     * @param intent  the received broadcast (unused)
     */
    @Override
    @SuppressLint("Wakelock")
    public void onReceive(Context context, Intent intent) {
        traceDebug(TAG, "Internal network status receive.");
        // we protect against the phone switching off
        // by requesting a wake lock - we request the minimum possible wake
        // lock - just enough to keep the CPU running until we've finished
        PowerManager pm = (PowerManager) getSystemService(POWER_SERVICE);
        WakeLock wl = pm.newWakeLock(PowerManager.PARTIAL_WAKE_LOCK, "MQTT");
        wl.acquire();
        try {
            traceDebug(TAG, "Reconnect for Network recovery.");
            if (isOnline()) {
                traceDebug(TAG, "Online,reconnect.");
                // we have an internet connection - have another try at
                // connecting
                reconnect();
            } else {
                notifyClientsOffline();
            }
        } finally {
            // Release even if reconnect/notify throws, otherwise the wake
            // lock would stay held.
            wl.release();
        }
    }
}

连接状态

/**
 * Reports whether the service currently considers the device online.
 *
 * @return true only when there is an active network that is both available
 *         and connected, and backgroundDataEnabled is set
 */
public boolean isOnline() {
    ConnectivityManager connectivity = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
    // May be null when there is no active network.
    NetworkInfo activeNetwork = connectivity.getActiveNetworkInfo();
    return activeNetwork != null
            && activeNetwork.isAvailable()
            && activeNetwork.isConnected()
            && backgroundDataEnabled;
}

通知离线

/**
 * Tells every managed connection that the device is offline by calling
 * MqttConnection.offline() on each entry of the connections map.
 */
private void notifyClientsOffline() {
    for (MqttConnection each : connections.values()) {
        each.offline();
    }
}

参考MqttConnection

reconnect重连

/**
 * Asks every managed connection to reconnect. The online check is repeated
 * for each connection, so a connection is skipped if the device is no longer
 * online when its turn comes.
 */
void reconnect() {
    traceDebug(TAG, "Reconnect to server, client size=" + connections.size());
    for (MqttConnection connection : connections.values()) {
        traceDebug("Reconnect Client:", connection.getClientId() + '/' + connection.getServerURI());
        if (!this.isOnline()) {
            continue;
        }
        connection.reconnect();
    }
}

调用 MqttConnection 中的 reconnect 方法进行重连,参考 reconnect重连

onDestroy 销毁

销毁:

// All connections held by this service, stored in a ConcurrentHashMap.
// Key: clientHandle, value: the MqttConnection (client) for that handle.
private Map<String/* clientHandle */, MqttConnection/* client */> connections = new ConcurrentHashMap<>();

/**
 * Tears the service down: disconnects every connection, drops the binder,
 * unregisters the broadcast receivers and closes the message store.
 */
@Override
public void onDestroy() {
    // Disconnect every client; no invocation context or activity token.
    for (MqttConnection connection : connections.values()) {
        connection.disconnect(null, null);
    }

    // Drop the binder reference.
    mqttServiceBinder = null;

    // Stop listening for broadcasts.
    unregisterBroadcastReceivers();

    // Close the message store (database), if one was created.
    if (this.messageStore != null) {
        this.messageStore.close();
    }

    super.onDestroy();
}
//...

/**
 * Unregisters the receivers set up by registerBroadcastReceivers and clears
 * their fields, so that a repeated call does nothing and a later
 * registration starts from a clean state.
 */
private void unregisterBroadcastReceivers() {
    // Network connectivity receiver.
    if (networkConnectionMonitor != null) {
        unregisterReceiver(networkConnectionMonitor);
        networkConnectionMonitor = null;
    }

    // Background-data preference receiver, only used below API 14.
    if (Build.VERSION.SDK_INT < 14 /**Build.VERSION_CODES.ICE_CREAM_SANDWICH**/) {
        if (backgroundDataPreferenceMonitor != null) {
            unregisterReceiver(backgroundDataPreferenceMonitor);
            // Clear the field like networkConnectionMonitor above; otherwise
            // a second call would unregister the same receiver twice and a
            // later register call would skip it because the field is non-null.
            backgroundDataPreferenceMonitor = null;
        }
    }
}

关于断联缓存消息

/**
 * Applies disconnected-buffer options to one client.
 *
 * @param clientHandle identifier for the client
 * @param bufferOpts   the DisconnectedBufferOptions to hand to that client
 */
public void setBufferOpts(String clientHandle, DisconnectedBufferOptions bufferOpts) {
    getConnection(clientHandle).setBufferOpts(bufferOpts);
}

/**
 * Returns the buffered-message count reported by one client.
 *
 * @param clientHandle identifier for the client
 * @return the value of that connection's getBufferedMessageCount()
 */
public int getBufferedMessageCount(String clientHandle) {
    return getConnection(clientHandle).getBufferedMessageCount();
}

/**
 * Fetches one buffered message from a client.
 *
 * @param clientHandle identifier for the client
 * @param bufferIndex  index passed through to the connection's buffer lookup
 * @return the MqttMessage returned by the connection for that index
 */
public MqttMessage getBufferedMessage(String clientHandle, int bufferIndex) {
    return getConnection(clientHandle).getBufferedMessage(bufferIndex);
}

/**
 * Deletes one buffered message from a client.
 *
 * @param clientHandle identifier for the client
 * @param bufferIndex  index passed through to the connection's buffer delete
 */
public void deleteBufferedMessage(String clientHandle, int bufferIndex) {
    getConnection(clientHandle).deleteBufferedMessage(bufferIndex);
}

DisconnectedBufferOptions

在mqttv3包中:

// Options controlling how messages are buffered after the connection is lost
// (excerpt - the rest of the class is elided in this listing).
public class DisconnectedBufferOptions {
    
    /**
     * The default size of the disconnected buffer: 5000.
     */
    public static final int DISCONNECTED_BUFFER_SIZE_DEFAULT = 5000;
    // The disconnected buffer is off by default.
    public static final boolean DISCONNECTED_BUFFER_ENABLED_DEFAULT = false;
    // Persisting the buffer is off by default.
    public static final boolean PERSIST_DISCONNECTED_BUFFER_DEFAULT = false;
    // Deleting the oldest messages is off by default.
    public static final boolean DELETE_OLDEST_MESSAGES_DEFAULT = false;

    // Instance settings, each initialised from the matching default above.
    private int bufferSize = DISCONNECTED_BUFFER_SIZE_DEFAULT;
    private boolean bufferEnabled = DISCONNECTED_BUFFER_ENABLED_DEFAULT;
    private boolean persistBuffer = PERSIST_DISCONNECTED_BUFFER_DEFAULT;
    private boolean deleteOldestMessages = DELETE_OLDEST_MESSAGES_DEFAULT;
    

功能方法

回传数据(activity)

callbackToActivity

具体实现:

/**
 * Sends a result back to the Activity side as a local broadcast.
 *
 * Do not call traceDebug from here: it would itself try to
 * callbackToActivity, leading to recursion.
 *
 * @param clientHandle handle of the client the result belongs to; left out
 *                     of the intent when null
 * @param status       status stored under CALLBACK_STATUS
 * @param dataBundle   extra data merged into the intent; skipped when null
 */
void callbackToActivity(String clientHandle, Status status, Bundle dataBundle) {
    Intent broadcast = new Intent(MqttServiceConstants.CALLBACK_TO_ACTIVITY);

    // Client identifier, when there is one.
    if (clientHandle != null) {
        broadcast.putExtra(MqttServiceConstants.CALLBACK_CLIENT_HANDLE, clientHandle);
    }

    // Status of the operation.
    broadcast.putExtra(MqttServiceConstants.CALLBACK_STATUS, status);

    // Any additional payload supplied by the caller.
    if (dataBundle != null) {
        broadcast.putExtras(dataBundle);
    }

    // Delivered in-process through LocalBroadcastManager.
    LocalBroadcastManager localBroadcasts = LocalBroadcastManager.getInstance(this);
    localBroadcasts.sendBroadcast(broadcast);
}

client创建/获取

MqttService中会对所有连接进行维护,对MqttConnection 创建了ConcurrentHashMap<String, MqttConnection>

// Map from clientHandle to the MqttConnection created for it (see getClient).
private Map<String/* clientHandle */, MqttConnection/* client */> connections = new ConcurrentHashMap<>();
/**
 * Get an MqttConnection object to represent a connection to a server.
 * A new connection is only created and stored when no entry exists yet for
 * the computed handle; an existing entry is never replaced.
 *
 * @param serverURI   specifies the protocol, host name and port to be used to connect to an MQTT server
 * @param clientId    specifies the name by which this connection should be identified to the server
 * @param contextId   specifies the app context info to make a difference between apps
 * @param persistence specifies the persistence layer to be used with this client
 * @return a string to be used by the Activity as a "handle" for this
 * MqttConnection, built as serverURI + ":" + clientId + ":" + contextId
 */
public String getClient(String serverURI, String clientId, String contextId, MqttClientPersistence persistence) {
    final String handle = serverURI + ":" + clientId + ":" + contextId;
    if (connections.containsKey(handle)) {
        // Already known: keep the existing connection untouched.
        return handle;
    }
    connections.put(handle, new MqttConnection(this, serverURI, clientId, persistence, handle));
    return handle;
}

注意

connect连接

创建单一client连接 (区分于reconnect,reconnect是所有的都重连)

  1. 查表获取MqttConnection
  2. 调用 MqttConnection 的 connect方法创建连接 [详细参考](#connect 连接)
/**
 * Connect to the MQTT server specified by a particular client.
 * Looks the connection up by handle and delegates to its connect method with
 * a null invocation context.
 *
 * @param clientHandle   identifies the MqttConnection to use
 * @param connectOptions the MQTT connection options to be used
 * @param activityToken  arbitrary identifier to be passed back to the Activity
 * @throws MqttSecurityException thrown if there is a security exception
 * @throws MqttException         thrown for all other MqttExceptions
 */
public void connect(String clientHandle, MqttConnectOptions connectOptions, String activityToken)
        throws MqttException {
    getConnection(clientHandle).connect(connectOptions, null, activityToken);
}

disconnect断开连接

单一client的断开连接

  1. 查表获取 MqttConnection
  2. 调用MqttConnection.disconnect
  3. 哈希表移除MqttConnection
  4. 停止当前service (这里不符合一对多的设计,hashmap无用了)
/**
 * Disconnect the given client from the server, remove it from the
 * connections map and request that the service stop.
 *
 * @param clientHandle      identifies the MqttConnection to use
 * @param invocationContext arbitrary data to be passed back to the application
 * @param activityToken     arbitrary identifier to be passed back to the Activity
 */
public void disconnect(String clientHandle, String invocationContext, String activityToken) {
    // Look up the connection and ask it to disconnect.
    MqttConnection connection = getConnection(clientHandle);
    connection.disconnect(invocationContext, activityToken);

    // The handle is no longer tracked once disconnected.
    connections.remove(clientHandle);

    // the activity has finished using us, so we can stop the service
    // the activities are bound with BIND_AUTO_CREATE, so the service will
    // remain around until the last activity disconnects
    // NOTE(review): stopSelf() is called even when other clients are still
    // present in the connections map - confirm this is intended.
    stopSelf();
}
/**
 * Disconnect variant that passes a quiesce timeout to the connection. Like
 * the other overload it removes the client from the connections map and
 * requests that the service stop.
 *
 * @param clientHandle      identifies the MqttConnection to use
 * @param quiesceTimeout    timeout value passed through to MqttConnection.disconnect
 * @param invocationContext arbitrary data to be passed back to the application
 * @param activityToken     arbitrary identifier to be passed back to the Activity
 */
public void disconnect(String clientHandle, long quiesceTimeout, String invocationContext, String activityToken) {
    MqttConnection connection = getConnection(clientHandle);
    connection.disconnect(quiesceTimeout, invocationContext, activityToken);
    connections.remove(clientHandle);

    // the activity has finished using us, so we can stop the service
    // the activities are bound with BIND_AUTO_CREATE, so the service will
    // remain around until the last activity disconnects
    stopSelf();
}

isConnected连接状态

单一client的连接状态判断

/**
 * Reports the connection state of a single client.
 *
 * @param clientHandle identifies the MqttConnection to query
 * @return the value of that connection's isConnected()
 */
public boolean isConnected(String clientHandle) {
    return getConnection(clientHandle).isConnected();
}

publish发布

提供两种类型的publish消息的方式

  1. publish 中指定 payload qos
  2. publish MqttMessage (封装了payload qos retained)
/**
 * Publishes a raw payload through the given client.
 *
 * @param clientHandle      identifies the MqttConnection to use
 * @param topic             topic passed through to the connection
 * @param payload           message bytes
 * @param qos               quality-of-service value passed through
 * @param retained          retained flag passed through
 * @param invocationContext arbitrary data to be passed back to the application
 * @param activityToken     arbitrary identifier to be passed back to the Activity
 * @return the delivery token returned by the connection
 */
public IMqttDeliveryToken publish(String clientHandle, String topic, byte[] payload, int qos, boolean retained, String invocationContext,
        String activityToken) {
    return getConnection(clientHandle)
            .publish(topic, payload, qos, retained, invocationContext, activityToken);
}

/**
 * Publishes a ready-made MqttMessage through the given client.
 *
 * @param clientHandle      identifies the MqttConnection to use
 * @param topic             topic passed through to the connection
 * @param message           the message to publish
 * @param invocationContext arbitrary data to be passed back to the application
 * @param activityToken     arbitrary identifier to be passed back to the Activity
 * @return the delivery token returned by the connection
 */
public IMqttDeliveryToken publish(String clientHandle, String topic, MqttMessage message, String invocationContext, String activityToken) {
    return getConnection(clientHandle)
            .publish(topic, message, invocationContext, activityToken);
}

subscribe订阅

提供三种subscribe消息方式

  1. 单一topic和qos
  2. 批量topic 和qos
  3. 批量topicFilter 和qos,处理返回消息的回调
/**
 * Subscribes the given client to a single topic with a single QoS.
 *
 * @param clientHandle      identifies the MqttConnection to use
 * @param topic             topic to subscribe to
 * @param qos               quality-of-service value for the subscription
 * @param invocationContext arbitrary data to be passed back to the application
 * @param activityToken     arbitrary identifier to be passed back to the Activity
 */
public void subscribe(String clientHandle, String topic, int qos, String invocationContext, String activityToken) {
    getConnection(clientHandle).subscribe(topic, qos, invocationContext, activityToken);
}

/**
 * Subscribes the given client to several topics at once.
 *
 * @param clientHandle      identifies the MqttConnection to use
 * @param topic             topics to subscribe to
 * @param qos               quality-of-service values passed alongside the topics
 * @param invocationContext arbitrary data to be passed back to the application
 * @param activityToken     arbitrary identifier to be passed back to the Activity
 */
public void subscribe(String clientHandle, String[] topic, int[] qos, String invocationContext, String activityToken) {
    getConnection(clientHandle).subscribe(topic, qos, invocationContext, activityToken);
}

/**
 * Subscribes the given client to several topic filters and passes message
 * listeners along to the connection.
 *
 * @param clientHandle      identifies the MqttConnection to use
 * @param topicFilters      topic filters to subscribe to
 * @param qos               quality-of-service values passed alongside the filters
 * @param invocationContext arbitrary data to be passed back to the application
 * @param activityToken     arbitrary identifier to be passed back to the Activity
 * @param messageListeners  listeners handed to the connection for the returned messages
 */
public void subscribe(String clientHandle, String[] topicFilters, int[] qos, String invocationContext, String activityToken,
        IMqttMessageListener[] messageListeners) {
    getConnection(clientHandle)
            .subscribe(topicFilters, qos, invocationContext, activityToken, messageListeners);
}

unsubscribe取消订阅

提供两种方式的取消订阅方式

  1. 单条topic
  2. 多条topic
/**
 * Unsubscribes the given client from a single topic.
 *
 * @param clientHandle      identifies the MqttConnection to use
 * @param topic             topic to unsubscribe from
 * @param invocationContext arbitrary data to be passed back to the application
 * @param activityToken     arbitrary identifier to be passed back to the Activity
 */
public void unsubscribe(String clientHandle, final String topic, String invocationContext, String activityToken) {
    getConnection(clientHandle).unsubscribe(topic, invocationContext, activityToken);
}

/**
 * Unsubscribes the given client from several topics.
 *
 * @param clientHandle      identifies the MqttConnection to use
 * @param topic             topics to unsubscribe from
 * @param invocationContext arbitrary data to be passed back to the application
 * @param activityToken     arbitrary identifier to be passed back to the Activity
 */
public void unsubscribe(String clientHandle, final String[] topic, String invocationContext, String activityToken) {
    getConnection(clientHandle).unsubscribe(topic, invocationContext, activityToken);
}

获取待发送的消息token

/**
 * Get tokens for all outstanding deliveries for a client.
 *
 * @param clientHandle identifies the MqttConnection
 * @return an array (possibly empty) of tokens, as returned by the connection
 */
public IMqttDeliveryToken[] getPendingDeliveryTokens(String clientHandle) {
    return getConnection(clientHandle).getPendingDeliveryTokens();
}

消息送达ack

在收到消息后,会返回给服务端消息送达ack确认

具体逻辑:

  1. 从消息持久化存储中删除对应的已到达消息 详见:DatabaseMessageStore
  2. 返回确认消息 Status.OK = 0,Status.ERROR = 1
/**
 * Called by the Activity when a message has been passed back to the
 * application. Asks the message store to discard the arrived message and
 * maps the outcome onto a status.
 *
 * @param clientHandle identifier for the client which received the message
 * @param id           identifier for the MQTT message
 * @return {@link Status#OK} when the store reports the message as discarded,
 *         {@link Status#ERROR} otherwise
 */
public Status acknowledgeMessageArrival(String clientHandle, String id) {
    final boolean discarded = messageStore.discardArrived(clientHandle, id);
    return discarded ? Status.OK : Status.ERROR;
}

MqttConnection

离线vs断线

offline和 disconnected

/**
 * Receive notification that we are offline.
 * Does nothing when the connection is already marked disconnected or when
 * cleanSession is set; otherwise reports a lost connection with an
 * "Android offline" exception.
 */
void offline() {
    if (disconnected || cleanSession) {
        return;
    }
    connectionLost(new Exception("Android offline"));
}

connectionLost

/**
 * Handles loss of the connection: marks this connection as disconnected,
 * either disconnects the underlying client (no automatic reconnect) or
 * schedules an early ping (automatic reconnect), then reports
 * ON_CONNECTION_LOST_ACTION to the Activity and releases the wake lock.
 *
 * @param why the reason the connection was lost; may be null, in which case
 *            no error details are added to the callback bundle
 */
public void connectionLost(Throwable why) {
    // `why` is treated as optional further down, so it must not be
    // dereferenced unconditionally here. For a null `why` this logs
    // "connectionLost(null)".
    final String reason = (why == null) ? null : why.getMessage();
    service.traceDebug(TAG, "connectionLost(" + reason + ")");
    disconnected = true;
    try {
        // Without automatic reconnect, disconnect the underlying client.
        if (!this.connectOptions.isAutomaticReconnect()) {
            myClient.disconnect(null, new IMqttActionListener() {

                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    // No action
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken,
                        Throwable exception) {
                    // No action
                }
            });
        } else {
            // Using the new Automatic reconnect functionality.
            // We can't force a disconnection, but we can speed one up
            //
            alarmPingSender.schedule(100);

        }
    } catch (Exception ignored) {
        // ignore it - we've done our best; the callback below must still
        // be delivered.
    }

    Bundle resultBundle = new Bundle();
    resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION, MqttServiceConstants.ON_CONNECTION_LOST_ACTION);
    if (why != null) {
        resultBundle.putString(MqttServiceConstants.CALLBACK_ERROR_MESSAGE, why.getMessage());
        // Only MqttExceptions are attached as a serializable extra.
        if (why instanceof MqttException) {
            resultBundle.putSerializable(MqttServiceConstants.CALLBACK_EXCEPTION, why);
        }
        resultBundle.putString(MqttServiceConstants.CALLBACK_EXCEPTION_STACK, Log.getStackTraceString(why));
    }
    service.callbackToActivity(clientHandle, Status.OK, resultBundle);
    // client has lost connection no need for wake lock
    releaseWakeLock();
}

reconnect重连

在线的情况下,会触发reconnect() 方法进行重连

/**
 * Tries to re-establish this connection.
 *
 * Returns without doing anything when there is no underlying client, when a
 * connect is already in progress, or when the service reports the device as
 * offline. Otherwise:
 * - with automatic reconnect enabled, it calls the client's own reconnect;
 * - else, if the connection is marked disconnected and cleanSession is not
 *   set, it issues a fresh connect whose listener reports the outcome.
 * Failures are passed to handleException together with a CONNECT_ACTION
 * result bundle.
 */
synchronized void reconnect() {

    if (myClient == null) {
        service.traceError(TAG, "Reconnect myClient = null. Will not do reconnect");
        return;
    }

    if (isConnecting) {
        service.traceDebug(TAG, "The client is connecting. Reconnect return directly.");
        return;
    }

    if (!service.isOnline()) {
        service.traceDebug(TAG, "The network is not reachable. Will not do reconnect");
        return;
    }

    if (connectOptions.isAutomaticReconnect()) {
        //The Automatic reconnect functionality is enabled here
        Log.i(TAG, "Requesting Automatic reconnect using New Java AC");
        // Bundle describing this attempt, used if the reconnect call fails.
        final Bundle resultBundle = new Bundle();
        resultBundle.putString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN, reconnectActivityToken);
        resultBundle.putString(MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT, null);
        resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION, MqttServiceConstants.CONNECT_ACTION);
        try {
            // Delegate to the underlying client's reconnect.
            myClient.reconnect();
        } catch (MqttException ex) {
            Log.e(TAG, "Exception occurred attempting to reconnect: " + ex.getMessage());
            setConnectingState(false);
            // Report the failure together with the bundle.
            handleException(resultBundle, ex);
        }
    } else if (disconnected && !cleanSession) {
        // Disconnected and the session is not being cleaned.
        // use the activityToke the same with action connect
        service.traceDebug(TAG, "Do Real Reconnect!");
        final Bundle resultBundle = new Bundle();
        resultBundle.putString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN, reconnectActivityToken);
        resultBundle.putString(MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT, null);
        resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION, MqttServiceConstants.CONNECT_ACTION);

        try {

            IMqttActionListener listener = new MqttConnectionListener(resultBundle) {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    // since the device's cpu can go to sleep, acquire a
                    // wakelock and drop it later.
                    service.traceDebug(TAG, "Reconnect Success!");
                    service.traceDebug(TAG, "DeliverBacklog when reconnect.");
                    resultBundle.putBoolean(MqttServiceConstants.SESSION_PRESENT, asyncActionToken.getSessionPresent());
                    doAfterConnectSuccess(resultBundle);
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    resultBundle.putString(MqttServiceConstants.CALLBACK_ERROR_MESSAGE, exception.getLocalizedMessage());
                    resultBundle.putSerializable(MqttServiceConstants.CALLBACK_EXCEPTION, exception);
                    service.callbackToActivity(clientHandle, Status.ERROR, resultBundle);

                    doAfterConnectFail(resultBundle);
                }
            };

            // Use the normal connect path so the outcome is reported through
            // the listener callbacks.
            myClient.connect(connectOptions, null, listener);
            setConnectingState(true);
        } catch (MqttException e) {
            service.traceError(TAG, "Cannot reconnect to remote server." + e.getMessage());
            setConnectingState(false);
            handleException(resultBundle, e);
        } catch (Exception e) {
            /*  TODO: Added Due to: https://github.com/eclipse/paho.mqtt.android/issues/101
                For some reason in a small number of cases, myClient is null here and so
                a NullPointer Exception is thrown. This is a workaround to pass the exception
                up to the application. myClient should not be null so more investigation is
                required.
            */
            service.traceError(TAG, "Cannot reconnect to remote server." + e.getMessage());
            setConnectingState(false);
            // Wrap the caught exception itself. e.getCause() is usually null
            // for a NullPointerException, which would drop the original
            // exception and its stack trace.
            MqttException newEx = new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR, e);
            handleException(resultBundle, newEx);
        }
    }
}
上一篇 下一篇

猜你喜欢

热点阅读