Android

MQTT、RxJava封装

2020-09-25  本文已影响0人  h2coder

最近物联网的项目需要做推送,功能实现方式有以下几种:

MQTT简介

消息发布有以下质量选择:

  1. “至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。

  2. “至少一次”,确保消息到达,但消息重复可能会发生。

  3. “只有一次”,确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。

选用方案

物联网一般也使用MQTT,为了后续其他项目也方便用,需要对MQTT做一次封装,由于RxJava的Retry重试系列操作符对重连可以很简单的封装。消息一对多分发刚好RxJava的多订阅者进行消息,很容易解耦,所以将MQTT、RxJava整合封装。

第三方库

我这里采用paho.mqtt.android这个第三方库。

repositories {
    maven {
        url "https://repo.eclipse.org/content/repositories/paho-releases/"
    }
}
dependencies {
    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.1'
}
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
        package="com.app.mqtt">

    <uses-permission android:name="android.permission.INTERNET"/>
    <uses-permission android:name="android.permission.WAKE_LOCK"/>
    <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE"/>

    <application>
        <!-- MqttService -->
        <service android:name="org.eclipse.paho.android.service.MqttService" />
    </application>
</manifest>

知识点补充

MQTT是基于发布、订阅的消息模式,支持一对多消息发布和订阅多个主题,先来说一下一些必要的知识点:

类结构

代码时间

MqttImpl的代码较多,我们最后逐个讲解~

public interface MqttApi {
    /**
     * 连接服务器
     *
     * @param option 配置
     */
    Observable<Boolean> connectServer(MqttOption option);

    /**
     * 断开连接
     */
    Observable<Boolean> disconnectServer();

    /**
     * 重启连接
     */
    Observable<Boolean> restartConnectServer();

    /**
     * 发布消息
     *
     * @param topic   主题
     * @param message 消息文本
     */
    Observable<Boolean> publishMessage(String topic, String message);

    /**
     * 监听消息接收
     */
    Observable<MqttMessage> subscribeMessage();

    /**
     * 订阅连接状态
     */
    Observable<ConnectionStatus> subscribeConnectionStatus();

    /**
     * 订阅消息发送状态
     */
    Observable<MessagePublishStatus> subscribeMessagePublishStatus();

    /**
     * 动态订阅一个主题
     *
     * @param topic 主题
     */
    Observable<Boolean> subscribeTopic(String... topic);

    /**
     * 动态取消订阅一个主题
     *
     * @param topic 主题
     */
    Observable<Boolean> unsubscribeTopic(String... topic);
}
public class MqttProxy implements MqttApi {
    private final MqttApi mImpl;

    private MqttProxy() {
        mImpl = new MqttImpl(ContextProvider.get().getContext());
    }

    private static final class SingleHolder {
        private static final MqttProxy INSTANCE = new MqttProxy();
    }

    public static MqttProxy getInstance() {
        return SingleHolder.INSTANCE;
    }

    @Override
    public Observable<Boolean> connectServer(MqttOption option) {
        return mImpl.connectServer(option);
    }

    @Override
    public Observable<Boolean> disconnectServer() {
        return mImpl.disconnectServer();
    }
    
    //...省略其他方法的转调代码
}
public class MqttMessage {
    /**
     * 订阅的主题
     */
    private String topic;
    /**
     * 接收到的消息
     */
    private String message;

    public MqttMessage(String topic, String message) {
        this.topic = topic;
        this.message = message;
    }

    //...省略get、set方法
}
public class MqttOption {
    private String serverUrl;
    private String username;
    private String passWord;
    private String clientId;
    private String[] topics;
    private int retryIntervalTime;
    private int connectionTimeout;
    private int keepAliveInterval;

    private MqttOption(Builder builder) {
        this.serverUrl = builder.serverUrl;
        this.username = builder.username;
        this.passWord = builder.passWord;
        this.clientId = builder.clientId;
        this.topics = builder.topics == null ? new String[0] : builder.topics;
        this.retryIntervalTime = builder.retryIntervalTime == 0 ? 2 : builder.retryIntervalTime;
        this.connectionTimeout = builder.connectionTimeout <= 0 ? 10 : builder.connectionTimeout;
        this.keepAliveInterval = builder.keepAliveInterval <= 0 ? 30 : builder.keepAliveInterval;
    }

    public String getServerUrl() {
        return serverUrl;
    }

    public String getUsername() {
        return username;
    }

    public String getPassWord() {
        return passWord;
    }
    
    //...省略其他参数的get方法

