【长链接】实现Flutter长链接通道

2023-08-14  本文已影响0人  24c41111e47b

背景: 团队中有同学实现长链接的时候,直接在主线程中进行发送消息和派发消息,导致消息延迟很严重
方案:应该在子线程中进行耗时操作


需求:
实现一个优雅的长链接功能,要求用系统自带的socket,要求有断线重连功能,要求有发送消息的函数,和注册监听消息的函数,要求发送消息的时候,先将消息放入一个队列中缓冲,然后有一个子线程事件循环,不断的检测发送队列是否有消息需要发送,有则发送,没有则休眠等待,直到消息队列有新的消息唤醒发送消息的子线程事件循环;要求收到消息后,将消息缓冲到一个派发队列进行缓冲,然后有一个派发消息的子线程事件循环,不断的检测派发队列是否有消息需要派发,有则取出消息,进行处理,处理完毕获取在主线程中获取注册的监听消息函数进行派发,没有消息则休眠,直到派发队列有消息开始唤醒派发线程。

Demo:
实现一个基于系统自带的 Socket 的优雅长连接功能,包括断线重连、发送消息、注册监听消息等功能,并通过队列和线程循环来实现异步消息的处理

import 'dart:io';
import 'dart:convert';
import 'dart:async';

void main() {
  final client = MySocketClient();
  client.start();
}

class MySocketClient {
  static const String host = 'your_server_host';
  static const int port = your_server_port;

  late Socket _socket;
  bool _isConnected = false;
  bool _isConnecting = false;
  bool _shouldReconnect = true;
  Queue<String> _sendQueue = Queue();
  Queue<String> _receiveQueue = Queue();

  MySocketClient() {
    _init();
  }

  Future<void> _init() async {
    await _connect();
    _listenForInput();
    _listenForOutput();
  }

  Future<void> _connect() async {
    if (_isConnected || _isConnecting) {
      return;
    }

    _isConnecting = true;

    try {
      _socket = await Socket.connect(host, port);
      _isConnected = true;
      _isConnecting = false;
      print('Connected to server');

      // Start the output loop
      _outputLoop();
    } catch (e) {
      print('Connection failed: $e');
      _isConnected = false;
      _isConnecting = false;

      if (_shouldReconnect) {
        await Future.delayed(Duration(seconds: 5));
        await _connect();
      }
    }
  }

  void _listenForInput() {
    _socket.listen(
      (List<int> data) {
        final message = utf8.decode(data);
        print('Received: $message');
        _receiveQueue.add(message);
      },
      onError: (error) {
        print('Error: $error');
        _socket.close();
        _isConnected = false;

        if (_shouldReconnect) {
          _connect();
        }
      },
      onDone: () {
        print('Connection closed by server');
        _socket.close();
        _isConnected = false;

        if (_shouldReconnect) {
          _connect();
        }
      },
    );
  }

  void _listenForOutput() {
    // Start the input loop in a new Isolate
    _startIsolate(_inputLoop);

    // Start the output loop in a new Isolate
    _startIsolate(_outputLoop);
  }

void _startIsolate(void Function() loopFunction) {
    Isolate.spawn(loopFunction, null);
  }

  void _inputLoop() {
    if (_isConnected) {
      if (_sendQueue.isNotEmpty) {
        final message = _sendQueue.removeFirst();
        _socket.write(message);
      }

      Future.delayed(Duration(milliseconds: 100), _inputLoop);
    } else {
      Future.delayed(Duration(seconds: 1), _inputLoop);
    }
  }

  void _outputLoop() {
    if (_isConnected) {
      if (_receiveQueue.isNotEmpty) {
        final message = _receiveQueue.removeFirst();
        // Process the received message here
        // Dispatch to registered listeners

        // Simulating a delay for processing
        Future.delayed(Duration(milliseconds: 500), () {
          print('Processed: $message');
        });
      }

      Future.delayed(Duration(milliseconds: 100), _outputLoop);
    } else {
      Future.delayed(Duration(seconds: 1), _outputLoop);
    }
  }

  void sendMessage(String message) {
    _sendQueue.add(message);
  }

  void stop() {
    _shouldReconnect = false;
    _socket.close();
  }

  void start() {
    _shouldReconnect = true;
    _connect();
  }
}

上一篇 下一篇

猜你喜欢

热点阅读