Flutter 学习之旅(四十六) Rxdart 学习
本来这张想继续说一下flutter_bloc 这个框架,这里面会涉及到 Rxdart 一些东西,如果不说一下没有连贯性,看起代码来特别牵强,由于我也是现学现卖,所以我所描述的Rxdart 的 api 基本都是flutter_bloc 所用到的,在介绍的同时 ,同时会说一下部分的源码,让大家看起来更直接
这里我使用的是 rxdart: ^0.24.1 这个版本的
我从网上看了很多关于 rxdart 的文章,里面多少都有提到 Observable 的说明,但是我在使用的时候找不到这个类,分别引入了原来的0.18.1的版本,还有新的0.24.1 这个版本,发现在新的版本当中其实是没有Observable 这个类的,那么所有的功能又回归到Stream 了,
但是在Rxdart中Subject 这个类相当于controller 的功能, 既包含 Stream 这个事件源 ,也包含Subscription 这个管理对象,
我对于rxdart理解其实就是对Stream的转换,那么既然是转换,使用Stream 和Subject 他们有什么区别呢
我们先来看一下Stream 的简单应用
List<int> array = [1, 2, 3, 4, 5, 6, 7];
Stream<int> stream=Stream.fromIterable(array).asBroadcastStream().map((event) => event*event);
stream.interval(Duration(seconds: 1)).listen((event) {
printString('listen1:$event');
});
stream.listen((event) { printString('listen2:$event');});
打印结果
I/flutter (25410): tian.shm =listen2:1
I/flutter (25410): tian.shm =listen2:4
I/flutter (25410): tian.shm =listen2:9
I/flutter (25410): tian.shm =listen2:16
I/flutter (25410): tian.shm =listen2:25
I/flutter (25410): tian.shm =listen2:36
I/flutter (25410): tian.shm =listen2:49
I/flutter (25410): tian.shm =listen1:1
I/flutter (25410): tian.shm =listen1:4
I/flutter (25410): tian.shm =listen1:9
I/flutter (25410): tian.shm =listen1:16
I/flutter (25410): tian.shm =listen1:25
I/flutter (25410): tian.shm =listen1:36
I/flutter (25410): tian.shm =listen1:49
很简单的应用,创建一个Stream,并将它置位广播, 再对数据做一下简单的操作, 第一个listen 是 每一秒打印一个日志,第二个listen 则是直接打印日志,
那么我们介绍这个方法有什么意义吗,
这里主要是为了说明Stream的一些静态方法,就从这个Stream.fromIterable 来说,他是同步还是异步? 既然可以监听,那么我可以在后续添加数据吗,还有是否可以多次监听,这个在上面已经体现了,既然是Stream ,那么只有BroadCastStream 可以被多次监听的,所以如果想要多次监听,所有在添加数据后必须使用asBroadcastStream()后才可以
那么剩下的就来先看一下Stream.fromIterable 这个方法他到底是同步还是异步的,
factory Stream.fromIterable(Iterable<T> elements) {
return new _GeneratedStreamImpl<T>(
() => new _IterablePendingEvents<T>(elements));
}
class _GeneratedStreamImpl<T> extends _StreamImpl<T> {
final _EventGenerator<T> _pending;
bool _isUsed = false;
_GeneratedStreamImpl(this._pending);
StreamSubscription<T> _createSubscription(void onData(T data)?,
Function? onError, void onDone()?, bool cancelOnError) {
if (_isUsed) throw new StateError("Stream has already been listened to.");
_isUsed = true;
return new _BufferingStreamSubscription<T>(
onData, onError, onDone, cancelOnError)
.._setPendingEvents(_pending());
}
}
看到这个_BufferingStreamSubscription类大家应该很不陌生, 他维护了一下消息队列,并提供了一个相当于沙盒的运行环境, 对于是否是异步则看这个消息是怎么处理,那么只要看一下_IterablePendingEvents这个类是否维护了 Microtask ,下面我们来看一下源码,
void handleNext(_EventDispatch<T> dispatch) {
var iterator = _iterator;
if (iterator == null) {
throw new StateError("No events pending.");
}
bool movedNext = false;
try {
if (iterator.moveNext()) {
movedNext = true;
dispatch._sendData(iterator.current);
} else {
_iterator = null;
dispatch._sendDone();
}
} catch (e, s) {
if (!movedNext) {
}
dispatch._sendError(e, s);
}
}
我们看到_IterablePendingEvents这个类中只是维护了一下这个队列,并么有异步的过程,也就是说是否是异步需要只要大家看一下实现这个消息传递的event 就可以了
在学习Stream 的过程中,我们知道添加数据的过程中使用的是Sink调用controller 来添加数据,最后使用的是_BufferingStreamSubscription来管理这个队列的,而我们得到的只是Stream,并没有添加数据的功能,所以这就是直接使用Stream 相对于Subject 的缺陷,
PublishSubject
PublishSubject<int> publishSubject=PublishSubject<int>();
publishSubject.add(1);
publishSubject.add(2);
publishSubject.add(3);
publishSubject.interval(Duration(seconds: 1)).listen((value) {
printString('listen1:$value');
});
publishSubject.add(4);
publishSubject.listen((value) {
printString('listen2:$value');
});
publishSubject.add(5);
publishSubject.add(6);
publishSubject.add(7);
结果
I/flutter (27171): tian.shm =listen2:5
I/flutter (27171): tian.shm =listen2:6
I/flutter (27171): tian.shm =listen2:7
I/flutter (27171): tian.shm =listen1:4
I/flutter (27171): tian.shm =listen1:5
I/flutter (27171): tian.shm =listen1:6
I/flutter (27171): tian.shm =listen1:7
从这里可以看出来 先添加进去的数据如果没有监听那么这个消息就被消费掉了,至于为什么,先来说一下PublishSubject他的controller,
class PublishSubject<T> extends Subject<T> {
PublishSubject._(StreamController<T> controller, Stream<T> stream)
: super(controller, stream);
factory PublishSubject(
{void Function() onListen, void Function() onCancel, bool sync = false}) {
final controller = StreamController<T>.broadcast(
onListen: onListen,
onCancel: onCancel,
sync: sync,
);
return PublishSubject<T>._(
controller,
controller.stream,
);
}
}
使用的是_AsyncBroadcastStreamController,默认是异步的, 记得在说BroadcastStreamController 的时候说过,每次调用listen 的时候则生成一个subscription,消息转发的时候遍历subscription
void _sendData(T data) {
for (var subscription = _firstSubscription;
subscription != null;
subscription = subscription._next) {
subscription._addPending(new _DelayedData<T>(data));
}
}
从这里可以看到,如果没有添加监听,这个消息就浪费了,
BehaviorSubject
同样使用上面的代码,将PublishSubject 变成BehaviorSubject ,
BehaviorSubject<int> publishSubject=BehaviorSubject<int>();
publishSubject.add(1);
publishSubject.add(2);
publishSubject.add(3);
publishSubject.interval(Duration(seconds: 1)).listen((value) {
printString('listen1:$value');
});
publishSubject.add(4);
publishSubject.listen((value) {
printString('listen2:$value');
});
publishSubject.add(5);
publishSubject.add(6);
publishSubject.add(7);
打印结果,
I/flutter (27171): tian.shm =listen2:4
I/flutter (27171): tian.shm =listen2:5
I/flutter (27171): tian.shm =listen2:6
I/flutter (27171): tian.shm =listen2:7
I/flutter (27171): tian.shm =listen1:3
I/flutter (27171): tian.shm =listen1:4
I/flutter (27171): tian.shm =listen1:5
I/flutter (27171): tian.shm =listen1:6
I/flutter (27171): tian.shm =listen1:7
相对于使用 PublishSubject ,打印结果都多打印了添加监听之前的一个数据,至于为什么,我们来看一下,
factory BehaviorSubject({
void Function() onListen,
void Function() onCancel,
bool sync = false,
}) {
// ignore: close_sinks
final controller = StreamController<T>.broadcast(
onListen: onListen,
onCancel: onCancel,
sync: sync,
);
final wrapper = _Wrapper<T>();
return BehaviorSubject<T>._(
controller,
Rx.defer<T>(_deferStream(wrapper, controller, sync), reusable: true),
wrapper);
}
在创建BehaviorSubject的时候,同时初始化了一个_Wrapper这个对象,而这个对象是什么呢,
class _Wrapper<T> {
T latestValue;
Object latestError;
StackTrace latestStackTrace;
bool latestIsValue = false, latestIsError = false;
_Wrapper();
_Wrapper.seeded(this.latestValue) : latestIsValue = true;
void setValue(T event) {
latestIsValue = true;
latestIsError = false;
latestValue = event;
latestError = null;
latestStackTrace = null;
}
void setError(Object error, [StackTrace stackTrace]) {
latestIsValue = false;
latestIsError = true;
latestValue = null;
latestError = error;
latestStackTrace = stackTrace;
}
}
这个_Wrapper保存了一些操作的数据,那么什么时候对这个数据做操作呢,
class BehaviorSubject<T> extends Subject<T> implements ValueStream<T> {
...
@override
void onAdd(T event) => _wrapper.setValue(event);
...
}
每次添加数据的时候,保存添加的数据,也就是说,多打印的这个数据就是这个_Wrapper保存的数据,
既然数据保存了,而Stream 对于数据时不可以操作的,那么如何将保存的数据添加到原有监听呢,这里就归功于BehaviorSubject他的Stream的延迟创建的过程Rx.defer<T>(_deferStream(wrapper, controller, sync), reusable: true),每次添加监听后重新创建这个Stream,将这个缓存的数据重新添加入Stream 中,至于是怎么实现的,来看一下源码,
class DeferStream<T> extends Stream<T> {
final Stream<T> Function() _factory;
final bool _isReusable;
@override
bool get isBroadcast => _isReusable;
/// Constructs a [Stream] lazily, at the moment of subscription, using
/// the [streamFactory]
DeferStream(Stream<T> Function() streamFactory, {bool reusable = false})
: _isReusable = reusable,
_factory = reusable
? streamFactory
: (() {
Stream<T> stream;
return () => stream ??= streamFactory();
}());
@override
StreamSubscription<T> listen(void Function(T event) onData,
{Function onError, void Function() onDone, bool cancelOnError}) =>
_factory().listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
入参就是是传入一个构造stream 的方法,每次监听的时候则调用该方法重新创建Stream ,这样就可以将默认_Wrapper缓存的数据添加到Stream,从而实现每次缓存一个数据
ReplaySubject
看过了BehaviorSubject,后再看ReplaySubject后就比较简单了,就是将_Wrapper 变成了一个队列,这个队列可以设置数据个数的上限,再添加监听后,如果超过上限则删除第一个后再将新的数据添加入这个队列中,
添加数据的方法,
@override
void onAdd(T event) {
if (_queue.length == _maxSize) {
_queue.removeFirst();
}
_queue.add(_Event(false, event: event));
}
Stream重新创建的方法,根据队列重新创建Stream
Rx.defer<T>(
() => queue.toList(growable: false).reversed.fold(controller.stream,
(stream, event) {
if (event.isError) {
return stream.transform(StartWithErrorStreamTransformer(
event.errorAndStackTrace.error,
event.errorAndStackTrace.stackTrace,
sync));
} else {
return stream
.transform(StartWithStreamTransformer(event.event, sync: sync));
}
}),
reusable: true,
)
我们再来一个简单的例子
ReplaySubject<int> publishSubject=ReplaySubject<int>();
publishSubject.add(1);
publishSubject.add(2);
publishSubject.add(3);
publishSubject.interval(Duration(seconds: 1)).listen((value) {
printString('listen1:$value');
});
publishSubject.add(4);
publishSubject.listen((value) {
printString('listen2:$value');
});
publishSubject.add(5);
publishSubject.add(6);
publishSubject.add(7);
打印结果,
I/flutter (27171): tian.shm =listen2:1
I/flutter (27171): tian.shm =listen2:2
I/flutter (27171): tian.shm =listen2:3
I/flutter (27171): tian.shm =listen2:4
I/flutter (27171): tian.shm =listen2:5
Reloaded 4 of 966 libraries in 726ms.
I/flutter (27171): tian.shm =listen2:6
I/flutter (27171): tian.shm =listen2:7
I/flutter (27171): tian.shm =listen1:1
I/flutter (27171): tian.shm =listen1:2
I/flutter (27171): tian.shm =listen1:3
I/flutter (27171): tian.shm =listen1:4
I/flutter (27171): tian.shm =listen1:5
I/flutter (27171): tian.shm =listen1:6
I/flutter (27171): tian.shm =listen1:7
如果给ReplaySubject 设置一个最大值,效果大家肯定能想象的到,这里就不多做介绍了,
关于rxdart的操作符,如果像我这么讲起来,感觉再写个几十篇都不行,因为每一个操作符都有他的意义,就不多做介绍了
我学习flutter的整个过程都记录在里面了
https://www.jianshu.com/c/36554cb4c804
最后附上demo 地址