2021-10-14 Android MQTT 的管理类

2021-10-14  本文已影响0人  fjasmin
/**
 * Manages one MQTT connection for a device identified by its IMEI.
 *
 * <p>Responsibilities visible in this class:
 * <ul>
 *   <li>connects an {@code MqttAndroidClient} to {@code BuildConfig.Host_Mqtt};</li>
 *   <li>after a successful connect, subscribes to the topic {@code HIGHWAY + imei};</li>
 *   <li>publishes outgoing {@code Protocol} messages as JSON to the {@code PUSH} topic;</li>
 *   <li>retries the connection every {@code PERIOD_RETRY} milliseconds after a connect
 *       failure or an unexpected connection loss, until a connect succeeds or
 *       {@link #release()} is called.</li>
 * </ul>
 *
 * <p>NOTE(review): listener callbacks are invoked on whatever thread the MQTT client uses to
 * deliver its callbacks — confirm against the client library before touching UI from them.
 */
public class MqttManage {
    private static final String TAG = "MqttManage";
    // NOTE(review): every instance connects with the same fixed client id. MQTT brokers
    // normally disconnect an existing session when another client connects with the same id,
    // so confirm this is intended before running more than one device against one broker.
    private static final String CLIENT_ID = "123";
    private static final short KEEP_ALIVE = 30; // heartbeat (keep-alive interval)
    private static final int PERIOD_RETRY = 10 * 1000; // reconnect period, in milliseconds
    private static final String HIGHWAY = "highway";
    private static final String PUSH = HIGHWAY + "_device";
    // Shared JSON (de)serializer, created once instead of once per message.
    private static final Gson GSON = new Gson();

    private final MqttAndroidClient mqttAndroidClient;
    private final MqttConnectOptions mqttConnectOptions;
    private final Context context;
    // Written from MQTT callbacks and read through isConnected(), possibly on another thread.
    private volatile boolean connected = false;
    // Set by release(); stops late callbacks from restarting the reconnect task.
    private volatile boolean released = false;
    private ScheduledExecutorService reconnectPool; // reconnect thread pool, guarded by "this"
    private IMqttListener iMqttListener = null;

    private final String imei;

    /** Receives connection state changes and validated incoming messages. */
    public interface IMqttListener {
        /** Called once the connection is up and the device topic has been subscribed. */
        void onMqttConnect();

        /** Called when a connect attempt fails or the connection is lost unexpectedly. */
        void onMqttDisConnect();

        /** Called for each incoming message whose imei matches the topic it arrived on. */
        void onMqttMessage(@NonNull Protocol message);
    }

    /**
     * Creates the client and its connect options; does not connect yet (see {@link #connect()}).
     *
     * @param context context handed to the MQTT client and used as user context for requests
     * @param imei    device identifier appended to {@code HIGHWAY} to form the subscribe topic
     */
    public MqttManage(@NonNull Context context, @NonNull String imei) {
        this.context = context;
        this.imei = imei;
        mqttAndroidClient = new MqttAndroidClient(context, BuildConfig.Host_Mqtt, CLIENT_ID);
        mqttAndroidClient.setCallback(mqttCallback);

        mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setConnectionTimeout(30);
        mqttConnectOptions.setKeepAliveInterval(KEEP_ALIVE);
        mqttConnectOptions.setCleanSession(true);
    }

    /**
     * Sets the listener that receives connection and message events.
     *
     * @param iMqttListener the listener, or {@code null} to stop receiving events
     */
    public void setMqttListener(IMqttListener iMqttListener) {
        this.iMqttListener = iMqttListener;
    }

    /**
     * Connects to the MQTT server if the client is not already connected.
     * The outcome is reported asynchronously through {@link #iMqttActionListener}.
     */
    private synchronized void doClientConnection() {
        if (!mqttAndroidClient.isConnected()) {
            try {
                mqttAndroidClient.connect(mqttConnectOptions, context, iMqttActionListener);
                Log.d(TAG, "mqttAndroidClient-connecting-" + mqttAndroidClient.getClientId());
            } catch (MqttException e) {
                Log.e(TAG, e.toString());
            }
        }
    }

