Flutter 学习之旅(四十二) Flutter 状态 Str
BroadcastStream
在上一篇我们发现 listen 这个方法只能被添加一次, 即 _varData 这个参数只有一个,是不是只要把他变成PendingData形式的队列, 每次回调的时候遍历这个队列就可以通知所有callback 回调呢,
在普通的StreamController 里判断了如果 Stream 是 _isInitialState 的,也就是订阅过的,就直接报错 "Stream has already been listened to." ,只有未订阅的才创建 StreamSubscription 。
而在BroadcastStreamController 中去掉了这个方法变成了close 判断
StreamSubscription<T> _subscribe(void onData(T data)?, Function? onError,
void onDone()?, bool cancelOnError) {
if (isClosed) {
return new _DoneStreamSubscription<T>(onDone);
}
var subscription = new _BroadcastSubscription<T>(
this, onData, onError, onDone, cancelOnError);
_addListener(subscription);
if (identical(_firstSubscription, _lastSubscription)) {
// Only one listener, so it must be the first listener.
_runGuarded(onListen);
}
return subscription;
}
////_sendData 方法 遍历了每个subscription,并添加到任务队列
void _sendData(T data) {
if (_isEmpty) return;
if (_hasOneListener) {
_state |= _BroadcastStreamController._STATE_FIRING;
_BroadcastSubscription<T> firstSubscription =
_firstSubscription as dynamic;
firstSubscription._add(data);
_state &= ~_BroadcastStreamController._STATE_FIRING;
if (_isEmpty) {
_callOnCancel();
}
return;
}
_forEachListener((_BufferingStreamSubscription<T> subscription) {
subscription._add(data);
});
}
说到这里自己其实是有些质疑源码的,为什么我设置了两个监听创建了两个subscription,你就执行了subscription._add(data);两次,不能只添加一次而依次调用回调吗,带着这个疑问我后来我又仔细想了一下,是不是入参是可变的,不同的环境会造就不同的data,那么是不是可以将方法传入StreamSink 中呢,但是这也解释不了为什么添加多次,后来我仔细想了一下其实他们的目的都是为了维护一个在Microtask 中的队列,如果向我所描述的做法还需要增加一个callback队列,并将BroadCastStreamController 的subscription 由源码中的队列改为一个单利,两相抵消暂时这么做在性能上是没有差别的.
在上面我们说StreamSink 可以添加耗时的方法,那么在StreamSink 发送普通的String int List 数据有什么意义吗,只不过是在异步中转了一下,其实普通的数据可以作为状态通知来用,但是感觉这不是他的精髓,他的精髓应该是在异步执行耗时任务,onlisten得到结果,但是实际情况是如果将一个StreamSink 方法add 后,在listen又得到这个方法,并没有什么意义,后来又从网上找到了这样一句话
因为 microtask 的优先级又高于 event ,所以如果 microtask 太多就可能会对触摸、绘制等外部事件造成阻塞卡顿哦。
也就是说如果在microtask 这个任务中执行耗时方法可能造成卡顿,到此研究耗时任务就放弃了,
Stream 转换
///添加监听的方法 ,调用_createSubscription
StreamSubscription<T> listen(void onData(T value)?,
{Function? onError, void onDone()?, bool? cancelOnError}) {
return _createSubscription(onData, onError, onDone, cancelOnError ?? false);
}
///创建 _ForwardingStreamSubscription
StreamSubscription<T> _createSubscription(void onData(T data)?,
Function? onError, void onDone()?, bool cancelOnError) {
return new _ForwardingStreamSubscription<S, T>(
this, onData, onError, onDone, cancelOnError);
}
/// 转换的过程中将自身的_handleData 加入回调队列,
_ForwardingStreamSubscription(this._stream, void onData(T data)?,
Function? onError, void onDone()?, bool cancelOnError)
: super(onData, onError, onDone, cancelOnError) {
_subscription = _stream._source
.listen(_handleData, onError: _handleError, onDone: _handleDone);
}
///Stream 转换 通常是继承自 _ForwardingStream 来实现的,
class _WhereStream<T> extends _ForwardingStream<T, T> {
final bool Function(T) _test;
_WhereStream(Stream<T> source, bool test(T value))
: _test = test,
super(source);
///回调后,又继续添加了,这里看出来的嵌套
void _handleData(T inputEvent, _EventSink<T> sink) {
bool satisfies;
try {
satisfies = _test(inputEvent);
} catch (e, s) {
_addErrorWithReplacement(sink, e, s);
return;
}
if (satisfies) {
sink._add(inputEvent);
}
}
}
他的做法是添加listen ,在listen 回调的时候 调用自身的StreamSink.add(),将自身的_handleData()添加到事件源,在执行回调后重新添加一次,所谓的转换只不过是嵌套了一层
所以事件变化的本质就是,变换都是对 Stream 的 listen 嵌套调用组成的。
昨天看了一些面试题,上面经常会问什么是Stream ,那么你心里的Stream 是什么呢,我来说一下我写完这两篇后我对Stream 的理解,
Stream 是流或者管道,提供了一种同步或异步以队列的方式处理微任务的消息的机制.
我学习flutter的整个过程都记录在里面了
https://www.jianshu.com/c/36554cb4c804
最后附上demo 地址