    public static class Builder {
        /**
         * 服务端地址
         */
        private String serverUrl;
        /**
         * 账号
         */
        private String username;
        /**
         * 密码
         */
        private String passWord;
        /**
         * 客户端Id,必须唯一
         */
        private String clientId;
        /**
         * 需要订阅的主题,连接成功后,会自动订阅
         */
        private String[] topics;
        /**
         * 重试间隔时间,单位为秒
         */
        private int retryIntervalTime;
        /**
         * 连接超时时间
         */
        private int connectionTimeout;
        /**
         * 保持活动时间,超过时间没有消息收发将会触发ping消息确认
         */
        private int keepAliveInterval;

        public Builder setServerUrl(String serverUrl) {
            this.serverUrl = serverUrl;
            return this;
        }

        public Builder setUsername(String username) {
            this.username = username;
            return this;
        }

        //...省略其他参数的set方法

        public MqttOption build() {
            return new MqttOption(this);
        }
    }
}
public enum MqttQos {
    /**
     * 最多一次,有可能重复或丢失
     */
    MOST_ONCE(0),
    /**
     * 至少一次,有可能重复
     */
    LEAST_ONE(1),
    /**
     * 只有一次,确保消息只到达一次(用于比较严格的计费系统)
     */
    ONLY_ONE(2);

    private int mCode;

    MqttQos(int code) {
        mCode = code;
    }

    public int getCode() {
        return mCode;
    }
}
public class NetworkUtil {
    private NetworkUtil() {
    }

    /**
     * 当前是否有网络状态
     *
     * @param context  上下文
     * @param needWifi 是否只有连接上wifi才算是连接上网络
     */
    public static boolean hasNetWorkStatus(Context context, boolean needWifi) {
        NetworkInfo info = getActiveNetwork(context);
        if (info == null) {
            return false;
        }
        if (!needWifi) {
            return info.isAvailable();
        } else if (info.getType() == ConnectivityManager.TYPE_WIFI) {
            return info.isAvailable();
        }
        return false;
    }

    /**
     * 获取活动网络连接信息
     *
     * @param context 上下文
     * @return NetworkInfo
     */
    public static NetworkInfo getActiveNetwork(Context context) {
        ConnectivityManager mConnMgr = (ConnectivityManager) context
                .getSystemService(Context.CONNECTIVITY_SERVICE);
        if (mConnMgr == null) {
            return null;
        }
        // 获取活动网络连接信息
        return mConnMgr.getActiveNetworkInfo();
    }
}
public class MessagePublishStatus {
    /**
     * 是否发送完毕
     */
    private boolean isComplete;
    /**
     * 发送的消息内容
     */
    private String message;

    public MessagePublishStatus(boolean isComplete, String message) {
        this.isComplete = isComplete;
        this.message = message;
    }
    
    //省略get、set方法
}
public class ConnectionStatus {
    /**
     * 是否丢失连接
     */
    private boolean isLost;
    /**
     * 是否在重新连接
     */
    private boolean isRetry;
    /**
     * 丢失连接时的异常
     */
    private Throwable error;

    public ConnectionStatus(boolean isLost, Throwable error) {
        this.isLost = isLost;
        this.error = error;
    }

    public ConnectionStatus(boolean isRetry) {
        this.isRetry = isRetry;
    }

    //省略get、set方法
}
public class MqttImproperCloseException extends Exception {
    private static final long serialVersionUID = -4030414538155742302L;

    MqttImproperCloseException() {
    }

    public MqttImproperCloseException(String message) {
        super(message);
    }
}
  1. 构造方法初始化,保存Context上下文。创建MqttAndroidClient时需要用到。
public class MqttImpl implements MqttApi {
    /**
     * 上下文
     */
    private Context mContext;

    MqttImpl(Context context) {
        mContext = context.getApplicationContext();
    }
}
  1. 开启连接。

将传入的MqttOption配置转换为MQTT库提供的MqttConnectOptions配置对象并配置相关参数。创建一个MqttAndroidClient客户端,注意ClientId需要加上随机数保证多个账号在不同客户端中登录时,不会Id相同导致互抢连接。

因为连接可能会失败,所以加入了RxJava的retry()操作符,在连接失败时,手动Observable.error()抛出一个MqttImproperCloseException自定义异常,让retry操作符生效,再使用Observable.timer()操作符延时配置的重试间隔时间后重走可观察者的订阅方法进行重连。