    /** Handles the result of a connect attempt. */
    private final IMqttActionListener iMqttActionListener = new IMqttActionListener() {
        @Override
        public void onSuccess(IMqttToken asyncActionToken) {
            Log.d(TAG, "connect-" + "onSuccess");
            connected = true;
            closeReconnectTask();
            // The session is clean, so the subscription has to be made after every connect.
            startSubscribe();
        }

        @Override
        public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
            // Example: connect-onFailure-MqttException (0) - java.net.UnknownHostException
            Log.e(TAG, "connect-" + "onFailure-" + exception);
            connected = false;
            notifyDisconnected();
            startReconnectTask();
        }
    };

    /**
     * @return {@code true} between a successful connect and the next connection loss,
     *         connect failure or {@link #release()}
     */
    public boolean isConnected(){
        return connected;
    }

    /** Handles connection loss, incoming messages and delivery confirmations. */
    private final MqttCallback mqttCallback = new MqttCallback() {
        @Override
        public void connectionLost(Throwable cause) {
            // Examples:
            // close-connectionLost-timed out waiting for a response from the server (32000)
            // close-connectionLost-connection lost (32109)
            Log.d(TAG, "close-" + "connectionLost-" + cause);

            // Either way the link is down, so isConnected() must stop reporting true.
            connected = false;

            if (cause != null) { // null means the connection was closed deliberately
                notifyDisconnected();
                startReconnectTask();
            }
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) {
            final String payload =
                    new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
            Log.d(TAG, String.format(Locale.getDefault(), "messageArrived topic[%s], message[%s]", topic, payload));

            // Nothing may escape from this callback: the MQTT client treats an exception
            // thrown here as fatal for the connection. Gson reports malformed JSON with
            // unchecked exceptions, so a bad payload is logged and dropped instead.
            final Protocol msg;
            try {
                msg = GSON.fromJson(payload, Protocol.class);
            } catch (RuntimeException e) {
                Log.w(TAG, "messageArrived: payload is not valid Protocol JSON, ignored - " + e);
                return;
            }

            // fromJson yields null for an empty payload, and imei may be absent in the JSON.
            if (msg == null || msg.getImei() == null || !topic.endsWith(msg.getImei())) {
                Log.w(TAG, "消息imei错误,不予处理");
                return;
            }

            final IMqttListener listener = iMqttListener;
            if (listener != null) {
                listener.onMqttMessage(msg);
            }
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            try {
                // getMessage() may return null once delivery has completed; string
                // concatenation copes with that, an explicit toString() call would not.
                Log.d(TAG, "deliveryComplete-" + token.getMessage());
            } catch (MqttException e) {
                Log.e(TAG, "deliveryComplete-" + e);
            }
        }
    };

    /**
     * Starts connecting to the MQTT server. Also re-enables automatic reconnecting if it was
     * switched off by an earlier {@link #release()}.
     */
    public void connect() {
        released = false;
        doClientConnection();
    }

    /**
     * Starts the periodic reconnect task unless it is already running or this manager
     * has been released.
     */
    private synchronized void startReconnectTask() {
        if (released || reconnectPool != null) {
            return;
        }
        reconnectPool = Executors.newScheduledThreadPool(1);
        reconnectPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                Log.d(TAG, "re-connect");
                try {
                    doClientConnection();
                } catch (RuntimeException e) {
                    // An exception leaving run() would cancel every later execution of this
                    // task while reconnectPool stays non-null, so retries would stop for good.
                    Log.e(TAG, "re-connect failed: " + e);
                }
            }
        }, 0, PERIOD_RETRY, TimeUnit.MILLISECONDS);
    }

    /** Stops the periodic reconnect task, if one is running. */
    private synchronized void closeReconnectTask() {
        if (reconnectPool != null) {
            reconnectPool.shutdownNow();
            reconnectPool = null;
        }
    }

    /**
     * Subscribes (QoS 0) to the topic {@code HIGHWAY + imei}, first unsubscribing from it.
     * The listener's {@code onMqttConnect()} is called once the subscription succeeds.
     */
    public void startSubscribe(){
        final String topic = HIGHWAY + this.imei;
        try {
            mqttAndroidClient.unsubscribe(topic);
            mqttAndroidClient.subscribe(topic, 0, context, new IMqttActionListener() {

                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    final IMqttListener listener = iMqttListener;
                    if (listener != null) {
                        listener.onMqttConnect();
                    }
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.e(TAG, String.format(Locale.getDefault(), "subscribe topic[%s] failure", topic));
                    // The throwable is not guaranteed to be non-null.
                    Log.e(TAG, String.valueOf(exception));
                }
            });
        } catch (MqttException e) {
            Log.e(TAG, e.toString());
        }
    }

    /**
     * Publishes a message to the {@code PUSH} topic as JSON (QoS 0, not retained).
     * The message is dropped if the client is not connected.
     *
     * @param builder builder from which the outgoing {@code Protocol} is created
     */
    public void pushMessage(@NonNull Protocol.Builder builder){
        if(!mqttAndroidClient.isConnected()){
            Log.w(TAG, "publish skipped: not connected");
            return;
        }
        try {
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setPayload(
                    GSON.toJson(new Protocol(builder)).getBytes(java.nio.charset.StandardCharsets.UTF_8));
            mqttMessage.setQos(0);
            mqttMessage.setRetained(false);

            mqttAndroidClient.publish(PUSH, mqttMessage, context, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.v(TAG, "publish success");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.e(TAG, "publish failure");
                }
            });
        } catch (MqttException e) {
            Log.e(TAG, e.toString());
        }
    }

    /**
     * Stops reconnecting, closes the client and unregisters its resources.
     * Automatic reconnecting stays off until {@link #connect()} is called again.
     */
    public void release() {
        // Stop the retry thread first so it cannot touch the client while it is closing.
        released = true;
        closeReconnectTask();
        connected = false;

        mqttAndroidClient.close();
        mqttAndroidClient.unregisterResources();
    }

    /** Tells the current listener, if any, that the connection is down. */
    private void notifyDisconnected() {
        final IMqttListener listener = iMqttListener;
        if (listener != null) {
            listener.onMqttDisConnect();
        }
    }
}

上一篇下一篇

猜你喜欢

热点阅读