Android上简化使用Mqtt client
2016-03-28 本文已影响10627人
通常的做法是编写很多类(通常是BroadcastReceiver)来保证Service与Activity等组件之间的通信,就像Eclipse Paho的Android service那样。
本文基于Eclipse的paho框架的java client端,在Android上引入EventBus来做事件分发,简化了代码量,同时方便实现。
public class MqttManager {
// 单例
private static MqttManager mInstance = null;
// 回调
private MqttCallback mCallback;
// Private instance variables
private MqttClient client;
private MqttConnectOptions conOpt;
private boolean clean = true;
private MqttManager() {
mCallback = new MqttCallbackBus();
public static MqttManager getInstance() {
if (null == mInstance) {
mInstance = new MqttManager();
return mInstance;
/** * 释放单例, 及其所引用的资源 */
public static void release() {
try {
if (mInstance != null) {
mInstance = null;
} catch (Exception e) {
* 创建Mqtt 连接
* @param brokerUrl Mqtt服务器地址(tcp://xxxx:1863)
* @param userName 用户名
* @param password 密码
* @param clientId clientId
* @return
public boolean creatConnect(String brokerUrl, String userName, String password, String clientId) {
boolean flag = false;
String tmpDir = System.getProperty("");
MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir);
try {
// Construct the connection options object that contains connection parameters
// such as cleanSession and LWT
conOpt = new MqttConnectOptions();
if (password != null) {
if (userName != null) {
// Construct an MQTT blocking mode client
client = new MqttClient(brokerUrl, clientId, dataStore);
// Set this wrapper as the callback handler
flag = doConnect();
} catch (MqttException e) {
return flag;
* 建立连接
* @return
public boolean doConnect() {
boolean flag = false;
if (client != null) {
try {
Logger.d("Connected to " + client.getServerURI() + " with client ID " + client.getClientId());
flag = true;
} catch (Exception e) {
return flag;
* Publish / send a message to an MQTT server
* @param topicName the name of the topic to publish to
* @param qos the quality of service to delivery the message at (0,1,2)
* @param payload the set of bytes to send to the MQTT server
* @return boolean
public boolean publish(String topicName, int qos, byte[] payload) {
boolean flag = false;
if (client != null && client.isConnected()) {
Logger.d("Publishing to topic \"" + topicName + "\" qos " + qos);
// Create and configure a message
MqttMessage message = new MqttMessage(payload); message.setQos(qos);
// Send the message to the server, control is not returned until
// it has been delivered to the server meeting the specified
// quality of service.
try {
client.publish(topicName, message);
flag = true;
} catch (MqttException e) {
return flag;
* Subscribe to a topic on an MQTT server
* Once subscribed this method waits for the messages to arrive from the server
* that match the subscription. It continues listening for messages until the enter key is
* pressed.
* @param topicName to subscribe to (can be wild carded)
* @param qos the maximum quality of service to receive messages at for this subscription
* @return boolean
public boolean subscribe(String topicName, int qos) {
boolean flag = false;
if (client != null && client.isConnected()) {
// Subscribe to the requested topic
// The QoS specified is the maximum level that messages will be sent to the client at.
// For instance if QoS 1 is specified, any messages originally published at QoS 2 will
// be downgraded to 1 when delivering to the client but messages published at 1 and 0
// will be received at the same level they were published at.
Logger.d("Subscribing to topic \"" + topicName + "\" qos " + qos);
try {
client.subscribe(topicName, qos);
flag = true;
} catch (MqttException e) {
return flag;
* 取消连接
* @throws MqttException
public void disConnect() throws MqttException {
if (client != null && client.isConnected()) {
在Mqtt Callback方法中发送Event事件:
public class MqttCallbackBus implements MqttCallback {
public void connectionLost(Throwable cause) {
public void messageArrived(String topic, MqttMessage message) {
Logger.d(topic + "====" + message.toString());
public void deliveryComplete(IMqttDeliveryToken token) {
- 由于EventBus不支持跨进程通信,所以当service运行在独立进程中时,主进程无法接收到EventBus分发的事件。
- 连接等操作是较费时的,为了避免ANR,应在子线程中完成操作,也可以考虑使用RxJava来简化。