socket 实现完整im通讯
以流的方式处理IM通讯问题好处是无需多次建立连接,另外消息的网络开销少,而xmpp会有很多冗余的信息;
使用开源项目:CocoaAsyncSocket ,源码访问 git 地址:https://github.com/robbiehanson/CocoaAsyncSocket
然后引入下面这四个类:
asyncSocket : 是基于GCD构建的TCP/IP 套接字,支持TLS / SSL,是线程安全的
asyncUdpSocket : 对于GCD构建的UDP套接字,是线程安全的
框架会自动处理排队、缓冲、等待、检查等...
MTU:最大传输单元(Maximum Transmission Unit,MTU)是指一种通信协议的某一层上面所能通过的最大数据包大小(以字节为单位);
TCP的MTU通常是1500bytes,去掉头部信息,大概剩下1460字节;
下面用
CocoaAsyncSocket 来处理IM通讯,但是会有一些问题,如粘包,分包问题;
通常每个 TCP 包头两个字节要指定数据的长度;
**粘包
:
**如果包头两个字节指定长度小于真实返回数据的长度,称为粘包;
分包:
因为一次只能传输大约1400字节,如果要传输2000字节,就需要分成2个包来处理;
首先要实现代理:AsyncSocketDelegate
@interface SocketManager : NSObject <UIApplicationDelegate,AsyncSocketDelegate>
{
BOOL allowSelfSignedCertificates;
BOOL allowSSLHostNameMismatch;
}
//主要代理,可在回调中实现:接受数据,链接,写入数据,断开链接等等....
//主要代理,可在回调中实现:接受数据,链接,写入数据,断开链接等等....
@protocol AsyncSocketDelegate
@optional
/**
- 即将断开链接, 在断开链接前,可使用 unreadData 来接收最后的数据
**/
- (void)onSocket:(AsyncSocket *)sock willDisconnectWithError:(NSError *)err;
/**
- 已经断开链接:可在此方法内释放 socket;
**/
- (void)onSocketDidDisconnect:(AsyncSocket *)sock;
/**
- 链接到新的socket时候会被调用
**/
- (void)onSocket:(AsyncSocket *)sock didAcceptNewSocket:(AsyncSocket *)newSocket;
/**
- 这个方法应该返回新的 socket 的runloop ,调用 [NSRunLoop currentRunLoop]
**/
- (NSRunLoop *)onSocket:(AsyncSocket *)sock wantsRunLoopForNewSocket:(AsyncSocket *)newSocket;
/**
- 即将建立链接调用,返回yes 继续,返回no 取消链接
**/
- (BOOL)onSocketWillConnect:(AsyncSocket *)sock;
/**
- 准备读写操作
**/
- (void)onSocket:(AsyncSocket *)sock didConnectToHost:(NSString *)host port:(UInt16)port;
/**
- 读取新数据
**/
- (void)onSocket:(AsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag;
/**
- 读取进度
**/
- (void)onSocket:(AsyncSocket *)sock didReadPartialDataOfLength:(NSUInteger)partialLength tag:(long)tag;
/**
- 完成写数据
**/
- (void)onSocket:(AsyncSocket *)sock didWriteDataWithTag:(long)tag;
/**
- 写数据的过程中调用
**/
- (void)onSocket:(AsyncSocket *)sock didWritePartialDataOfLength:(NSUInteger)partialLength tag:(long)tag;
/**
- 读取数据超时,通常设置>0的值,如果设置<0的值,则会按照默认值计算,会被调用多次
**/
- (NSTimeInterval)onSocket:(AsyncSocket *)sock
shouldTimeoutReadWithTag:(long)tag
elapsed:(NSTimeInterval)elapsed
bytesDone:(NSUInteger)length;
/**
- Note that this method may be called multiple times for a single write if you return positive numbers.
- 写操作超时,通常设置>0
**/
- (NSTimeInterval)onSocket:(AsyncSocket *)sock
shouldTimeoutWriteWithTag:(long)tag
elapsed:(NSTimeInterval)elapsed
bytesDone:(NSUInteger)length;
/**
- 成功建立SSL/TLS链接
**/
- (void)onSocketDidSecure:(AsyncSocket *)sock;
@end
define MAX_DATALENGTH 2000000
define HeartBeat_Byte 1
define DataLength_Byte 2
define HEARTBEAT_INTERVAL 60
typedef NS_ENUM(SInt32, HeartBeatType) {
HeartBeatPong = 0xFE,
HeartBeatPing = 0xFF
};
/**
- 创建链接:
**/
- (BOOL)connect
{
//创建 socket 并设置socket携带信息:SocketOfflineByServer
self.socket.userData = SocketOfflineByServer;
self.socket = [[AsyncSocket alloc] initWithDelegate:self];
[self.socket setRunLoopModes:[NSArray arrayWithObject:NSRunLoopCommonModes]];
//
if (![self.socket isConnected])
{
NSError *error = nil;
BOOL flag = [self.socket connectToHost:kSOCKET_HOST onPort:SOCKET_PORT withTimeout:TIME_OUT error:&error];
if (!flag) {
self.socket.userData = SocketOfflineByWifiCut;
//可以在这里执行连接失败回调
// if(self.loginBlock){
// self.loginBlock(NO,@"socket连接服务器失败!");
// }
}else{
//可以在这里执行连接成功回调
}
}
return YES;
}
/**
- 连接成功后,会调用这个方法;
**/
- (void)onSocket:(AsyncSocket *)sock didConnectToHost:(NSString *)host port:(UInt16)port
{
NSLog(@"------socket didConnectToHost---------");
//登录操作
[self login:nil source:nil];
//连接成功后,设置心跳,确保和服务器的连接
_timer = [NSTimer scheduledTimerWithTimeInterval:HEARTBEAT_INTERVAL target:self selector:@selector(keepLongConnect) userInfo:nil repeats:YES];
[_timer fire];
}
pragma mark 心跳连接
-(void)keepLongConnect{
//循环向服务器发送ping心跳
[self.socket writeData:[self getDataWithInt:HeartBeatPing] withTimeout:TIME_OUT tag:MsgTypePing];
}
pragma mark 登陆
/**
- 登录之前要执行注册:向服务器获取token
- 获取完token保存到本地
- 然后执行登录操作,把注册获得的token发送给服务器
/
-(void)login:(NSString)sn source:(NSString)source
{
if(sn){
self.sn = sn;
self.source = source;
}
//token:是服务器分配的唯一码,类似userID,主要用来区分用户身份;
NSString *token = [[NSUserDefaults standardUserDefaults] stringForKey:kMY_USER_TOKEN];
if (token==nil) {//未注册过
[self regist:sn source:source];
return;
}
if (self.socket!=nil && [self.socket isConnected]) {
//用ProtocolBuffers创建登录
MsgBuilder builder = [Msg defaultInstance].builder;
builder.msgType = MsgTypeLogin;
Login oneLogin = [[[Login builder] setClientToken:token] build];
builder.login = oneLogin;
// 向socket中写入登录数据
[self.socket writeData:builder.build.data withTimeout:20 tag:MsgTypeLogin];
}else{
[self connect];
}
}
pragma mark 心跳连接
-(void)keepLongConnect{
//循环向服务器发送ping心跳
[self.socket writeData:[self getDataWithInt:HeartBeatPing] withTimeout:TIME_OUT tag:MsgTypePing];
}
发送普通消息
pragma mark ------收发消息-------
- (void)sendMessage:(NSData *)msgData
{
[self.socket writeData:msgData withTimeout:20 tag:MsgTypeChat];
}
发送消息回调:
pragma mark 发送消息回调
- (void)onSocket:(AsyncSocket *)sock didWriteDataWithTag:(long)tag
{
switch (tag) {
//成功发送ping消息
case MsgTypePing:
NSLog(@"保持心跳连接...");
break;
// 成功发送登录消息
case MsgTypeLogin:
//继续从socket流里读取数据,读取完会调用:onSocket:(AsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag
[self.socket readDataWithTimeout:20 buffer:nil bufferOffset:0 maxLength:MAX_BUFFER tag:0];
break;
//成功发送普通消息
case MsgTypeChat:
break;
default:
break;
}
}
pragma mark 接收服务器返回消息回调
- (void)onSocket:(AsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag
{
if(!data)return;
//是否继续等待,如果上一个data还没有满包,就会继续等待
if(continueWaitData){
//处理接收的data数据,不断累加data
[self handleMsgData:data isContinueData:YES];
}else{
//处理接收的data数据
[self handleReceiveData:data];
}
//继续从socket流里读取数据,读取完会调用:onSocket:(AsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag
[self.socket readDataWithTimeout:TIME_OUT buffer:nil bufferOffset:0 maxLength:MAX_BUFFER tag:0];
}
/*******处理MsgData******/
-(void) handleMsgData:(NSData *)msgData isContinueData:(BOOL)isContinueData
{
// msgContentLength:是data 头两个字节指明的数据长度,但实际不一定是这个长度
NSInteger msgContentLength = 0;
NSInteger length;
/**
- 每个完整的返回数据是这样的: 77—————————— —————————— ——————————
- 前2位77代表数据的长度,过2位之后为数据信息,数据可能一次返回不完,分成多段返回,但余下的分段数据头部不会包含长度信息
- 先判断 是否是继续等待的data如果是 那么此data前面无需去判断前两位字节
- length:表示数据头部长度信息的长度
- 如果 isContinueData==true,length==0
**/
if(isContinueData){
//remainDataLength是指上次没接收完,这次还剩下的数据长度
msgContentLength = remainDataLength;
length = 0;
}else{
length = DataLength_Byte;//DataLength_Byte为2,即用2个字节来指明数据段的长度
//取前2个字节,获取data前两个字节指定的data长度
NSData * lengthData = [msgData subdataWithRange:NSMakeRange(0, length)];
msgContentLength = [self getIntWithData:lengthData];
}
//用 msgContentLength 和获取到的真实长度作对比
//消息的长度和得到的长度正好,直接处理
if(msgContentLength == msgData.length-length){
//要去掉前两个字节(长度信息),余下的字节才是所需要的数据
[receiveData appendData:[msgData subdataWithRange:NSMakeRange(length, msgContentLength)]];
[self getCompleteMsgData:receiveData];
//处理粘包问题:有冗余字节,所以需要去掉多余的字节再处理
}else if(msgContentLength < msgData.length-length){
[receiveData appendData:[msgData subdataWithRange:NSMakeRange(length, msgContentLength)]];
[self getCompleteMsgData:receiveData];
//继续处理冗余的字节
[self handleReceiveData:[msgData subdataWithRange:NSMakeRange(length+msgContentLength, msgData.length-length-msgContentLength)]];
//处理分包问题:数据没有一次性返回,所以需要多次回调并累加data
}else if(msgContentLength > msgData.length-length){
//remainDataLength:代表还有多长的数据没有返回;需要多次回调
remainDataLength = msgContentLength - (msgData.length - length);
[receiveData appendData:[msgData subdataWithRange:NSMakeRange(length, msgData.length - length)]];
continueWaitData = YES;
}
}
/*******先处理收到的NSData 处理各种情况******/
-(void) handleReceiveData:(NSData *)handleData
{
//数据异常,直接丢弃
if( handleData.length > MAX_DATALENGTH){
//断开socke然后重连
[self cutOffSocketConnect];
//只有1字节,可能是心跳
}else if(handleData.length == HeartBeat_Byte){
[self handleHeartBeatPingPongWith:handleData];
//>2字节,是正常信息
}else if(handleData.length > DataLength_Byte){
//先判断第一个字节是否是心跳
if([self handleHeartBeatPingPongWith:handleData] == YES){
[self handleReceiveData:[handleData subdataWithRange:NSMakeRange(HeartBeat_Byte, handleData.length-HeartBeat_Byte)]];
}else{
[self handleMsgData:handleData isContinueData:NO];
}
}
}
/*******处理心跳的ping pong问题******/
-(BOOL) handleHeartBeatPingPongWith:(NSData*)data
{
Byte *intByte = (Byte *)[data bytes];
NSInteger heartBeat =intByte[0];
if ( heartBeat == HeartBeatPing) {
//收到ping 给服务器回一个Pong
[self.socket writeData:[self getDataWithInt:HeartBeatPong] withTimeout:TIME_OUT tag:MsgTypePong];
return YES;
}else if(heartBeat == HeartBeatPong){
//收到pong 什么都不用处理
return YES;
}else{
return NO;
}
}
/*******把int类型转成一个字节的二进制 然后转成NSData******/
- (NSData *) getDataWithInt:(NSInteger)value
{
//把整数存储到byte数组,再用byte数组创建NSData
Byte intByte[1];
intByte[0] = value;
NSData * intData = [NSData dataWithBytes:intByte length:1];
return intData;
}
/*********把NSData 前两位字节取出 转成NSData********/
-(NSInteger) getIntWithData:(NSData *)data
{
//把data转成bytes数组
Byte *intByte = (Byte *)[data bytes];
NSInteger intValue =0;
//先取出第一个字节(第一个字节存放的是整数)
intValue = intByte[0];
//把 intValue 左移八位如:FF00 再和第二个字节相加,这样可以保证两个字节同时存储
intValue = (intValue << 8) + intByte[1];
return intValue;
}
pragma mark socket连接失败
- (void)onSocket:(AsyncSocket *)sock willDisconnectWithError:(NSError *)err
{
NSData * unreadData = [sock unreadData]; //读取未处理的消息
if(unreadData.length > 0) {
[self onSocket:sock didReadData:unreadData withTag:0];
} else {
NSLog(@" DisconnectWithError %ld err = %@",sock.userData,[err description]);
if (err.code == 57) {
self.socket.userData = SocketOfflineByWifiCut;
}
}
}
pragma mark 处理收到的 NSData
/*******处理完整的MsgData******/
-(void) getCompleteMsgData:(NSData *)completeMsgData
{
Msg * msg = [Msg parseFromData:completeMsgData];
[self cleanAllDataMark];
switch (msg.msgType) {
case MsgTypeLoginAck:{//登陆成功
//存储用户id
[[NSUserDefaults standardUserDefaults] setObject:msg.loginAck.clientId forKey:kMY_USER_ID];
[[NSUserDefaults standardUserDefaults] synchronize];
[SocketEngine shareInstance].uid = msg.loginAck.clientId;
if (msg.loginAck.status) {//clientToken 失效
self.socket.userData = SocketOfflineByWifiCut;
NSLog(@"-------token失效,重新注册token-------");
[self regist:self.sn source:self.source];
}else{
NSLog(@"--------登陆成功-------");
if (self.loginBlock) {
self.loginBlock(YES,@"登陆成功");
}
}
}
break;
case MsgTypeChat:{//接收普通消息
NSString *clientId = [[NSUserDefaults standardUserDefaults] stringForKey:kMY_USER_ID];
//给服务器回执,确认收到消息,否则断开
MsgBuilder builder = [Msg defaultInstance].builder;
builder.msgType = MsgTypeAck;
Ack ack = [[[[Ack builder] setMsgId:msg.chat.msgId] setClientId:clientId] build];
builder.ack = ack;
[self.socket writeData:builder.build.data withTimeout:20 tag:MsgTypeAck];
//去重
BOOL isRepeat = [self checkRepeatMsg:msg.chat.pb_from time:msg.chat.createTime];
if (isRepeat) {
return;
}
if (_timer) {
[_timer setFireDate: [[NSDate date] dateByAddingTimeInterval:HEARTBEAT_INTERVAL]];//timer延后20秒心跳
}
//存储消息
LSMsgItemInfo item = [[LSMsgItemInfo alloc]init];
item.senderUserId = clientId;
item.peerUserId = msg.chat.pb_from;
item.msgBody = msg.chat.body;
item.createTime = msg.chat.createTime;
item.msgType = [self convertType:msg.chat.bodyType];
item.msgId = [[MsgEngine shareInstance] getMsgId];//[[NSDate date] timeIntervalSince1970]1000;
item.showTime = NO;
item.isGroupMsg = NO;
item.isSender = NO;
item.msgStatus = STATUS_TYPE_SUCCESS;
item.message_id = [NSString stringWithFormat:@"%d",(int)msg.chat.msgId] ;
item.hasReaded = FALSE;
[self saveMessage:item];
//保存未读消息
if (!_isChatMode) {
[self setUnreadMsg:msg.chat.body userId:clientId peerId:msg.chat.pb_from];
}
//通知视图更新ui
if (item.msgType == LS_MSG_TYPE_IMAGE ) {
//解析压缩图
NSString *url = msg.chat.body;
NSString *suffix = [[msg.chat.body componentsSeparatedByString:@"."] lastObject];
if (suffix) {
NSString *prefix = [msg.chat.body substringToIndex:(msg.chat.body.length-suffix.length-1)];
url = [[prefix stringByAppendingString:@"_s100X100."] stringByAppendingString:suffix];
}
[items setObject:item forKey:[NSString stringWithFormat:@"%llu",item.msgId]];
[self downLoadFile:url fileName:item.msgId tag:-1];
}else if (item.msgType == LS_MSG_TYPE_AUDIO || item.msgType == LS_MSG_TYPE_VIDEO){
if (msg.chat.ext) {
NSDictionary *dic = [msg.chat.ext JSONValue];
item.mediaSecond = [[dic objectForKey:@"voice_length"] longValue];
}
[items setObject:item forKey:[NSString stringWithFormat:@"%llu",item.msgId]];
[self downLoadFile:msg.chat.body fileName:item.msgId tag:-1];
}else{
if (self.updateNewMsg) {
self.updateNewMsg(item);
}
}
SAFELY_RELEASE(item);
break;
}
case MsgTypeAck://消息回执
if (!dataBase) {
dataBase = [[MsgDataBase alloc]initWithUserId:[[SocketEngine shareInstance] uid]];
}
//更新存储记录,所有msg.ack.msgId 的记录并且发送中的 最后一条,态置为成功状态,其余发送中的msg.ack.msgId为失败
[self updateMessageStatus:[NSString stringWithFormat:@"%d",(int)msg.ack.msgId] status:STATUS_TYPE_SUCCESS];
//更新ui状态
if(self.statusBlock){
self.statusBlock(true, [NSString stringWithFormat:@"%d",(int)msg.ack.msgId]);
}
break;
case MsgTypeOfflineChat://接收离线消息
break;
default:
break;
}
NSLog(@"---------msg:%@",msg);
}