Flutterflutter

Flutter中基于TCP的Socket使用

2023-01-30  本文已影响0人  Mortaler

前段时间由于业务需要,使用到socket与PC端建立链接并传输文件,本篇文章主要记录在使用过程中涉及到的问题,包括本地socket服务、连接、消息发送与接收、拆包、大小端转换、文件切片、心跳、重连等内容。

启动本地socket服务

如果需要移动端作为socket服务端,可以使用 dart:io 库中的ServerSocket,通过IP和端口进行链接

startServer() async {
    try {
      _serverSocket =
          await ServerSocket.bind(InternetAddress.anyIPv4, serverPort);
      _serverSocket.listen(serverOnReceive);
    } catch (e, stackTrace) {
      LoggerUtil.e(e);
      LoggerUtil.e(stackTrace);
    }
  }

  serverOnReceive(Socket socket) {
    _socket = socket;

    _socket?.listen(serverReceiveMsg);
  }

serverReceiveMsg方法中收到消息进行拆包,这个下边会讲到。

Flutter连接其他Socket服务

这里我们需要使用到 dart:io 库中的socket.dart这个类中的Socket对象,使用.connect方法进行连接。

    Socket.connect(address, port,
            timeout: Duration(seconds: socketTimeout))
        .then((socket) async {
      _socket = socket;

      _socket?.listen(
        onReceivedMsg,
        onError: onError,
        onDone: onDone,
        cancelOnError: false,
      );
    }).catchError((error) {
      if (error is SocketException) {
        LoggerUtil.e(error);
      }
    });

接收socket消息并处理粘包

粘包简单来说就是多个socket消息连在了一起,每个socket消息前四个字节代表了这条消息的长度,我们根据前四个字节的内容来读取相应的长度,如此循环来读取所有的Socket消息内容

  //接收到socket消息
  onReceivedMsg(event) async {
    receiveList = receiveList + event;
    //当接收到的数据长度大于8读取消息头
    if (isPackReaded) {
      while (receiveList.length > 4) {
        isPackReaded = false;
        int headerLength = 4;
        //读取消息体长度
        int msgLength = byteToNum(receiveList.sublist(0, 4));

        //当收到的消息超过消息头描述的消息体长度时取出消息体并解码
        if (receiveList.length >= headerLength + msgLength) {
          List<int> bodyList =
              receiveList.sublist(headerLength, headerLength + msgLength);
          String bodyStr = utf8.decode(bodyList);
          //这里处理已经读取的Socket消息内容 进行解base64或者解密
          await analysisStr(bodyStr);
          //读取后删除已读取的消息
          receiveList = receiveList.sublist(headerLength + msgLength);
          if (receiveList.isEmpty) {
            isPackReaded = true;
          }
        } else {
          isPackReaded = true;
          break;
        }
      }
    }
  }

大小端转换

由于C中使用大端模式 所以要进行大小端的转换

  //小端转大端
  Uint8List int32BigEndianBytes(int value) {
    return Uint8List(4)..buffer.asByteData().setInt32(0, value, Endian.big);
  }

  //大端转小端
  int byteToNum(List<int> list) {
    Uint8List resultList = Uint8List.fromList(list);

    ByteData byteData = ByteData.view(resultList.buffer);

    return byteData.getInt32(0);
  }

消息发送

  //发送socket消息
  sendMsg(String msg) {
    try {
      Codec<String, String> stringToBase64 = utf8.fuse(base64);
      String base64Str = stringToBase64.encode(msg);

      //先告知消息长度-须转换为大端模式
      _socket?.add(int32BigEndianBytes(base64Str.length));

      //发送消息
      _socket?.write(base64Str);
      _socket?.flush();
    } catch (e) {
      debugPrint('========发送socket消息失败========$e');
    }
  }

消息类型

两端可自行约定socket消息类型

enum PackTypeEnum {
  packTypeUnknow,
  packTypeHeart,
  packTypeDisConnect,
}

心跳与重连

心跳可由服务端发起每隔30秒发送一次心跳消息,客户端收到后进行回应并将本地心跳变量重置。

        heartBeat = 0;
        String sendStr = jsonEncode({
          'packtype': PackTypeEnum.packTypeHeart.index,
          'data': 'pang',
        });
        debugPrint('===========接收到心跳=====');
        await sendMsg(sendStr);

本地定时任务进行检测,如果某一时间段内没有收到心跳,可能表示已经断开连接,尝试重新连接

  startHeartBeat() {
    _timer ??= Timer.periodic(const Duration(seconds: 1), (time) {
      heartBeat++;
      if (heartBeat > 40) {
        heartBeat = 0;
        //重连
        if (socketAddress != null && socketPort != null) {
          connectByAddress(socketAddress!, socketPort!);
        }
      }
    });
  }

文件切片传输

一些涉及业务的代码已经删除,核心就是拿到文件句柄,循环读取固定长度然后进行发送,所有片段发送完成后关闭

  //切片传输文件
  fileSlice(DocumentModel docModel, int docIndex, int docCount,
      int uploadTotal) async {
    debugPrint('=============发送文档===当前第-$fileIndex---共=$uploadTotal');
    try {
      for (DocumentFileModel docFile in docModel.fileList!) {
        String docFilePath = docFile.localFilePath!;

        File file = File(docFilePath);

        var handle = await file.open();
        var current = 0;
        var size = file.lengthSync();
        var chunkSize = 4096;
        int chunkIndex = 0;
        while (current < size) {
          var len = size - current >= chunkSize ? chunkSize : size - current;
          var section = handle.readSync(len); 
          current = current + len;
          // 处理数据块
          Map sendMap = {};
          chunkIndex += 1;
          String jsonStr = jsonEncode(sendMap);

          Codec<String, String> stringToBase64 = utf8.fuse(base64);
          String base64Str = stringToBase64.encode(jsonStr);

          _socket?.add(int32BigEndianBytes(base64Str.length));
          _socket?.write(base64Str);
          // 立即发送并清空缓冲区
          await _socket?.flush();
        }

        await handle.close();
        fileIndex += 1;
        EasyLoading.showProgress(
            (fileIndex / uploadTotal) > 1 ? 1 : fileIndex / uploadTotal,
            status: '已发送%s个'.trArgs(['$fileIndex/$uploadTotal']));
        if (fileIndex >= uploadTotal) {
          fileIndex = 0;
          EasyLoading.showSuccess('成功'.tr);
        }
      }
    } catch (e) {
      debugPrint('================${e.toString()}');
    }
  }
上一篇 下一篇

猜你喜欢

热点阅读