Flutter 之 Stream的生成

2020-06-17  本文已影响0人  SnoopPanda
Stream的生成

1、从零开始创建Stream
创建一个Stream可以通过异步生成器(async*)函数。当异步生成器函数被调用时会创建一个 Stream,而函数体则会在该 Stream 被监听时开始运行。当函数返回时,Stream 关闭。在函数返回前,你可以使用 yield 或 yield 语句向该 Stream 提交事件。
下面是一个周期性发送整数的函数例子:

void main() {
  var duration = Duration(seconds: 3);
  var stream = timedCounter(duration, 10);
  stream.listen((event) {
    print(event);
  });
}

Stream<int> timedCounter(Duration interval, [int maxCount]) async* {
  var i = 0;
  while(true) {
    await Future.delayed(interval);
    yield i++;
    if(i == maxCount) break;
  }
}

2、转换现有的Stream
我们在创建 Stream 时常见的情形是根据现有 Stream 的事件创建一个新的 Stream。比如你已经有了一个可以提供字节事件的 Stream,然后你想将该 Stream 变为一个可以提供字符串的 Stream,并且该 Stream 中的字符串还经过 UTF-8 编码。对于这种情况,常用的办法是创建一个新的 Stream 去等待获取原 Stream 的事件,然后再将新 Stream 中的事件输出。例如:

/// 将连续的字符串 Stream 拆分为行。
///
/// 输入的字符串来自于"源" Stream 并以较小的 chunk 块提供。
Stream<String> lines(Stream<String> source) async* {
  // 存储从上一个数据块中分离出的字符串行。
  var partial = '';
  // 等到新的数据块可用时开始处理。
  await for (var chunk in source) {
    var lines = chunk.split('\n');
    lines[0] = partial + lines[0]; // 追加拼接行。
    partial = lines.removeLast(); // 删除剩余不完整的行。
    for (var line in lines) {
      yield line; // 将分离的每个字符串行添加至输出 Stream。
    }
  }
  // 最后如果最终的字符串行不为空则将其添加至输出流。
  if (partial.isNotEmpty) yield partial;
}

3、使用 StreamController
如果你 Stream 的事件不仅来自于异步函数可以遍历的 Stream 和 Future,还来自于你程序的不同部分,这种情况使用上述两种方式生成 Stream 就显得比较困难。面对这种情况,我们可以使用一个 StreamController 来创建和填充 Stream。
StreamController 可以为你生成一个 Stream,并提供在任何时候、任何地方将事件添加到该 Stream 的方法。该 Stream 具有处理监听器和暂停所需的所有逻辑。控制器对象你可以自行处理而只需返回调用者所需的 Stream 即可。

void main() {
  var duration = Duration(seconds: 3);
  var stream = timedCounter(duration, 10);
  stream.listen((event) {
    print(event);
  });
}

Stream<int> timedCounter(Duration interval, [int maxCount]) {
  var controller = StreamController<int>();
  var counter = 0;
  Timer timer;
  void tick(Timer timer) {
    counter++;
    controller.add(counter); // 请求 Stream 将计数器值作为事件发送。
    if (maxCount != null && counter >= maxCount) {
      timer.cancel();
      controller.close(); // 请求 Stream 关闭并告知监听器。
    }
  }

  void startTimer() {
    timer = Timer.periodic(interval, tick);
  }

  void stopTimer() {
    if(timer != null) {
      timer.cancel();
      timer = null;
    }
  }

  controller = StreamController<int> (
    onListen: startTimer,
    onPause: stopTimer,
    onResume: startTimer,
    onCancel: stopTimer,
  );

  return controller.stream;
}

不通过async*函数创建Stream时,请务必牢记以下几点:

上一篇 下一篇

猜你喜欢

热点阅读