由于很多情况都是手机网络不好或手动切换网络时,断开了MQTT的连接,所以加入了NetworkUtil工具类,检查当前网络情况,如果网络被关闭,则手动发送onNext(false),继续重走重试逻辑,避免不断地重试,调用MqttClient的connect()连接方法,不断的连接失败。

/**
 * 当前引用的配置
 */
private MqttOption mCurrentApplyOption;
/**
 * Mqtt客户端
 */
private MqttAndroidClient mMqttClient;
/**
 * 是否已连接
 */
private boolean isConnect;

@Override
public Observable<Boolean> connectServer(MqttOption option) {
    mCurrentApplyOption = option;
    return Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
            if (mMqttClient == null) {
                mMqttClient = new MqttAndroidClient(
                        mContext,
                        option.getServerUrl(),
                        option.getClientId(),
                        new MemoryPersistence());
            }
            if (isConnect) {
                emitter.onNext(true);
                return;
            }
            //如果没有网络,不开启连接
            if (!NetworkUtil.hasNetWorkStatus(mContext, false)) {
                emitter.onNext(false);
                return;
            }
            //进行链接配置
            MqttConnectOptions connectOptions = new MqttConnectOptions();
            //如果为false(flag=0),Client断开连接后,Server应该保存Client的订阅信息
            //如果为true(flag=1),表示Server应该立刻丢弃任何会话状态信息
            connectOptions.setCleanSession(true);
            //设置用户名和密码
            connectOptions.setUserName(option.getUsername());
            connectOptions.setPassword(option.getPassWord().toCharArray());
            //设置连接超时时间
            connectOptions.setConnectionTimeout(option.getConnectionTimeout());
            //设置心跳发送间隔时间,单位秒
            connectOptions.setKeepAliveInterval(option.getKeepAliveInterval());
            //设置遗嘱
            connectOptions.setWill("android-mqtt-offline-topic", "android-mqtt-is_offline".getBytes(), MqttQos.ONLY_ONE.getCode(), false);
            //开始连接
            mMqttClient.connect(connectOptions, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    emitter.onNext(true);
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    exception.printStackTrace();
                    emitter.onNext(false);
                }
            });
        }
    }).flatMap(new Function<Boolean, ObservableSource<Boolean>>() {
        @Override
        public ObservableSource<Boolean> apply(Boolean isSuccess) throws Exception {
            //连接成功,
            if (isSuccess) {
                return subscribeTopic(mCurrentApplyOption.getTopics());
            } else {
                //连接失败,重试
                return Observable.error(new MqttImproperCloseException());
            }
        }
    }).doOnNext(new Consumer<Boolean>() {
        @Override
        public void accept(Boolean isSuccess) throws Exception {
            isConnect = isSuccess;
        }
    }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
            return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(Throwable throwable) throws Exception {
                    mMqttClient = null;
                    //所有异常都走重试,延时指定秒值进行重试
                    return Observable.timer(mCurrentApplyOption.getRetryIntervalTime(), TimeUnit.SECONDS)
                            .doOnNext(new Consumer<Long>() {
                                @Override
                                public void accept(Long aLong) throws Exception {
                                    if (mConnectionStatusListener != null) {
                                        mConnectionStatusListener.onRetryConnection();
                                    }
                                }
                            });
                }
            });
        }
    });
}
  1. 断开连接,将当前的链接断开,如果已经断开了,则不执行断开操作。
@Override
public Observable<Boolean> disconnectServer() {
    //已经断开连接了
    if (!isConnect) {
        return Observable.just(true);
    }
    return Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
            try {
                mMqttClient.disconnect(null, new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        mMqttClient = null;
                        emitter.onNext(true);
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        exception.printStackTrace();
                        emitter.onNext(false);
                    }
                });
            } catch (MqttException e) {
                e.printStackTrace();
                emitter.onNext(false);
            }
        }
    }).doOnNext(new Consumer<Boolean>() {
        @Override
        public void accept(Boolean isSuccess) throws Exception {
            isConnect = !isSuccess;
        }
    });
}
  1. 重新连接,就是先断开连接,再调用连接。
@Override
public Observable<Boolean> restartConnectServer() {
    //先断开,再重新连接
    return disconnectServer().flatMap(new Function<Boolean, ObservableSource<Boolean>>() {
        @Override
        public ObservableSource<Boolean> apply(Boolean isSuccess) throws Exception {
            if (isSuccess) {
                return connectServer(mCurrentApplyOption);
            }
            return Observable.just(false);
        }
    });
}
  1. 发布消息,对特定Topic主题,发布消息。消息质量为MqttQos.ONLY_ONE,只有一次,确保消息只到达一次(用于比较严格的计费系统)
