
iOS MQTT - 2. Connecting, and a Walkthrough of the Connection Process

2018-03-03  Codepgq

I kept forgetting to write this part up... honestly, I was just being lazy.

I. Connecting

- (void)connectTo:(NSString *)host
                  port:(NSInteger)port
                   tls:(BOOL)tls
             keepalive:(NSInteger)keepalive
                 clean:(BOOL)clean
                  auth:(BOOL)auth
                  user:(NSString *)user
                  pass:(NSString *)pass
             willTopic:(NSString *)willTopic
                  will:(NSData *)will
               willQos:(MQTTQosLevel)willQos
        willRetainFlag:(BOOL)willRetainFlag
          withClientId:(NSString *)clientId;

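The first thing you see is a long list of parameters. Don't let it scare you; let's go through them one by one.

host: the broker address
port: the broker port
tls: whether to encrypt the connection with TLS/SSL
keepalive: the keep-alive interval, in seconds
clean: the clean-session flag, i.e. whether the broker discards the session state (subscriptions and undelivered messages) rather than keeping it across connections
auth: whether to authenticate with a user name and password
user: the user name
pass: the password
willTopic: the topic on which the will (last will and testament) message is published
will: the payload of the will message; the broker publishes it on willTopic if the client disconnects unexpectedly
willQos: the QoS level of the will message (QoS levels were introduced in the first article)
    MQTTQosLevelAtMostOnce = 0, at most once
    MQTTQosLevelAtLeastOnce = 1, at least once
    MQTTQosLevelExactlyOnce = 2, exactly once
willRetainFlag: whether the broker keeps the will message as a retained message
clientId: the client ID; if you don't provide one, a random one is generated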
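
As a rough illustration, a call to this method might look like the sketch below. The host, port, topic, payload and client ID are made-up placeholders, and `ExampleMQTTClient` is a hypothetical wrapper class; only the `connectTo:...` selector is taken from the signature above. The sketch assumes the MQTTClient framework is linked, that the selector belongs to `MQTTSessionManager` in the version discussed here, and that the header import paths match your integration.

```objc
#import <Foundation/Foundation.h>
#import <MQTTClient/MQTTClient.h>
#import <MQTTClient/MQTTSessionManager.h>

// Hypothetical wrapper class, used only for illustration.
@interface ExampleMQTTClient : NSObject <MQTTSessionManagerDelegate>
@property (nonatomic, strong) MQTTSessionManager *manager;
- (void)start;
@end

@implementation ExampleMQTTClient

- (void)start {
    self.manager = [[MQTTSessionManager alloc] init];
    self.manager.delegate = self;

    // Every value below is a placeholder.
    [self.manager connectTo:@"broker.example.com"
                       port:1883
                        tls:NO
                  keepalive:60
                      clean:YES
                       auth:NO
                       user:nil
                       pass:nil
                  willTopic:@"example/status"
                       will:[@"offline" dataUsingEncoding:NSUTF8StringEncoding]
                    willQos:MQTTQosLevelAtLeastOnce
             willRetainFlag:NO
               withClientId:@"example-client-1"];
}

// MQTTSessionManagerDelegate: called for messages on subscribed topics.
- (void)handleMessage:(NSData *)data onTopic:(NSString *)topic retained:(BOOL)retained {
    NSLog(@"received %lu bytes on %@", (unsigned long)data.length, topic);
}

@end
```

Here the will is set up so that the broker would publish "offline" on example/status if this client dropped off without a clean disconnect.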



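Calling this method creates a Session. You need to set a delegate to observe the connection state; incoming messages and the results of publishing and subscribing are also delivered through the delegate.

If you step into the source, you will see that a Session is created internally.
If you don't use the manager the library provides, you have to create the session yourself, and remember to set its delegate. Also note that the session's Transport can differ: if you connect over WebSocket, the Transport must be an MQTTWebsocketTransport; otherwise use MQTTCFSocketTransport.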
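
Without the manager, the setup might look roughly like the following. This is a minimal sketch using the plain-socket transport; the host, port and 30-second timeout are placeholders, and `ExampleSessionOwner` is a hypothetical class. It assumes the MQTTClient framework is linked. For a WebSocket connection you would create an `MQTTWebsocketTransport` instead.

```objc
#import <Foundation/Foundation.h>
#import <MQTTClient/MQTTClient.h>

// Hypothetical owner object: it keeps the session alive and acts as its delegate.
@interface ExampleSessionOwner : NSObject <MQTTSessionDelegate>
@property (nonatomic, strong) MQTTSession *session;
- (BOOL)connect;
@end

@implementation ExampleSessionOwner

- (BOOL)connect {
    // Plain TCP transport; host and port are placeholders.
    MQTTCFSocketTransport *transport = [[MQTTCFSocketTransport alloc] init];
    transport.host = @"broker.example.com";
    transport.port = 1883;

    self.session = [[MQTTSession alloc] init];
    self.session.transport = transport;
    self.session.delegate = self;   // the session reports its events to this object

    // Spins the calling run loop until the connect attempt finishes or times out.
    return [self.session connectAndWaitTimeout:30];
}

@end
```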

self.session = [[MQTTSession alloc] initWithClientId:clientId
                                            userName:auth ? user : nil
                                            password:auth ? pass : nil
                                           keepAlive:keepalive
                                        cleanSession:clean
                                                will:will
                                           willTopic:willTopic
                                             willMsg:willMsg
                                             willQoS:willQos
                                      willRetainFlag:willRetainFlag
                                       protocolLevel:protocolLevel
                                             runLoop:[NSRunLoop currentRunLoop]
                                             forMode:NSDefaultRunLoopMode
                                      securityPolicy:securityPolicy
                                        certificates:certificates
                                       transportType:MQTTTransportTypeWebSocket];
MQTTCoreDataPersistence *persistence = [[MQTTCoreDataPersistence alloc] init];
persistence.persistent = self.persistent;
persistence.maxWindowSize = self.maxWindowSize;
persistence.maxSize = self.maxSize;
persistence.maxMessages = self.maxMessages;
self.session.persistence = persistence;



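Now let's look at the delegate methods. Only the important ones are covered here:

1. This callback fires whenever a message arrives on a subscribed topic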

- (void)newMessage:(MQTTSession *)session
              data:(NSData *)data
           onTopic:(NSString *)topic
               qos:(MQTTQosLevel)qos
          retained:(BOOL)retained
               mid:(unsigned int)mid;

2. Changes in the MQTT connection state are reported here

- (void)handleEvent:(MQTTSession *)session event:(MQTTSessionEvent)eventCode error:(NSError *)error;

3. The connection-related delegate methods

/** gets called when a connection has been successfully established
 @param session the MQTTSession reporting the connect
 
 */
- (void)connected:(MQTTSession *)session;

/** gets called when a connection has been successfully established
 @param session the MQTTSession reporting the connect
 @param sessionPresent represents the Session Present flag sent by the broker
 
 */
- (void)connected:(MQTTSession *)session sessionPresent:(BOOL)sessionPresent;

/** gets called when a connection has been refused
 @param session the MQTTSession reporting the refusal
 @param error an optional additional error object with additional information
 */
- (void)connectionRefused:(MQTTSession *)session error:(NSError *)error;

/** gets called when a connection has been closed
 @param session the MQTTSession reporting the close

 */
- (void)connectionClosed:(MQTTSession *)session;

/** gets called when a connection error happened
 @param session the MQTTSession reporting the connect error
 @param error an optional additional error object with additional information
 */
- (void)connectionError:(MQTTSession *)session error:(NSError *)error;



Once the connection has succeeded, we can usually start publishing messages and subscribing to topics.
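
A delegate that does this could be sketched as follows. The class name, topics and payload are hypothetical, and the sketch assumes the handler-based `subscribeToTopic:atLevel:subscribeHandler:` and `publishData:onTopic:retain:qos:publishHandler:` methods of `MQTTSession` are available in your version of the library.

```objc
#import <Foundation/Foundation.h>
#import <MQTTClient/MQTTClient.h>

// Hypothetical delegate object; topic names and payload are placeholders.
@interface ExampleSessionDelegate : NSObject <MQTTSessionDelegate>
@end

@implementation ExampleSessionDelegate

// The connection is established: subscribe and publish from here.
- (void)connected:(MQTTSession *)session {
    [session subscribeToTopic:@"example/inbox"
                      atLevel:MQTTQosLevelAtLeastOnce
             subscribeHandler:^(NSError *error, NSArray<NSNumber *> *gQoss) {
        if (error) {
            NSLog(@"subscribe failed: %@", error);
        }
    }];

    [session publishData:[@"hello" dataUsingEncoding:NSUTF8StringEncoding]
                 onTopic:@"example/outbox"
                  retain:NO
                     qos:MQTTQosLevelAtMostOnce
          publishHandler:^(NSError *error) {
        if (error) {
            NSLog(@"publish failed: %@", error);
        }
    }];
}

// Messages for subscribed topics arrive here.
- (void)newMessage:(MQTTSession *)session
              data:(NSData *)data
           onTopic:(NSString *)topic
               qos:(MQTTQosLevel)qos
          retained:(BOOL)retained
               mid:(unsigned int)mid {
    NSLog(@"message on %@ (%lu bytes)", topic, (unsigned long)data.length);
}

// Log connection errors so they do not go unnoticed.
- (void)connectionError:(MQTTSession *)session error:(NSError *)error {
    NSLog(@"connection error: %@", error);
}

@end
```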

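II. The connection process

Here is a diagram I put together myself

[Figure: MQTT connection process (MQTT连接过程.png)]
Let's start from the left side.
1. Call connectAndWaitTimeout: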
- (BOOL)connectAndWaitTimeout:(NSTimeInterval)timeout {
    NSDate *started = [NSDate date];
    self.synchronConnect = TRUE;
    
    [self connect];
    
    [[NSRunLoop currentRunLoop] addPort:[NSMachPort port] forMode:NSRunLoopCommonModes];
    
    while (self.synchronConnect && (timeout == 0 || started.timeIntervalSince1970 + timeout > [NSDate date].timeIntervalSince1970)) {
        DDLogVerbose(@"[MQTTSessionSynchron] waiting for connect");
        [[NSRunLoop currentRunLoop] runUntilDate:[NSDate dateWithTimeIntervalSinceNow:.1]];
    }
    
    DDLogVerbose(@"[MQTTSessionSynchron] end connect");
    
    return (self.status == MQTTSessionStatusConnected);
}
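1. Record the current time as the start time
2. Set the synchronConnect flag to TRUE
3. Start connecting
4. Add a port to the current run loop, so that the run loop has an input source and does not return immediately
5. Run the run loop in 0.1-second slices until synchronConnect becomes false (the connect attempt has finished) or the timeout expires; a timeout of 0 waits indefinitely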
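
The wait-with-timeout pattern in these steps can be sketched on its own, independent of MQTT. `ExampleWaitOnRunLoop` and its block parameter are hypothetical names introduced for illustration; the library itself polls the `synchronConnect` property instead of a block.

```objc
#import <Foundation/Foundation.h>

// Spins the current run loop in 0.1 s slices until `isDone` returns YES or
// `timeout` seconds have passed. A timeout of 0 means "wait indefinitely",
// mirroring the loop condition in connectAndWaitTimeout:.
// Returns YES if the condition was met, NO if the wait timed out.
static BOOL ExampleWaitOnRunLoop(NSTimeInterval timeout, BOOL (^isDone)(void)) {
    NSRunLoop *runLoop = [NSRunLoop currentRunLoop];

    // A run loop without input sources returns at once, so attach a dummy port.
    NSPort *keepAlivePort = [NSPort port];
    [runLoop addPort:keepAlivePort forMode:NSDefaultRunLoopMode];

    NSDate *started = [NSDate date];
    BOOL done = isDone();
    while (!done) {
        if (timeout != 0 && [[NSDate date] timeIntervalSinceDate:started] >= timeout) {
            break;  // timed out
        }
        [runLoop runUntilDate:[NSDate dateWithTimeIntervalSinceNow:0.1]];
        done = isDone();
    }

    [runLoop removePort:keepAlivePort forMode:NSDefaultRunLoopMode];
    return done;
}
```

Because the run loop keeps turning while the function waits, stream and timer callbacks scheduled on the same run loop still fire, which is what allows the connection to make progress during the wait.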
2. Call the connect method

- (void)connect {

    if (MQTTStrict.strict &&
        self.clientId && self.clientId.length < 1 &&
        !self.cleanSessionFlag) {
        NSException* myException = [NSException
                                    exceptionWithName:@"clientId must be at least 1 character long if cleanSessionFlag is off"
                                    reason:[NSString stringWithFormat:@"clientId length = %lu", [self.clientId dataUsingEncoding:NSUTF8StringEncoding].length]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        !self.clientId) {
        NSException* myException = [NSException
                                    exceptionWithName:@"clientId must not be nil"
                                    reason:[NSString stringWithFormat:@"clientId length = %lu", [self.clientId dataUsingEncoding:NSUTF8StringEncoding].length]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        [self.clientId dataUsingEncoding:NSUTF8StringEncoding].length > 65535L) {
        NSException* myException = [NSException
                                    exceptionWithName:@"clientId may not be longer than 65535 bytes in UTF8 representation"
                                    reason:[NSString stringWithFormat:@"clientId length = %lu", [self.clientId dataUsingEncoding:NSUTF8StringEncoding].length]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        ![self.clientId dataUsingEncoding:NSUTF8StringEncoding]) {
        NSException* myException = [NSException
                                    exceptionWithName:@"clientId must not contain non-UTF8 characters"
                                    reason:[NSString stringWithFormat:@"clientId = %@", self.clientId]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        [self.userName dataUsingEncoding:NSUTF8StringEncoding].length > 65535L) {
        NSException* myException = [NSException
                                    exceptionWithName:@"userName may not be longer than 65535 bytes in UTF8 representation"
                                    reason:[NSString stringWithFormat:@"userName length = %lu", [self.userName dataUsingEncoding:NSUTF8StringEncoding].length]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        ![self.userName dataUsingEncoding:NSUTF8StringEncoding]) {
        NSException* myException = [NSException
                                    exceptionWithName:@"userName must not contain non-UTF8 characters"
                                    reason:[NSString stringWithFormat:@"password = %@", self.userName]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        self.password &&
        !self.userName) {
        NSException* myException = [NSException
                                    exceptionWithName:@"password specified without userName"
                                    reason:[NSString stringWithFormat:@"password = %@", self.password]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        self.protocolLevel != MQTTProtocolVersion31 &&
        self.protocolLevel != MQTTProtocolVersion311 &&
        self.protocolLevel != MQTTProtocolVersion50) {
        NSException* myException = [NSException
                                    exceptionWithName:@"Illegal protocolLevel"
                                    reason:[NSString stringWithFormat:@"%d is not 3, 4, or 5", self.protocolLevel]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        !self.willFlag &&
        self.willTopic) {
        NSException* myException = [NSException
                                    exceptionWithName:@"Will topic must be nil if willFlag is false"
                                    reason:[NSString stringWithFormat:@"%@", self.willTopic]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        !self.willFlag &&
        self.willMsg) {
        NSException* myException = [NSException
                                    exceptionWithName:@"Will message must be nil if willFlag is false"
                                    reason:[NSString stringWithFormat:@"%@", self.willMsg]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        !self.willFlag &&
        self.willRetainFlag) {
        NSException* myException = [NSException
                                    exceptionWithName:@"Will retain must be false if willFlag is false"
                                    reason:[NSString stringWithFormat:@"%d", self.willRetainFlag]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        !self.willFlag &&
        self.willQoS != MQTTQosLevelAtMostOnce) {
        NSException* myException = [NSException
                                    exceptionWithName:@"Will QoS Level must be 0 if willFlag is false"
                                    reason:[NSString stringWithFormat:@"%d", self.willQoS]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        self.willQoS != MQTTQosLevelAtMostOnce &&
        self.willQoS != MQTTQosLevelAtLeastOnce &&
        self.willQoS != MQTTQosLevelExactlyOnce) {
        NSException* myException = [NSException
                                    exceptionWithName:@"Illegal will QoS level"
                                    reason:[NSString stringWithFormat:@"%d is not 0, 1, or 2", self.willQoS]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        self.willFlag &&
        !self.willTopic) {
        NSException* myException = [NSException
                                    exceptionWithName:@"Will topic must not be nil if willFlag is true"
                                    reason:[NSString stringWithFormat:@"%@", self.willTopic]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        self.willTopic &&
        self.willTopic.length < 1) {
        NSException* myException = [NSException
                                    exceptionWithName:@"Will topic must be at least 1 character long"
                                    reason:[NSString stringWithFormat:@"%@", self.willTopic]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        self.willTopic &&
        [self.willTopic dataUsingEncoding:NSUTF8StringEncoding].length > 65535L) {
        NSException* myException = [NSException
                                    exceptionWithName:@"willTopic may not be longer than 65535 bytes in UTF8 representation"
                                    reason:[NSString stringWithFormat:@"willTopic length = %lu", [self.willTopic dataUsingEncoding:NSUTF8StringEncoding].length]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        self.willTopic &&
        ![self.willTopic dataUsingEncoding:NSUTF8StringEncoding]) {
        NSException* myException = [NSException
                                    exceptionWithName:@"willTopic must not contain non-UTF8 characters"
                                    reason:[NSString stringWithFormat:@"willTopic = %@", self.willTopic]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        self.willTopic &&
        ([self.willTopic containsString:@"+"] ||
         [self.willTopic containsString:@"#"])
        ) {
        NSException* myException = [NSException
                                    exceptionWithName:@"willTopic must not contain wildcards"
                                    reason:[NSString stringWithFormat:@"willTopic = %@", self.self.willTopic]
                                    userInfo:nil];
        @throw myException;
    }

    if (MQTTStrict.strict &&
        self.willFlag &&
        !self.willMsg) {
        NSException* myException = [NSException
                                    exceptionWithName:@"Will message must not be nil if willFlag is true"
                                    reason:[NSString stringWithFormat:@"%@", self.willMsg]
                                    userInfo:nil];
        @throw myException;
    }

    DDLogVerbose(@"[MQTTSession] connecting");
    if (self.cleanSessionFlag) {
        [self.persistence deleteAllFlowsForClientId:self.clientId];
        [self.subscribeHandlers removeAllObjects];
        [self.unsubscribeHandlers removeAllObjects];
        [self.publishHandlers removeAllObjects];
    }
    [self tell];

    self.status = MQTTSessionStatusConnecting;

    self.decoder = [[MQTTDecoder alloc] init];
    self.decoder.runLoop = self.runLoop;
    self.decoder.runLoopMode = self.runLoopMode;
    self.decoder.delegate = self;
    [self.decoder open];

    self.transport.delegate = self;
    [self.transport open];
}
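0. Run a series of validity checks first; when MQTTStrict.strict is set, a failed check throws an exception
1. If cleanSessionFlag is set, delete the persisted flows for this client ID and clear the subscribe, unsubscribe and publish handler collections
2. Call [self tell]
3. Set the status to MQTTSessionStatusConnecting
4. Create an MQTTDecoder and assign its run loop, run loop mode and delegate
5. Call the decoder's open method
6. Set the transport's delegate
7. Call the transport's open method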
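
The clientId checks from step 0 can be condensed into a small helper to make the rules easier to see. This is only a sketch: `ExampleClientIdProblem` is a hypothetical function that returns a message instead of throwing, whereas the library throws an NSException in strict mode.

```objc
#import <Foundation/Foundation.h>

// Returns nil if `clientId` passes the strict-mode checks shown in connect,
// otherwise a description of the first rule it violates.
// `cleanSession` corresponds to cleanSessionFlag.
static NSString *ExampleClientIdProblem(NSString *clientId, BOOL cleanSession) {
    if (clientId == nil) {
        return @"clientId must not be nil";
    }
    if (clientId.length < 1 && !cleanSession) {
        return @"clientId must be at least 1 character long if cleanSessionFlag is off";
    }
    // The length limit applies to the UTF-8 encoded bytes, not to the character count.
    NSData *utf8 = [clientId dataUsingEncoding:NSUTF8StringEncoding];
    if (utf8 == nil) {
        return @"clientId must not contain non-UTF8 characters";
    }
    if (utf8.length > 65535) {
        return @"clientId may not be longer than 65535 bytes in UTF8 representation";
    }
    return nil;
}
```

The 65535-byte limit comes from MQTT encoding strings with a two-byte length prefix.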
3. What the tell method does
- (void)tell {
    NSUInteger incoming = [self.persistence allFlowsforClientId:self.clientId
                                                   incomingFlag:YES].count;
    NSUInteger outflowing = [self.persistence allFlowsforClientId:self.clientId
                                                     incomingFlag:NO].count;
    if ([self.delegate respondsToSelector:@selector(buffered:flowingIn:flowingOut:)]) {
        [self.delegate buffered:self
                      flowingIn:incoming
                     flowingOut:outflowing];
    }
    if ([self.delegate respondsToSelector:@selector(buffered:queued:flowingIn:flowingOut:)]) {
        [self.delegate buffered:self
                         queued:0
                      flowingIn:incoming
                     flowingOut:outflowing];
    }
}
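It asks the persistence layer for the flows stored for this client ID, incoming and outgoing, and takes the count of each.
It then checks whether the delegate implements the corresponding optional methods and, if so, passes those counts to it.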
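
The "check, then call" pattern used for optional delegate methods can be shown in isolation. The protocol, class and function below are hypothetical stand-ins for the library's `buffered:flowingIn:flowingOut:` callback, not part of MQTTClient.

```objc
#import <Foundation/Foundation.h>

// Hypothetical protocol with a single optional callback.
@protocol ExampleFlowObserver <NSObject>
@optional
- (void)flowingIn:(NSUInteger)incoming flowingOut:(NSUInteger)outgoing;
@end

// Hypothetical observer that simply remembers the last counts it was given.
@interface ExampleRecorder : NSObject <ExampleFlowObserver>
@property (nonatomic) NSUInteger lastIn;
@property (nonatomic) NSUInteger lastOut;
@end

@implementation ExampleRecorder
- (void)flowingIn:(NSUInteger)incoming flowingOut:(NSUInteger)outgoing {
    self.lastIn = incoming;
    self.lastOut = outgoing;
}
@end

// Forwards the counts only if the delegate implements the optional method.
// Returns YES when the callback was delivered.
static BOOL ExampleReportFlows(id<ExampleFlowObserver> delegate,
                               NSUInteger incoming,
                               NSUInteger outgoing) {
    if ([delegate respondsToSelector:@selector(flowingIn:flowingOut:)]) {
        [delegate flowingIn:incoming flowingOut:outgoing];
        return YES;
    }
    return NO;
}
```

Calling an unimplemented optional method would raise an unrecognized-selector exception, which is why the check comes first.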
4. Next, the decoder's open method
- (void)open {
    self.state = MQTTDecoderStateDecodingHeader;
}
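It only sets the state to MQTTDecoderStateDecodingHeader.
5. The transport's open method

This one is a bit more involved. The transport is chosen when the session is set up, and there are three kinds: MQTTCFSocketTransport, MQTTWebsocketTransport and MQTTSSLSecurityPolicyTransport. Each behaves differently.

Let's first look at what MQTTSSLSecurityPolicyTransport does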

- (void)open {
    DDLogVerbose(@"[MQTTSSLSecurityPolicyTransport] open");
    self.state = MQTTTransportOpening;

    NSError* connectError;

    CFReadStreamRef readStream;
    CFWriteStreamRef writeStream;

    CFStreamCreatePairWithSocketToHost(NULL, (__bridge CFStringRef)self.host, self.port, &readStream, &writeStream);

    CFReadStreamSetProperty(readStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanTrue);
    CFWriteStreamSetProperty(writeStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanTrue);

    if (self.tls) {
        NSMutableDictionary *sslOptions = [[NSMutableDictionary alloc] init];
        
        // delegate certificates verify operation to our secure policy.
        // by disabling chain validation, it becomes our responsibility to verify that the host at the other end can be trusted.
        // the server's certificates will be verified during MQTT encoder/decoder processing.
        sslOptions[(NSString*)kCFStreamSSLLevel] = (NSString *)kCFStreamSocketSecurityLevelNegotiatedSSL;
        sslOptions[(NSString *)kCFStreamSSLValidatesCertificateChain] = @NO;
        
        if (self.certificates) {
            sslOptions[(NSString *)kCFStreamSSLCertificates] = self.certificates;
        }
        
        if(!CFReadStreamSetProperty(readStream, kCFStreamPropertySSLSettings, (__bridge CFDictionaryRef)(sslOptions))){
            connectError = [NSError errorWithDomain:@"MQTT"
                                               code:errSSLInternal
                                           userInfo:@{NSLocalizedDescriptionKey : @"Fail to init ssl input stream!"}];
        }
        if(!CFWriteStreamSetProperty(writeStream, kCFStreamPropertySSLSettings, (__bridge CFDictionaryRef)(sslOptions))){
            connectError = [NSError errorWithDomain:@"MQTT"
                                               code:errSSLInternal
                                           userInfo:@{NSLocalizedDescriptionKey : @"Fail to init ssl output stream!"}];
        }
    }
    
    if(!connectError){
        self.encoder = [[MQTTSSLSecurityPolicyEncoder alloc] init];
        self.encoder.stream = CFBridgingRelease(writeStream);
        self.encoder.securityPolicy = self.tls ? self.securityPolicy : nil;
        self.encoder.securityDomain = self.tls ? self.host : nil;
        self.encoder.runLoop = self.runLoop;
        self.encoder.runLoopMode = self.runLoopMode;
        self.encoder.delegate = self;
        if (self.voip) {
            [self.encoder.stream setProperty:NSStreamNetworkServiceTypeVoIP forKey:NSStreamNetworkServiceType];
        }
        [self.encoder open];
        
        self.decoder = [[MQTTSSLSecurityPolicyDecoder alloc] init];
        self.decoder.stream =  CFBridgingRelease(readStream);
        self.decoder.securityPolicy = self.tls ? self.securityPolicy : nil;
        self.decoder.securityDomain = self.tls ? self.host : nil;
        self.decoder.runLoop = self.runLoop;
        self.decoder.runLoopMode = self.runLoopMode;
        self.decoder.delegate = self;
        if (self.voip) {
            [self.decoder.stream setProperty:NSStreamNetworkServiceTypeVoIP forKey:NSStreamNetworkServiceType];
        }
        [self.decoder open];
        
    } else {
        [self close];
    }
}
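1. Set the MQTTTransportState to MQTTTransportOpening
2. Declare an NSError variable, then create a read stream and a write stream connected to the host
3. If TLS is enabled, apply the SSL settings to both streams, with certificate chain validation turned off so that the security policy verifies the server instead; if applying the settings fails, the NSError declared above is populated
4. If there is an error, call close; otherwise continue
5. Create two objects, an encoder and a decoder, for encoding outgoing data and decoding incoming data
6. Call open on the encoder and then on the decoder, and wait for data to arrive

Next, let's see what MQTTWebsocketTransport does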

- (void)open {
    DDLogVerbose(@"[MQTTWebsocketTransport] open");
    self.state = MQTTTransportOpening;
    
    NSMutableURLRequest *urlRequest = [NSMutableURLRequest requestWithURL:[self endpointURL]];
    urlRequest.SR_SSLPinnedCertificates = self.pinnedCertificates;
    NSArray <NSString *> *protocols = @[@"mqtt"];
    
    self.websocket = [[SRWebSocket alloc] initWithURLRequest:urlRequest
                                                   protocols:protocols
                              allowsUntrustedSSLCertificates:self.allowUntrustedCertificates];
    
    self.websocket.delegate = self;
    [self.websocket open];
}
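1. Set the MQTTTransportState to MQTTTransportOpening
2. Create a URL request from [self endpointURL]
3. Set the pinned certificates on the URL request
4. Create the protocol array containing the MQTT protocol: @[@"mqtt"]
5. Create the SRWebSocket object, websocket
6. Set the SRWebSocket object's delegate: websocket.delegate = self
7. Open the connection with [self.websocket open]; in the end this is similar to the transport above, in that it also opens a stream

One more method deserves a mention here: endpointURL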

- (NSURL*) endpointURL {
    NSString *protocol = (self.tls) ? @"wss" : @"ws";
    NSString *portString = (self.port == 0) ? @"" : [NSString stringWithFormat:@":%d",(unsigned int)self.port];
    NSString *path = self.path;
    NSString *urlString = [NSString stringWithFormat:@"%@://%@%@%@",
                           protocol,
                           self.host,
                           portString,
                           path];
    NSURL *url = [NSURL URLWithString:urlString];
    return url;
}
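It assembles a URL string:
1. Choose @"wss" or @"ws" depending on self.tls
2. If the port is 0, use an empty port string; otherwise use ":" followed by the port number
3. Take the path, which defaults to @"/mqtt"
4. Combine the parts as protocol://host[:port]path
5. Create the URL
6. Return the URL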
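
To see what comes out of these steps, here is a standalone sketch of the same string assembly. `ExampleEndpointURL` is a hypothetical free function; the host, ports and path used in the usage note are placeholders.

```objc
#import <Foundation/Foundation.h>

// Builds ws://host[:port]path or wss://host[:port]path.
// A port of 0 leaves the port part out entirely.
static NSURL *ExampleEndpointURL(NSString *host, unsigned int port, BOOL tls, NSString *path) {
    NSString *scheme = tls ? @"wss" : @"ws";
    NSString *portString = (port == 0) ? @"" : [NSString stringWithFormat:@":%u", port];
    NSString *urlString = [NSString stringWithFormat:@"%@://%@%@%@",
                           scheme, host, portString, path];
    return [NSURL URLWithString:urlString];
}
```

For example, `ExampleEndpointURL(@"broker.example.com", 8083, NO, @"/mqtt")` yields ws://broker.example.com:8083/mqtt, while passing port 0 with tls set to YES yields wss://broker.example.com/mqtt.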

The last one is MQTTCFSocketTransport

- (void)open {
    DDLogVerbose(@"[MQTTCFSocketTransport] open");
    self.state = MQTTTransportOpening;

    NSError* connectError;

    CFReadStreamRef readStream;
    CFWriteStreamRef writeStream;

    CFStreamCreatePairWithSocketToHost(NULL, (__bridge CFStringRef)self.host, self.port, &readStream, &writeStream);

    CFReadStreamSetProperty(readStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanTrue);
    CFWriteStreamSetProperty(writeStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanTrue);
    
    if (self.tls) {
        NSMutableDictionary *sslOptions = [[NSMutableDictionary alloc] init];
        
        sslOptions[(NSString*)kCFStreamSSLLevel] = (NSString *)kCFStreamSocketSecurityLevelNegotiatedSSL;
        
        if (self.certificates) {
            sslOptions[(NSString *)kCFStreamSSLCertificates] = self.certificates;
        }
        
        if(!CFReadStreamSetProperty(readStream, kCFStreamPropertySSLSettings, (__bridge CFDictionaryRef)(sslOptions))){
            connectError = [NSError errorWithDomain:@"MQTT"
                                               code:errSSLInternal
                                           userInfo:@{NSLocalizedDescriptionKey : @"Fail to init ssl input stream!"}];
        }
        if(!CFWriteStreamSetProperty(writeStream, kCFStreamPropertySSLSettings, (__bridge CFDictionaryRef)(sslOptions))){
            connectError = [NSError errorWithDomain:@"MQTT"
                                               code:errSSLInternal
                                           userInfo:@{NSLocalizedDescriptionKey : @"Fail to init ssl output stream!"}];
        }
    }
    
    if(!connectError){
        self.encoder.delegate = nil;
        self.encoder = [[MQTTCFSocketEncoder alloc] init];
        self.encoder.stream = CFBridgingRelease(writeStream);
        self.encoder.runLoop = self.runLoop;
        self.encoder.runLoopMode = self.runLoopMode;
        self.encoder.delegate = self;
        if (self.voip) {
            [self.encoder.stream setProperty:NSStreamNetworkServiceTypeVoIP forKey:NSStreamNetworkServiceType];
        }
        [self.encoder open];
        
        self.decoder.delegate = nil;
        self.decoder = [[MQTTCFSocketDecoder alloc] init];
        self.decoder.stream =  CFBridgingRelease(readStream);
        self.decoder.runLoop = self.runLoop;
        self.decoder.runLoopMode = self.runLoopMode;
        self.decoder.delegate = self;
        if (self.voip) {
            [self.decoder.stream setProperty:NSStreamNetworkServiceTypeVoIP forKey:NSStreamNetworkServiceType];
        }
        [self.decoder open];
        
    } else {
        [self close];
    }
}
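1. Set the MQTTTransportState to MQTTTransportOpening
2. Declare an NSError variable, then create a read stream and a write stream connected to the host
3. If TLS is enabled, apply the SSL settings to both streams; if that fails, the NSError declared above is populated
4. If there is an error, call close; otherwise continue
5. Create two objects, an encoder and a decoder, for encoding outgoing data and decoding incoming data
6. Call open on the encoder and then on the decoder, and wait for data to arrive

The overall flow is the same as in MQTTSSLSecurityPolicyTransport.
Both can use TLS. The difference is that MQTTSSLSecurityPolicyTransport turns off the system's certificate chain validation and hands the security policy to its own encoder and decoder to verify the server, which is why the two transports use different encoder and decoder classes.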



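That completes the connection part, but so far we have only covered the left side of the diagram. Now let's see what is different on the right side, which is entered through the MQTTSessionManager class.


After connect.... has been called from outside: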

    if (shouldReconnect) {
        DDLogVerbose(@"[MQTTSessionManager] reconnecting");
        [self disconnect];
        [self reconnect];
    } else {
        DDLogVerbose(@"[MQTTSessionManager] connecting");
        [self connectToInternal];
    }
If a reconnect is needed, it disconnects and then reconnects; otherwise it connects directly.

It then calls

- (void)connectToInternal {
    if (self.session && self.state == MQTTSessionManagerStateStarting) {
        [self updateState:MQTTSessionManagerStateConnecting];
        [self.session connectToHost:self.host
                               port:self.port
                           usingSSL:self.tls];
    }
}
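If a session exists and the manager is still in the Starting state, it updates the state to Connecting and starts the connection.

That calls the connect method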

- (void)connectToHost:(NSString*)host port:(UInt32)port usingSSL:(BOOL)usingSSL {
    [self connectToHost:host port:port usingSSL:usingSSL connectHandler:nil];
}

- (void)connectToHost:(NSString *)host
                 port:(UInt32)port
             usingSSL:(BOOL)usingSSL
       connectHandler:(MQTTConnectHandler)connectHandler {
    DDLogVerbose(@"MQTTSessionLegacy connectToHost:%@ port:%d usingSSL:%d connectHandler:%p",
                 host, (unsigned int)port, usingSSL, connectHandler);
    
    
    if (self.transportType == MQTTTransportTypeWebSocket) {
        MQTTWebsocketTransport *transport = [[MQTTWebsocketTransport alloc] init];
        transport.host = host;
        transport.port = port;
        transport.tls = usingSSL;
        transport.runLoopMode = self.runLoopMode;
        transport.runLoop = self.runLoop;
        self.transport = transport;
    }else if (self.securityPolicy) {
        MQTTSSLSecurityPolicyTransport *transport = [[MQTTSSLSecurityPolicyTransport alloc] init];
        transport.host = host;
        transport.port = port;
        transport.tls = usingSSL;
        transport.securityPolicy = self.securityPolicy;
        transport.certificates = self.certificates;
        transport.voip = self.voip;
        transport.runLoop = self.runLoop;
        transport.runLoopMode = self.runLoopMode;
        self.transport = transport;
        
    } else {
        MQTTCFSocketTransport *transport = [[MQTTCFSocketTransport alloc] init];
        transport.host = host;
        transport.port = port;
        transport.tls = usingSSL;
        transport.certificates = self.certificates;
        transport.voip = self.voip;
        transport.runLoop = self.runLoop;
        transport.runLoopMode = self.runLoopMode;
        self.transport = transport;
    }
    
    [self connectWithConnectHandler:connectHandler];
}

1. Check the connection type to decide which transport is needed, then create it and set its properties
2. Start connecting, passing the handler along
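
The decision in step 1 boils down to a two-level check, sketched below without any MQTT dependency. The enum and function are hypothetical; the function only returns the name of the class that the branch above would instantiate.

```objc
#import <Foundation/Foundation.h>

// Hypothetical stand-in for the session's transportType property.
typedef NS_ENUM(NSInteger, ExampleTransportType) {
    ExampleTransportTypeSocket,
    ExampleTransportTypeWebSocket,
};

// Returns the name of the transport class that connectToHost:... would create.
// WebSocket wins first; otherwise a security policy selects the SSL-policy
// transport, and everything else falls back to the plain CFSocket transport.
static NSString *ExampleTransportClassName(ExampleTransportType type, BOOL hasSecurityPolicy) {
    if (type == ExampleTransportTypeWebSocket) {
        return @"MQTTWebsocketTransport";
    }
    if (hasSecurityPolicy) {
        return @"MQTTSSLSecurityPolicyTransport";
    }
    return @"MQTTCFSocketTransport";
}
```

Note the order of the checks: in the code shown above, a WebSocket session never uses the SSL-policy transport, even when a security policy is set.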

That calls the connectWithConnectHandler: method

- (void)connectWithConnectHandler:(MQTTConnectHandler)connectHandler {
    DDLogVerbose(@"[MQTTSession] connectWithConnectHandler:%p", connectHandler);
    self.connectHandler = connectHandler;
    [self connect];
}
1. Store the handler
2. Start connecting

The call to connect brings us back to the connect method covered on the left side.
