Dart中的Stream初步研究
2021-09-02 本文已影响0人
xmb
官方文档中文翻译:异步编程:使用 stream
1、stream
是什么
Stream
是一些列异步事件的序列。
2、Stream
的接收
① 使用异步for
循环
/// 使用异步for循环来接收所有的stream
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
await for(var value in stream) {
sum += value;
}
return sum;
}
② 使用stream
的listen
来监听
3、stream
的种类
① 单订阅stream
(Single-Subscription
)
只包含一个事件序列,事件需要按顺序提供,不能丢失。比如读取一个文件,接收一个网页。
② 广播stream
(Broadcast
)
针对单个消息的,一次处理一个消息。比如浏览器的鼠标事件。
可以在任何时候监听,可以添加多个监听,还可以随时取消监听。
4、处理stream
的方法
Future<T> get first;
Future<bool> get isEmpty;
Future<T> get last;
Future<int> get length;
Future<T> get single;
Future<bool> any(bool Function(T element) test);
Future<bool> contains(Object? needle);
Future<E> drain<E>([E? futureValue]);
Future<T> elementAt(int index);
Future<bool> every(bool Function(T element) test);
Future<T> firstWhere(bool Function(T element) test, {T Function()? orElse});
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
Future forEach(void Function(T element) action);
Future<String> join([String separator = '']);
Future<T> lastWhere(bool Function(T element) test, {T Function()? orElse});
Future pipe(StreamConsumer<T> streamConsumer);
Future<T> reduce(T Function(T previous, T element) combine);
Future<T> singleWhere(bool Function(T element) test, {T Function()? orElse});
Future<List<T>> toList();
Future<Set<T>> toSet();
5、修改stream
的方法
修改stream
会产生一个新的stream
,在原来stream
上添加的监听,会转到新的stream
上,如果新的stream
结束了,会转到原来的stream
上。
详细参考文章:Flutter Stream简介及部分操作符使用
// 将一个Stream转为元素都为R类型的Stream
Stream<R> cast<R>();
// 把Stream中的每一个元素,转换为一个序列sequence,比如元素1转为[1, 1]
Stream<S> expand<S>(Iterable<S> Function(T element) convert);
// 按map的实现规则,转换Stream中的每一个元素成为一个新的Stream
Stream<S> map<S>(S Function(T event) convert);
// 跳过前面count个事件
Stream<T> skip(int count);
// 根据传入的条件规则,进行跳过
Stream<T> skipWhile(bool Function(T element) test);
// 指定只发送count个事件
Stream<T> take(int count);
// 只发送指定条件的事件
Stream<T> takeWhile(bool Function(T element) test);
// 用条件丢弃一些元素,创建一个新的Stream
Stream<T> where(bool Function(T event) test);
...
6、listen
方法
StreamSubscription<T> listen(void onData(T event)?,
{Function? onError, void onDone()?, bool? cancelOnError});
7、stream
的创建
① 从其他stream
转换
以上提到的修改stream
的方法
② 使用异步生成器(async*
)生成stream
/// 异步生成器(async*) 生成 stream
/// 通过 yield 和 yield* 向stream提交事件
Stream<int> createAsyncStream() async* {
int num = 0;
while (true) {
// 间隔1秒钟
await Future.delayed(Duration(seconds: 1));
// 将num运算后的值放入stream
yield num++;
// 终止条件
if (num == 10) break;
}
}
③ 使用StreamController
来创建
var streamController = StreamController<int>(
onListen: () {},
onResume: () {},
onPause: () {},
onCancel: () {},
);
比如,event_bus
的实现,使用广播stream
EventBus({bool sync = false})
: _streamController = StreamController.broadcast(sync: sync);
8、实际应用场景
① event_bus 插件的使用
相关文章:Flutter EventBus 的使用和底层实现分析
- 初始化
EventBus
,会创建一个通过广播方式初始化的StreamController
。- 订阅监听,
on
方法返回的是一个stream
,stream
的listen
方法传入的对监听到事件的处理方法。fire
方法实现,是通过往StreamController
中添加自定义的事件。
② Bloc插件的使用
③ StreamBuilder
组件
9、相关的语法关键字(await、async、sync、async、yield、yield*)
① await
用于等待异步方法返回数据
② async
用于异步方法
③ sync*
多元素同步函数生成器,返回Iterable<T>
④ async*
多元素异步函数生成器,返回Stream<T>
⑤ yield
发送的是一个元素
⑥ yield*
操作的是一个Iterable
或Stream
具体使用例子,如下:
import 'dart:async';
import 'package:flutter/material.dart';
void main() {
runApp(MyApp());
}
class MyApp extends StatelessWidget {
@override
Widget build(BuildContext context) {
return MaterialApp(
title: 'Flutter Demo',
theme: ThemeData(
primarySwatch: Colors.blue,
),
home: StreamPage(),
);
}
}
class StreamPage extends StatefulWidget {
const StreamPage({Key? key}) : super(key: key);
@override
_StreamPageState createState() => _StreamPageState();
}
class _StreamPageState extends State<StreamPage> {
late Stream<String> _stream;
@override
void initState() {
super.initState();
_stream = fetchEmojiStream(10);
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(),
body: Column(
children: [
Column(
children: [
MaterialButton(
onPressed: test1,
child: Text('测试1'),
),
MaterialButton(
onPressed: test2,
child: Text('测试2'),
),
MaterialButton(
onPressed: test3,
child: Text('测试3'),
),
MaterialButton(
onPressed: test4,
child: Text('测试4'),
),
MaterialButton(
onPressed: test5,
child: Text('测试5'),
),
],
),
StreamBuilder(
stream: _stream,
builder: _builder,
),
],
),
);
}
Widget _builder(BuildContext context, AsyncSnapshot snapshot) {
switch (snapshot.connectionState) {
case ConnectionState.none:
break;
case ConnectionState.waiting:
return CircularProgressIndicator();
case ConnectionState.active:
return Text(snapshot.requireData);
case ConnectionState.done:
return Text(snapshot.requireData);
}
return Container();
}
void test1() async {
getEmoji(10).forEach(print);
/*
flutter: 👿
flutter: 💀
flutter: 💁
flutter: 💂
flutter: 💃
flutter: 💄
flutter: 💅
flutter: 💆
flutter: 💇
flutter: 💈
*/
}
void test2() {
getEmojiWithTime(10).forEach(print);
/*
flutter: 👿-2021-09-02T16:41:24.557020
flutter: 💀-2021-09-02T16:41:24.558755
flutter: 💁-2021-09-02T16:41:24.559007
flutter: 💂-2021-09-02T16:41:24.559160
flutter: 💃-2021-09-02T16:41:24.559338
flutter: 💄-2021-09-02T16:41:24.559515
flutter: 💅-2021-09-02T16:41:24.559663
flutter: 💆-2021-09-02T16:41:24.559846
flutter: 💇-2021-09-02T16:41:24.560122
flutter: 💈-2021-09-02T16:41:24.560372
*/
}
void test3() {
fetchEmoji(0).then(print);
/*
flutter: 👿
*/
}
void test4() {
fetchEmojiStream(10).listen(print);
/* 每个一秒生成一个
flutter: 👿
flutter: 💀
flutter: 💁
flutter: 💂
flutter: 💃
flutter: 💄
flutter: 💅
flutter: 💆
flutter: 💇
flutter: 💈
*/
}
void test5() {
fetchEmojiWithTime(10).forEach(print);
/*
flutter: 👿-2021-09-02T17:15:21.591821
flutter: 💀-2021-09-02T17:15:22.595980
flutter: 💁-2021-09-02T17:15:23.601100
flutter: 💂-2021-09-02T17:15:24.608355
flutter: 💃-2021-09-02T17:15:25.610364
flutter: 💄-2021-09-02T17:15:26.612006
flutter: 💅-2021-09-02T17:15:27.619303
flutter: 💆-2021-09-02T17:15:28.623427
flutter: 💇-2021-09-02T17:15:29.626712
flutter: 💈-2021-09-02T17:15:30.629052
*/
}
/// 多元素同步
/// 多元素同步函数生成器(sync*),返回Iterable
/// yield 发送一个元素
Iterable<String> getEmoji(int count) sync* {
Runes first = Runes('\u{1f47f}');
for (int i = 0; i < count; i++) {
yield String.fromCharCodes(first.map((e) => e + i));
}
}
/// 多元素同步
/// 多元素同步生成器(sync*),返回Iterable
/// yield* 操作的是一个Iterable
Iterable<String> getEmojiWithTime(int count) sync* {
yield* getEmoji(10).map((e) => '$e-${DateTime.now().toIso8601String()}');
}
/// 单元素异步
/// await async
Future<String> fetchEmoji(int count) async {
await Future.delayed(Duration(milliseconds: 1000));
return String.fromCharCodes(Runes('\u{1f47f}').map((e) => e + count));
}
/// 多元素异步
/// 多元素异步生成器(async*),返回Stream
/// yield操作的是一个元素
Stream<String> fetchEmojiStream(int count) async* {
for (int i = 0; i < count; i++) {
yield await fetchEmoji(i);
}
}
/// 多元素异步
/// 多元素异步生成器(async*),返回Stream
/// yield*操作的是一个Stream
Stream<String> fetchEmojiWithTime(int count) async* {
yield* fetchEmojiStream(count).map((event) => '$event-${DateTime.now().toIso8601String()}');
}
}