org.eclipse.paho.client.mqttv3 源
2017-02-09 本文已影响741人
TragedyGo
前面介绍了发送,我们回顾下:
总体接收简单点,就是控制Qos的响应复杂点
在connect的时候开启了Task:
/**
 * Builds the CONNECT packet from the connection options, copies the keep-alive,
 * clean-session and max-inflight settings into the client state, opens the token
 * store and starts the background connect task.
 * (Excerpt: the line marked "......." stands for code elided from this listing.)
 *
 * @param options connection options supplied by the caller.
 *                NOTE(review): the visible lines read {@code conOptions}, not this
 *                parameter; presumably it is assigned in the elided code (confirm).
 * @param token token handed to the background connect task
 * @throws MqttException declared by the signature; the throwing path is not
 *                       visible in this excerpt
 */
public void connect(MqttConnectOptions options, MqttToken token) throws MqttException {
// Everything below runs while holding conLock.
synchronized (conLock) {
// Assemble the CONNECT packet from the client id and the connection options.
MqttConnect connect = new MqttConnect(client.getClientId(),
conOptions.getMqttVersion(),
conOptions.isCleanSession(),
conOptions.getKeepAliveInterval(),
conOptions.getUserName(),
conOptions.getPassword(),
conOptions.getWillMessage(),
conOptions.getWillDestination());
// Mirror the session-related options into the client state.
this.clientState.setKeepAliveSecs(conOptions.getKeepAliveInterval());
this.clientState.setCleanSession(conOptions.isCleanSession());
this.clientState.setMaxInflight(conOptions.getMaxInflight());
tokenStore.open();
// Hand the packet and token to the background connect task and start it.
ConnectBG conbg = new ConnectBG(this, token, connect);
conbg.start();
}
.......
}
/**
 * Background connect task. Its run() starts the network module, then the
 * receiver, the sender and the callback component, and finally sends the
 * CONNECT packet.
 * (Excerpt: the lines marked "........" stand for code elided from this listing,
 * including whatever closes the try block.)
 */
private class ConnectBG implements Runnable {
........
public void run() {
try {
// Select the network module at the current index and start it.
NetworkModule networkModule = networkModules[networkModuleIndex];
networkModule.start();
// The receiver is given the module's input stream; the string passed to
// start() is presumably used as the thread name (confirm).
receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());
receiver.start("MQTT Rec: "+getClient().getClientId());
// The sender is given the module's output stream.
sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());
sender.start("MQTT Snd: "+getClient().getClientId());
callback.start("MQTT Call: "+getClient().getClientId());
// Only after the three components are started is the CONNECT packet sent.
internalSend(conPacket, conToken);
}
........
}
}
然后启动CommsReceiver:
/**
 * Reads MQTT wire messages from the input stream and dispatches them to the
 * client state: acks go to {@code notifyReceivedAck}, everything else goes to
 * {@code notifyReceivedMsg}.
 * (Excerpt: the line marked "........" stands for code elided from this listing.)
 */
public class CommsReceiver implements Runnable {
    private boolean running = false;
    private Object lifecycle = new Object();
    private ClientState clientState = null;
    private ClientComms clientComms = null;
    private MqttInputStream in;
    private CommsTokenStore tokenStore = null;
    private Thread recThread = null;
    // Set just before each read from whether bytes are already available;
    // cleared after the read and again in the finally block.
    private volatile boolean receiving;
    ........
    /**
     * Run loop to receive messages from the server.
     * Loops while {@code running} is true and the input stream is non-null.
     * Any MqttException or IOException ends the loop by clearing {@code running}.
     */
    public void run() {
        final String methodName = "run";
        MqttToken token = null;
        while (running && (in != null)) {
            try {
                receiving = in.available() > 0;
                MqttWireMessage message = in.readMqttWireMessage(); // 【: 1】
                receiving = false;
                if (message instanceof MqttAck) {
                    token = tokenStore.getToken(message); // 【: 2】
                    if (token != null) {
                        synchronized (token) {
                            // Ensure the notify processing is done under a lock on the token.
                            // This ensures that the send processing can complete before the
                            // receive processing starts! (Otherwise the ack could be processed
                            // before the request processing is complete.)
                            clientState.notifyReceivedAck((MqttAck) message); // 【: 3】
                        }
                    } else {
                        // If it is an ack and there is no token then something is not right.
                        // An ack should always have a token associated with it.
                        throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);
                    }
                } else {
                    // A new message has arrived
                    clientState.notifyReceivedMsg(message); // 【: 4】
                }
            } catch (MqttException ex) {
                running = false;
                // Token may be null but that is handled in shutdown
                clientComms.shutdownConnection(token, ex);
            } catch (IOException ioe) {
                running = false;
                // An EOFException could be raised if the broker processes the
                // DISCONNECT and ends the socket before we complete. As such,
                // only shutdown the connection if we're not already shutting down.
                if (!clientComms.isDisconnecting()) {
                    clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe));
                }
            } finally {
                receiving = false;
            }
        }
    }
}
异常处理我就先不介绍了,细节要自己看下。
1.MqttInputStream来自前面介绍TcpNetworkModule,这个类就是socket底层代码。
2.tokenStore.getToken(message):因为收到的是确认ack,它是重连和ping的关键。这类特殊message在自己发出去、接收到服务器应答的时候已经被持久化了。然后在token上加锁同步,完成ACK确认,也就是3。
3.看下ACK确认实现
/**
 * Called by the CommsReceiver when an ack has arrived.
 * Records the time of the inbound activity, looks up the token for the ack
 * and then handles it according to the concrete ack type.
 *
 * @param ack the acknowledgement that was received
 * @throws MqttException if a CONNACK carries a non-zero return code, or if one
 *                       of the invoked operations throws
 */
protected void notifyReceivedAck(MqttAck ack) throws MqttException {
    // Name passed to the trace call below.
    final String methodName = "notifyReceivedAck";
    this.lastInboundActivity = System.currentTimeMillis();
    MqttToken token = tokenStore.getToken(ack);
    MqttException mex = null;
    if (token == null) {
        // @TRACE 662=no message found for ack id={0}
        log.fine(CLASS_NAME, methodName, "662", new Object[] {
                new Integer(ack.getMessageId())});
    } else if (ack instanceof MqttPubRec) {
        // Complete the QoS 2 flow. Unlike all other
        // flows, QoS is a 2 phase flow. The second phase sends a
        // PUBREL - the operation is not complete until a PUBCOMP
        // is received
        MqttPubRel rel = new MqttPubRel((MqttPubRec) ack);
        this.send(rel, token);
    } else if (ack instanceof MqttPubAck || ack instanceof MqttPubComp) {
        // QoS 1 & 2 notify users of result before removing from
        // persistence
        notifyResult(ack, token, mex);
        // Do not remove publish / delivery token at this stage
        // do this when the persistence is removed later
    } else if (ack instanceof MqttPingResp) {
        synchronized (pingOutstandingLock) {
            // Never let the outstanding-ping counter drop below zero.
            pingOutstanding = Math.max(0, pingOutstanding - 1);
            notifyResult(ack, token, mex);
            if (pingOutstanding == 0) {
                tokenStore.removeToken(ack);
            }
        }
    } else if (ack instanceof MqttConnack) {
        int rc = ((MqttConnack) ack).getReturnCode();
        if (rc == 0) {
            synchronized (queueLock) {
                if (cleanSession) {
                    clearState();
                    // Add the connect token back in so that users can be
                    // notified when connect completes.
                    tokenStore.saveToken(token, ack);
                }
                inFlightPubRels = 0;
                actualInFlight = 0;
                restoreInflightMessages();
                connected();
            }
        } else {
            // A non-zero return code aborts here; nothing below runs.
            mex = ExceptionHelper.createMqttException(rc);
            throw mex;
        }
        clientComms.connectComplete((MqttConnack) ack, mex);
        notifyResult(ack, token, mex);
        tokenStore.removeToken(ack);
        // Notify the sender thread that there maybe work for it to do now
        synchronized (queueLock) {
            queueLock.notifyAll();
        }
    } else {
        // Sub ack or unsuback
        notifyResult(ack, token, mex);
        releaseMessageId(ack.getMessageId());
        tokenStore.removeToken(ack);
    }
    // Per the article's note: checks whether the queues are empty and, if so,
    // releases the quiesce lock (implementation not shown here; confirm).
    checkQuiesceLock();
}
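根据不同的消息类型走不同的处理逻辑:MqttPubAck和MqttPubComp分别对应QoS 1和QoS 2发布流程的完成,先通知用户结果,之后再从持久化中移除(也就是处理QoS 1 & 2持久化、重连相关的逻辑);MqttPubRec并不是发布失败需要重发,而是QoS 2两阶段流程的第一阶段确认,收到后要回发PUBREL,直到收到PUBCOMP才算完成;MqttConnack自己处理连接建立后的状态清理和恢复;MqttPingResp是心跳应答,心跳包很重要。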
4.这个就是你自己实现和关注的消息内容。
/**
 * Called by the CommsReceiver when a non-ack message has been received.
 * Handles inbound PUBLISH messages as well as the PUBREL leg of an inbound
 * QoS 2 flow. Nothing is processed while the client is quiescing.
 *
 * @param message the wire message that was received
 * @throws MqttException if persisting or sending a response fails
 */
protected void notifyReceivedMsg(MqttWireMessage message) throws MqttException {
    final String methodName = "notifyReceivedMsg";
    this.lastInboundActivity = System.currentTimeMillis();

    // Inbound messages are ignored while quiescing.
    if (quiescing) {
        return;
    }

    if (message instanceof MqttPublish) {
        MqttPublish publish = (MqttPublish) message;
        int qos = publish.getMessage().getQos();
        if (qos == 0 || qos == 1) {
            // QoS 0 and 1 go straight to the callback, if there is one.
            if (callback != null) {
                callback.messageArrived(publish); // 【: 5】
            }
        } else if (qos == 2) {
            // QoS 2: persist and remember the message, then answer with PUBREC.
            persistence.put(getReceivedPersistenceKey(message), publish);
            inboundQoS2.put(new Integer(publish.getMessageId()), publish);
            this.send(new MqttPubRec(publish), null); // 【: 6】
        }
        // Any other QoS value should not occur and is ignored.
        return;
    }

    if (message instanceof MqttPubRel) {
        MqttPublish pending = (MqttPublish) inboundQoS2
                .get(new Integer(message.getMessageId()));
        if (pending == null) {
            // Original publish has already been delivered.
            this.send(new MqttPubComp(message.getMessageId()), null); // 【: 8】
        } else if (callback != null) {
            callback.messageArrived(pending); // 【: 7】
        }
    }
}
5.Qos1或者0 就直接通知外部callback消息盒子,然后单独线程回调界面消息了
/**
 * Queues an inbound PUBLISH for delivery to the application callback(s).
 * The message is dropped when no callback is registered or when the client
 * is quiescing.
 *
 * @param sendMessage
 *            the inbound MQTT PUBLISH message.
 */
public void messageArrived(MqttPublish sendMessage) {
    final String methodName = "messageArrived";

    // Nobody is listening: nothing to queue.
    if (mqttCallback == null && callbacks.size() <= 0) {
        return;
    }

    // Back-pressure: while the in-memory queue is full, wait in 200 ms slices
    // for space, so the client is not flooded by messages from the server.
    synchronized (spaceAvailable) {
        while (running && !quiescing && messageQueue.size() >= INBOUND_QUEUE_SIZE) {
            try {
                spaceAvailable.wait(200);
            } catch (InterruptedException ignored) {
                // Interruption is ignored; the loop condition is simply re-checked.
            }
        }
    }

    if (quiescing) {
        return;
    }

    messageQueue.addElement(sendMessage);
    // Notify the CommsCallback thread that there's work to do...
    synchronized (workAvailable) {
        workAvailable.notifyAll();
    }
}
全部加入消息盒子队列;如果队列已经满了,会wait一下(每次最多200ms),等有空位了再加入。
6.Qos为2是精确一次(exactly once)投递:先把消息持久化并放入inboundQoS2,再回复PUBREC;需要服务器和客户端双方都确认收到了,不然重传。
7.就是Qos为2的情况的到达确认。
8.inboundQoS2里找不到对应的消息,说明原始的PUBLISH已经投递过了,这里直接回复PUBCOMP。
然后就是消息队列轮询,没有就阻塞,有就通知界面了。相对于发送,接收的类简单点。大家可以回顾下前面的发送过程。