@Override
public Observable<Boolean> publishMessage(String topic, String message) {
    return Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
            //连接未打开
            if (!isConnect) {
                emitter.onNext(false);
            }
            try {
                //发布消息
                mMqttClient.publish(topic, message.getBytes(), MqttQos.ONLY_ONE.getCode(), false);
                emitter.onNext(true);
            } catch (MqttException e) {
                e.printStackTrace();
                emitter.onNext(false);
            }
        }
    });
}
  1. 订阅消息,订阅回调对象为MqttMessage,MqttMessage中有消息的Topic主题和消息内容。
@Override
public Observable<MqttMessage> subscribeMessage() {
    return Observable.create(new ObservableOnSubscribe<MqttMessage>() {
        @Override
        public void subscribe(ObservableEmitter<MqttMessage> emitter) throws Exception {
            mMqttClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    //与服务器之间的连接失效
                    isConnect = false;
                    if (mConnectionStatusListener != null) {
                        mConnectionStatusListener.onConnectionLost(cause);
                    }
                    emitter.onError(cause);
                }

                @Override
                public void messageArrived(String topic, org.eclipse.paho.client.mqttv3.MqttMessage message) throws Exception {
                    //接收到消息
                    emitter.onNext(new MqttMessage(topic, message.toString()));
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    //发送完成
                    if (mMessagePublishStatusListener != null) {
                        try {
                            String message = token.getMessage().toString();
                            mMessagePublishStatusListener.onMessagePublishComplete(message);
                        } catch (MqttException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
    }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
            return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(Throwable throwable) throws Exception {
                    //延时重连
                    return Observable.timer(mCurrentApplyOption.getRetryIntervalTime(), TimeUnit.SECONDS).flatMap(new Function<Long, ObservableSource<?>>() {
                        @Override
                        public ObservableSource<?> apply(Long aLong) throws Exception {
                            return connectServer(mCurrentApplyOption);
                        }
                    });
                }
            });
        }
    });
}
  1. 订阅连接状态,有2种情况,连接丢失和重连时的回调。
/**
 * 连接状态监听
 */
private OnConnectionStatusListener mConnectionStatusListener;

/**
 * 连接状态监听
 */
public interface OnConnectionStatusListener {
    /**
     * 连接丢失
     *
     * @param error 异常对象
     */
    void onConnectionLost(Throwable error);

    /**
     * 正在重试连接
     */
    void onRetryConnection();
}

private void setOnConnectionStatusListener(OnConnectionStatusListener connectionStatusListener) {
    mConnectionStatusListener = connectionStatusListener;
}

@Override
public Observable<ConnectionStatus> subscribeConnectionStatus() {
    return Observable.create(new ObservableOnSubscribe<ConnectionStatus>() {
        @Override
        public void subscribe(ObservableEmitter<ConnectionStatus> emitter) throws Exception {
            setOnConnectionStatusListener(new OnConnectionStatusListener() {
                @Override
                public void onConnectionLost(Throwable error) {
                    emitter.onNext(new ConnectionStatus(true, error));
                }

                @Override
                public void onRetryConnection() {
                    emitter.onNext(new ConnectionStatus(true));
                }
            });
        }
    });
}
  1. 订阅消息发送状态,暂时只有一个发送成功后的状态回调。
/**
 * 消息发布状态监听器
 */
private OnMessagePublishStatusListener mMessagePublishStatusListener;

/**
 * 消息发送状态监听
 */
public interface OnMessagePublishStatusListener {
    /**
     * 发送消息完成
     *
     * @param message 原始消息
     */
    void onMessagePublishComplete(String message);
}

private void setOnMessagePublishStatusListener(OnMessagePublishStatusListener messagePublishStatusListener) {
    mMessagePublishStatusListener = messagePublishStatusListener;
}

@Override
public Observable<MessagePublishStatus> subscribeMessagePublishStatus() {
    return Observable.create(new ObservableOnSubscribe<MessagePublishStatus>() {
        @Override
        public void subscribe(ObservableEmitter<MessagePublishStatus> emitter) throws Exception {
            setOnMessagePublishStatusListener((message)
                    -> emitter.onNext(new MessagePublishStatus(true, message)));
        }
    });
}
  1. 订阅主题,传入需要订阅的多个主题的数组即可。消息质量,我这里固定为MqttQos.ONLY_ONE,就是只有一次,确保消息只到达一次。有需要可以自行修改。
