Flutter的StreamController 探究
一、StreamController 介绍
-
简介
StreamController 是具有控制流的控制器,它允许在其流上发送数据、错误和完成事件;可以检查流是否暂停、是否有订阅者,以及在其中任何发生改变时获取到回调。 -
定义
StreamController的定义代码如下:abstract class StreamController<T> implements StreamSink<T> { ... }
从上面代码知 StreamController 是个抽象类,实践类是 StreamSink。MultiStreamController 和 SynchronousStreamController 是它的实现类。
二、StreamController 的工厂初始化方法
-
工厂方法一
方法定义代码:
factory StreamController( {void onListen()?, void onPause()?, void onResume()?, FutureOr<void> onCancel()?, bool sync = false}) { return sync ? _SyncStreamController<T>(onListen, onPause, onResume, onCancel) : _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); }
方法参数介绍:
-
onListen
onListen 如果被实现,则当StreamController有数据事件发送时被回调。 -
onPause
onPause 如果被实现,则当StreamController的事件源的订阅者被暂停时被回调。 -
onResume
onResume 如果被实现,则当StreamController的事件源的订阅者被暂停时,然后又恢复时被调用。 -
onCancel
onCancel 如果被实现,则当StreamController的事件源的订阅者被取消时被回调。 -
sync
sync 默认 false, 是控制创建的控制器是同步的还是异步的。false 异步,true 同步。
-
-
工厂方法二
方法定义代码:
factory StreamController.broadcast( {void onListen()?, void onCancel()?, bool sync = false}) { return sync ? _SyncBroadcastStreamController<T>(onListen, onCancel) : _AsyncBroadcastStreamController<T>(onListen, onCancel); }
方法参数介绍:
-
onListen
onListen 如果被实现,则当StreamController有数据事件发送时被回调。 -
onCancel
onCancel 如果被实现,则当StreamController的事件源的订阅者被取消时被回调。 -
sync
sync 默认 false, 是控制创建的控制器是同步的还是异步的。false 异步,true 同步。
-
三、StreamController 的属性
-
stream -> Stream<T>
stream 是 StreamController 的控制是事件源。
实例代码:void testStream() { var streamController = StreamController(); streamController.stream.listen( (event) { print("StreamController -- stream -- listen -- $event"); }, onDone: () { print("StreamController -- stream -- onDone"); }, ); streamController.add("发送事件"); streamController.close(); }
日志输出:
flutter: StreamController -- stream -- listen -- 发送事件 flutter: StreamController -- stream -- onDone
注意: onDone 的调用时 streamController 调用 close 方法。
-
done -> Future<dynamic>
done 是StreamController的事件订阅对象调用
cancle
或者StreamController调用close
时调用。
实例代码:void testStreamDone() { var streamController = StreamController(); streamController.stream.listen( (event) { print("StreamController -- stream -- listen -- $event"); }, onDone: () { print("StreamController -- stream -- onDone"); }, ); streamController.done.then((value) { print("StreamController -- done -- $value"); }); streamController.add("发送事件"); streamController.close(); }
日志输出:
flutter: StreamController -- stream -- listen -- 发送事件 flutter: StreamController -- stream -- onDone flutter: StreamController -- done -- null
或者
void testStreamDone() { var streamController = StreamController(); var streamSubscription = streamController.stream.listen( (event) { print("StreamController -- stream -- listen -- $event"); }, onDone: () { print("StreamController -- stream -- onDone"); }, ); streamController.done.then((value) { print("StreamController -- done -- $value"); }); streamController.add("发送事件"); streamSubscription.cancel(); }
日志输出:
flutter: StreamController -- done -- null
-
isClosed -> bool
isClosed 判断控制器是否关闭。
实例代码:void testStreamisClosed() { var streamController = StreamController(sync: true); print(streamController.isClosed); // false streamController.close(); print(streamController.isClosed); // true }
-
isPaused -> bool
isPaused 是判断StreamController的订阅对象是否是暂停。
实例代码:void testStreamisPaused() { var streamController = StreamController(); print(streamController.isPaused); var su = streamController.stream.listen((event) {}); streamController.add(""); print(streamController.isPaused); su.pause(); su.cancel(); print(streamController.isPaused); streamController.close(); }
日志输出:
flutter: true flutter: false flutter: true
注意: StreamController 刚初始完成时, isPaused 的值 true; 然后 StreamController 监听后 isPaused 的值 false ,然后订阅对象调用
pause
方法后,isPaused 的值 true 。 -
hasListener -> bool
hasListener 是判断StreamController的stream是否被监听。
实例代码:void testStreamhasListener() { var streamController = StreamController(); print(streamController.hasListener); streamController.stream.listen((event) {}); print(streamController.hasListener); streamController.close(); }
日志输出:
flutter: false flutter: true
-
sink → StreamSink<T>
sink 是 StreamController 返回的 StreamSink的视图,sink 提供三个方法
add
、addError
、close
。
实例代码如下:void testStreamSink() { var streamController = StreamController(); streamController.stream.listen((event) { print("streamController.stream.listen -- $event"); }, onError: (e) { print("streamController.stream.onError -- $e"); }); streamController.sink.add("发送事件"); streamController.sink.addError("发生错误"); streamController.sink.close(); streamController.close(); }
日志输出:
flutter: streamController.stream.listen -- 发送事件 flutter: streamController.stream.onError -- 发生错误
sink 的方法
-
add(T event) → void
sink的事件发送方法。
-
addError(Object error, [StackTrace? stackTrace]) → void
sink的错误事件发送方法。
-
addStream(Stream<T> source, {bool? cancelOnError}) → Future
sink的其他事件源的发送方法。
-
close() → Future
sink的发送通道的关闭。
注释: StreamController 的方法和sink的方法相同。
-
四、StreamController 的方法
StreamController的主要方法有 add
、addError
、addStream
、close
等。它们的作用和 sink 的方法相同,具体查看 sink。
五、StreamController 实例
-
StreamController 与 StreamBuild 联合使用
class _MyHomePageState extends State<MyHomePage> { // 声明对象 StreamController _streamController; @override void initState() { super.initState(); // 初始化对象 _streamController = StreamController(); } @override void dispose() { // 对象的销毁 _streamController?.close(); _streamController = null; super.dispose(); } @override Widget build(BuildContext context) { return Scaffold( appBar: AppBar( title: Text("StreamController 测试"), ), body: Center( child: Column( mainAxisAlignment: MainAxisAlignment.center, children: <Widget>[ Text( 'You have pushed the button this many times:', ), StreamBuilder( stream: _streamController.stream, initialData: "", builder: (context, snapshot) { return Text(snapshot.data); }, ), ], ), ), floatingActionButton: FloatingActionButton( // 发出事件 onPressed: () => _streamController.add(DateTime.now().toString()), tooltip: 'Increment', child: Icon(Icons.add), ), ); } }
-
StreamController 的流的订阅对象的暂停、恢复、发送方法
class _SecondPageState extends State<SecondPage> { // 声明 StreamController _streamController; // 订阅者 StreamSubscription _streamSubscription; // 数据对象 String _string = ""; @override void initState() { super.initState(); _streamController = StreamController(); _streamSubscription = _streamController.stream.listen((event) { if (mounted) { setState(() { _string = event; }); } }); } @override void dispose() { // 销毁 _streamSubscription?.cancel(); _streamController?.close(); super.dispose(); } @override Widget build(BuildContext context) { return Scaffold( appBar: AppBar( title: Text("StreamController 的订阅暂停、恢复、发送测试"), ), body: Container( child: Column( children: [ Text(_string), Row( children: [ ElevatedButton( onPressed: () => _streamSubscription.pause(), child: Text("暂停")), ElevatedButton( onPressed: () => _streamSubscription.resume(), child: Text("恢复")), ElevatedButton( onPressed: () => _streamController.add(DateTime.now().toString()), child: Text("发送数据")), ], ) ], ), ), ); } }
注意: 在订阅对象暂停期间发送的数据事件,会在订阅对象恢复时一次发出。