MQTT基于安卓的使用
2019-07-28 本文已影响0人
Heweii
前言
上一篇提到了MQTT的通用方式,由于智能家居TV的项目网络波动频繁,通用的方式已经无法满足需求,经常会出现重复订阅导致收到多条消息,那就只能另辟蹊径了,最终找到了梦寐以求的MqttAndroidClient。
1.集成
集成方式和上一篇的MQTT简介和使用要新增配置,build.gradle新增
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
manifest文件里面需要注册服务
<!-- Mqtt Service -->
<service android:name="org.eclipse.paho.android.service.MqttService" />
2.MqttAndroidClient重要源码解析
MqttAndroidClient是专门对MQTTClient的再封装拓展类,包含了订阅、连接以及多线程的处理,直接看MqttAndroidClient对于连接的封装源码,非关键代码已省略
public IMqttToken connect(MqttConnectOptions options, Object userContext,
IMqttActionListener callback) throws MqttException {
if (mqttService == null) {
}
else {
pool.execute(new Runnable() {
@Override
public void run() {
doConnect();
}
});
}
return token;
}
doConnect()连接操作放在子线程,有效避免网络波动连接时间过长阻塞主线程
private void doConnect() {
...
mqttService.connect(clientHandle, connectOptions, null,
activityToken);
...
}
阿里专门针对安卓客户端写了一个MQTTService,方便统一管理,除了连接操作,重连,断开连接都是在MQTTService中完成。
public void connect(String clientHandle, MqttConnectOptions connectOptions,
String invocationContext, String activityToken)throws MqttSecurityException, MqttException {
MqttConnection client = getConnection(clientHandle);
client.connect(connectOptions, null, activityToken);
}
public void connect(MqttConnectOptions options, String invocationContext,
String activityToken) {
...
if (myClient != null) {
if (isConnecting ) {
}else if(!disconnected){
}
else {
service.traceDebug(TAG, "myClient != null and the client is not connected");
service.traceDebug(TAG,"Do Real connect!");
setConnectingState(true);
myClient.connect(connectOptions, invocationContext, listener);
}
}
...
}
public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback)throws MqttException, MqttSecurityException {
final String methodName = "connect";
if (comms.isConnected()) {
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED);
}
if (comms.isConnecting()) {
throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS);
}
if (comms.isDisconnecting()) {
throw new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING);
}
if (comms.isClosed()) {
throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED);
}
...
connectActionListener.connect();
return userToken;
}
源码自身对于isConnected、isConnecting、isDisconnecting、isClosed做了异常处理,避免正在连接或者断开连接时连接造成重复连接。后面的源码就没贴的必要了,就是开启一个连接线程。
3.MqttAndroidClient的使用
一行代码完成MQTT的连接
mqttAndroidClient.connect(mqttConnectOptions, null, iMqttActionListener);
当然,这样是远远不够的,在实际应用中发现有一个问题,断网一段时间后重连网络MQTT不会自动重连,所以还得我们来做手动优化。思路很简单,在断开的节点开启重连线程,连接成功后关闭重连线程。以下贴的是完整代码,主要注意重连机制和订阅前记得取消订阅再订阅。(部分包含自己的代码,可以忽略,注释很详细)
public class MQTTManager {
private Context mContext;
private MqttAndroidClient mqttAndroidClient;
private String clientId;//自定义
private MqttConnectOptions mqttConnectOptions;
private ScheduledExecutorService reconnectPool;//重连线程池
public MQTTManager(Context mContext) {
this.mContext = mContext;
}
public void buildClient() {
closeMQTT();//先关闭上一个连接
buildMQTTClient();
}
private IMqttActionListener iMqttActionListener = new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
TVLog.i("connect-"+"onSuccess");
closeReconnectTask();
subscribeToTopic();
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
//connect-onFailure-MqttException (0) - java.net.UnknownHostException
TVLog.i("connect-"+ "onFailure-"+exception);
startReconnectTask();
}
};
private MqttCallback mqttCallback = new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
//close-connectionLost-等待来自服务器的响应时超时 (32000)
//close-connectionLost-已断开连接 (32109)
TVLog.i("close-"+"connectionLost-"+cause);
if (cause != null) {//null表示被关闭
startReconnectTask();
}
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String body = new String(message.getPayload());
TVLog.i("messageArrived-"+message.getId()+"-"+body);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
try {
TVLog.i("deliveryComplete-"+token.getMessage().toString());
} catch (MqttException e) {
e.printStackTrace();
}
}
};
private void buildMQTTClient(){
mqttAndroidClient = new MqttAndroidClient(mContext, MQTTCons.Broker, clientId);
mqttAndroidClient.setCallback(mqttCallback);
mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setConnectionTimeout(10);
mqttConnectOptions.setKeepAliveInterval(20);
mqttConnectOptions.setCleanSession(true);
try {
mqttConnectOptions.setUserName("Signature|" + MQTTCons.AcessKey + "|" + MQTTCons.instanceId);
mqttConnectOptions.setPassword(MacSignature.macAndSignature(clientId, MQTTCons.SecretKey).toCharArray());
} catch (Exception e) {
}
doClientConnection();
}
private synchronized void startReconnectTask(){
if (reconnectPool != null)return;
reconnectPool = Executors.newScheduledThreadPool(1);
reconnectPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
doClientConnection();
}
} , 0 , 5*1000 , TimeUnit.MILLISECONDS);
}
private synchronized void closeReconnectTask(){
if (reconnectPool != null) {
reconnectPool.shutdownNow();
reconnectPool = null;
}
}
/**
* 连接MQTT服务器
*/
private synchronized void doClientConnection() {
if (!mqttAndroidClient.isConnected()) {
try {
mqttAndroidClient.connect(mqttConnectOptions, null, iMqttActionListener);
TVLog.d("mqttAndroidClient-connecting-"+mqttAndroidClient.getClientId());
} catch (MqttException e) {
e.printStackTrace();
}
}
}
private void subscribeToTopic() {//订阅之前会取消订阅,避免重连导致重复订阅
try {
String registerTopic = "";//自定义
String controlTopic = "";//自定义
String[] topicFilter=new String[]{registerTopic , controlTopic };
int[] qos={0,0};
mqttAndroidClient.unsubscribe(topicFilter, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
TVLog.i("unsubscribe-"+"success");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
TVLog.i("unsubscribe-"+"failed-"+exception);
}
});
mqttAndroidClient.subscribe(topicFilter, qos, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {//订阅成功
TVLog.i("subscribe-"+"success");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
// startReconnectTask();
TVLog.i("subscribe-"+"failed-"+exception);
}
});
} catch (MqttException ex) {
}
}
public void sendMQTT(String topicSep, String msg) {
try {
if (mqttAndroidClient == null)return;
MqttMessage message = new MqttMessage();
message.setPayload(msg.getBytes());
String topic = "";//自定义
mqttAndroidClient.publish(topic, message, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
// TVLog.i("sendMQTT-"+"success:" + msg);
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
// startReconnectTask();
TVLog.i("sendMQTT-"+"failed:" + msg);
}
});
} catch (MqttException e) {
}
}
public void closeMQTT(){
closeReconnectTask();
if (mqttAndroidClient != null){
try {
mqttAndroidClient.unregisterResources();
mqttAndroidClient.disconnect();
TVLog.i("closeMQTT-"+mqttAndroidClient.getClientId());
mqttAndroidClient = null;
} catch (MqttException e) {
e.printStackTrace();
}
}
}
}
调用方式如下,想要做线程安全的单例的可以自己封装
if (mqttManager == null)
mqttManager = new MQTTManager(getApplicationContext());
mqttManager.buildClientId();
结语
基于安卓部分也算是完结了,里面也夹杂着一些源码解释,后续会写更多关于源码的解析。