@Override
public Observable<Boolean> subscribeTopic(String... topic) {
    return Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
            if (topic.length == 0) {
                emitter.onNext(true);
                return;
            }
            try {
                int size = topic.length;
                int[] qosArr = new int[size];
                for (int i = 0; i < size; i++) {
                    qosArr[i] = MqttQos.ONLY_ONE.getCode();
                }
                mMqttClient.subscribe(topic, qosArr);
                emitter.onNext(true);
            } catch (MqttException e) {
                e.printStackTrace();
                emitter.onNext(false);
            }
        }
    });
}
  1. 取消订阅主题,传入需要取消订阅的Topic数组即可。
@Override
public Observable<Boolean> unsubscribeTopic(String... topic) {
    return Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
            try {
                mMqttClient.unsubscribe(topic);
                emitter.onNext(true);
            } catch (MqttException e) {
                e.printStackTrace();
                emitter.onNext(false);
            }
        }
    });
}

使用

我们新建一个Service,例如通知模块的MqttService,名为NoticeMqttService。在onStartCommand()方法回调时,使用MqttProxy类进行连接。后续则是消息接收订阅、连接状态订阅、发送状态订阅。最后是发送消息。

class NoticeMqttService : Service() {
    private val mDisposables: CompositeDisposable = CompositeDisposable()

    override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
        connectMqttService()
        return super.onStartCommand(intent, flags, startId)
    }

    override fun onDestroy() {
        super.onDestroy()
        if (!mDisposables.isDisposed) {
            mDisposables.clear()
        }
    }

    /**
     * 连接服务端Mqtt
     */
    private fun connectMqttService() {
        val userId = "10086"
        mDisposables.apply {
            MqttProxy.getInstance().apply {
                val topic = "/user/${userId}"
                //连接服务
                add(
                    connectServer(
                        MqttOption.Builder()
                            .setServerUrl("tcp://192.168.0.211:1883")
                            .setUsername("admin")
                            .setPassWord("admin")
                            .setClientId(userId)
                            .setTopics(mutableListOf(topic).toTypedArray())
                            .build()
                    ).ioToMain().subscribe({ isConnectSuccess ->
                        Logger.d("Mqtt连接是否成功: $isConnectSuccess")
                    }, { error ->
                        error.printStackTrace()
                        Logger.d("Mqtt连接失败,原因: ${error.message}")
                    })
                )
                //订阅消息接收
                add(
                    subscribeMessage()
                        .ioToMain()
                        .subscribe({ mqttMessage ->
                            val msg = "Mqtt收到消息: ${mqttMessage.message}"
                            //一般会在这里解析Json,并进行数据、UI处理
                            Logger.d(msg)
                        }, { error ->
                            error.printStackTrace()
                            Logger.d("Mqtt收到消息,但是抛出异常,原因: ${error.message}")
                        })
                )
                //订阅连接状态
                add(
                    subscribeConnectionStatus()
                        .ioToMain()
                        .subscribe({ status ->
                            if (status.isLost) {
                                Logger.d("Mqtt连接丢失,原因: ${status.error.message}")
                            } else if (status.isRetry) {
                                Logger.d("Mqtt重试连接中...")
                            }
                        }, { error ->
                            error.printStackTrace()
                        })
                )
                //订阅消息发送状态
                add(
                    subscribeMessagePublishStatus()
                        .ioToMain()
                        .subscribe({ status ->
                            if (status.isComplete) {
                                Logger.d("Mqtt消息发送完毕,消息内容: ${status.message}")
                            }
                        }, { error ->
                            error.printStackTrace()
                        })
                )
                //发送消息
                val msg = "我是测试消息"
                add(
                    publishMessage(topic, msg)
                        .subscribe({ result ->
                            Logger.d("Mqtt发送消息是否成功: $result")
                        }, { error ->
                            error.printStackTrace()
                            Logger.d("Mqtt发送消息失败,消息Topic${topic}, 消息内容: ${msg}")
                        })
                )
            }
        }
    }

    override fun onBind(intent: Intent?): IBinder? {
        return Binder()
    }
}

最后在清单文件中,配置Service

<manifest xmlns:android="http://schemas.android.com/apk/res/android"
        package="com.app.notice">

    <application>
        <!-- 通知模块的Mqtt消息接收服务 -->
        <service android:name=".service.NoticeMqttService" />
    </application>
</manifest>

总结

MQTT内容并不是太多,一般只有一个连接,如果不同模块需要不同的消息源,订阅不同的Topic主题即可。

上一篇 下一篇

猜你喜欢

热点阅读