GetX 之 .obs 与 Obx 刷新
当我们给被观察的对象加上 .obs 后:
// Declares a reactive integer: the `obs` getter wraps the literal 0 in an RxInt.
var a = 0.obs;
我们看下会发生什么?
a 就被 RxInt 包裹;当 Rx 的值变更时,所有在 Obx/GetX 中使用 a 的 widget 都会刷新。
/// Adds a reactive-wrapper getter to plain [int] values.
extension IntExtension on int {
  /// Wraps this integer in a new [RxInt] whose initial value is this `int`.
  RxInt get obs {
    return RxInt(this);
  }
}
Rx<T>继承_RxImpl
/// Generic reactive container for a value of type [T].
///
/// All behavior is inherited from [_RxImpl]; this class only adds a
/// `toJson` that delegates to the wrapped value.
class Rx<T> extends _RxImpl<T> {
  /// Creates a reactive wrapper that starts out holding [initial].
  Rx(T initial) : super(initial);

  /// Returns the result of calling `toJson()` on the wrapped value, or `null`
  /// when the wrapped value is `null`.
  ///
  /// Throws a [String] message if the call raises an [Exception].
  @override
  dynamic toJson() {
    try {
      // Go through `dynamic` so the call compiles for any T.
      final dynamic current = value;
      return current?.toJson();
    } on Exception catch (_) {
      // NOTE(review): a missing `toJson` on a dynamic receiver raises
      // NoSuchMethodError, which is an Error rather than an Exception, so this
      // handler presumably never sees that case — confirm the intended type.
      throw '$T has not method [toJson]';
    }
  }
}
接着看
/// Base Rx class holding the stream-related operations shared by every
/// reactive type.
abstract class _RxImpl<T> extends RxNotifier<T> with RxObjectMixin<T> {
  /// Stores [initial] as the current value without notifying anyone.
  _RxImpl(T initial) {
    _value = initial;
  }

  /// Forwards [error] (and the optional [stackTrace]) to the inner subject.
  void addError(Object error, [StackTrace? stackTrace]) =>
      subject.addError(error, stackTrace);

  /// Returns a stream with [mapper] applied to every emitted value.
  Stream<R> map<R>(R Function(T? data) mapper) {
    return stream.map(mapper);
  }

  /// Passes the current value to [fn], then re-emits the stored value on the
  /// subject.
  void update(void Function(T? val) fn) {
    fn(_value);
    subject.add(_value);
  }

  /// Assigns [v] and makes sure it is emitted even if the setter skipped it.
  void trigger(T v) {
    // Capture the flag before the setter below resets it.
    final wasFirstRebuild = firstRebuild;
    value = v;
    // The setter already emitted (or this was the first rebuild): nothing to do.
    if (wasFirstRebuild || sentToStream) return;
    subject.add(v);
  }
}
/// Value storage plus getter/setter plumbing for reactive objects.
///
/// Reading [value] registers the object's `subject` with the current
/// [RxInterface.proxy]; writing [value] emits the new value on `subject`.
mixin RxObjectMixin<T> on NotifyManager<T> {
  /// The wrapped value; assigned by the owning class before first use.
  late T _value;

  /// `true` until the first assignment through the [value] setter.
  bool firstRebuild = true;

  /// Whether the most recent [value] assignment emitted on `subject`.
  bool sentToStream = false;

  /// Re-emits the current value on `subject`.
  void refresh() => subject.add(value);

  /// Callable shorthand: assigns [v] when it is non-null, then returns the
  /// current value.
  T call([T? v]) {
    if (v != null) {
      value = v;
    }
    return value;
  }

  /// The current value converted with `toString()`.
  String get string {
    return value.toString();
  }

  @override
  String toString() {
    return value.toString();
  }

  /// Returns the wrapped value unchanged.
  dynamic toJson() {
    return value;
  }

  @override
  int get hashCode {
    return _value.hashCode;
  }

  /// Stores [val] and emits it on `subject`.
  ///
  /// Does nothing when `subject` is closed. After the first assignment, an
  /// assignment equal to the current value is also skipped.
  set value(T val) {
    if (subject.isClosed) return;
    sentToStream = false;
    final unchanged = _value == val;
    if (unchanged && !firstRebuild) return;
    firstRebuild = false;
    _value = val;
    sentToStream = true;
    subject.add(_value);
  }

  /// Returns the stored value, first registering `subject` with the current
  /// [RxInterface.proxy] when one is set.
  T get value {
    RxInterface.proxy?.addListener(subject);
    return _value;
  }

  /// The stream exposed by `subject`.
  Stream<T> get stream {
    return subject.stream;
  }

  /// Subscribes [onData] via `listen`, then immediately emits the current
  /// value on `subject`.
  StreamSubscription<T> listenAndPump(void Function(T event) onData,
      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
    final pumped = listen(
      onData,
      onError: onError,
      onDone: onDone,
      cancelOnError: cancelOnError,
    );
    subject.add(value);
    return pumped;
  }
}
/// A lightweight stream that keeps its listeners in a plain list and delivers
/// each event to every non-paused listener.
class GetStream<T> {
  /// Currently registered listeners.
  List<LightSubscription<T>>? _onData = <LightSubscription<T>>[];

  /// Removes [subs] from the listener list.
  ///
  /// While a notification pass is running (`_isBusy`), the removal is deferred
  /// with a zero-duration [Future.delayed] so the list is not mutated while
  /// [_notifyData] iterates over it.
  FutureOr<bool?> removeSubscription(LightSubscription<T> subs) async {
    if (!_isBusy!) {
      return _onData!.remove(subs);
    } else {
      await Future.delayed(Duration.zero);
      return _onData?.remove(subs);
    }
  }

  /// Adds [subs] to the listener list, deferring the insertion in the same
  /// way as [removeSubscription] while a notification pass is running.
  FutureOr<void> addSubscription(LightSubscription<T> subs) async {
    if (!_isBusy!) {
      return _onData!.add(subs);
    } else {
      await Future.delayed(Duration.zero);
      return _onData!.add(subs);
    }
  }

  /// Invokes the data callback of every non-paused listener with [data].
  void _notifyData(T data) {
    _isBusy = true;
    try {
      for (final item in _onData!) {
        if (!item.isPaused) {
          item._data?.call(data);
        }
      }
    } finally {
      // Always clear the flag, even when a listener throws; otherwise every
      // later addSubscription/removeSubscription call would stay on the
      // deferred path.
      _isBusy = false;
    }
  }

  /// Records [event] as the latest value and notifies the listeners.
  ///
  /// Asserts (debug mode only) that the stream has not been closed.
  void add(T event) {
    assert(!isClosed, 'You cannot add event to closed Stream');
    _value = event;
    _notifyData(event);
  }

  /// Creates a [LightSubscription] wired to the given callbacks, registers it
  /// through [addSubscription], fires `onListen` if set, and returns it.
  LightSubscription<T> listen(void Function(T event) onData,
      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
    final subs = LightSubscription<T>(
      removeSubscription,
      onPause: onPause,
      onResume: onResume,
      onCancel: onCancel,
    )
      ..onData(onData)
      ..onError(onError)
      ..onDone(onDone)
      ..cancelOnError = cancelOnError;
    addSubscription(subs);
    onListen?.call();
    return subs;
  }
}
总结:
当我们给被观察的对象添加 .obs 后,它被封装成了 Rx<T>。Rx 类继承 _RxImpl,_RxImpl 混入了 RxObjectMixin。当我们取值的时候(a.value)会调用 get 方法,get 方法内部会添加对该变量的订阅(RxInterface.proxy?.addListener(subject));当赋值的时候(a.value = xx)会调用 set 方法:如果被订阅对象已经失效(subject.isClosed),或者它的值没有发生变化并且已经构建过了(!firstRebuild),则不会刷新;否则会赋上新值,并调用 subject 的 add 方法,然后调用 GetStream 类的 _notifyData 方法,遍历 _onData(_onData 是我们使用 Obx 时订阅的集合:Obx 继承 ObxWidget,ObxWidget 继承 StatefulWidget;ObxState 的 initState 方法里面绑定观察者(final _observer = RxNotifier();)和订阅者(late StreamSubscription subs;);RxNotifier 混入了 NotifyManager,NotifyManager 里的 subject(GetStream<T> subject = GetStream<T>();)有 listen 方法,调用 subject.listen() 会调用 addSubscription(subs),然后通过 _onData!.add(subs); 把订阅添加到 _onData 集合),对每个未暂停的订阅调用其回调(item._data?.call(data))来通知 RxNotifier。当 RxNotifier 接收到通知后,会通过 listen 回调调用 _updateTree,这里面调用的是 setState(() {}) 来更新 UI。
注释:Obx 原理
Obx 继承自 ObxWidget,ObxWidget 对应的 ObxState 在 initState 方法中绑定了观察者订阅关系。build 方法中调用 RxInterface.notifyChildren,把 _observer 临时赋值给 RxInterface.proxy,调用 builder 之后再恢复原有的值。注意:builder 函数(GetX 中为 builder(controller))里一定要读取 obs 变量的 .value,否则在 if (!observer.canUpdate) 检测时,由于没有订阅任何观察对象,会抛出提示异常。
/// An [ObxWidget] whose content is produced by a caller-supplied [builder].
class Obx extends ObxWidget {
  /// Callback that produces the widget returned by [build].
  final WidgetCallback builder;

  /// Creates an [Obx] that renders whatever [builder] returns.
  const Obx(this.builder, {Key? key}) : super(key: key);

  /// Delegates to [builder].
  @override
  Widget build() {
    return builder();
  }
}
/// Base class for stateful widgets whose state is an [ObxState].
///
/// Subclasses supply the content through the argument-less [build] method.
abstract class ObxWidget extends StatefulWidget {
  const ObxWidget({Key? key}) : super(key: key);

  /// Adds a diagnostics flag describing whether [build] is present.
  @override
  void debugFillProperties(DiagnosticPropertiesBuilder properties) {
    super.debugFillProperties(properties);
    final builderFlag = ObjectFlagProperty<Function>.has('builder', build);
    properties.add(builderFlag);
  }

  @override
  ObxState createState() {
    return ObxState();
  }

  /// Produces the widget to display; invoked by [ObxState.build].
  @protected
  Widget build();
}
/// State for [ObxWidget]: owns an [RxNotifier] and calls `setState` whenever
/// that notifier emits.
class ObxState extends State<ObxWidget> {
  /// Notifier passed to [RxInterface.notifyChildren] on every build.
  final _observer = RxNotifier();

  /// Subscription to [_observer], created in [initState].
  late StreamSubscription subs;

  @override
  void initState() {
    super.initState();
    subs = _observer.listen(_updateTree, cancelOnError: false);
  }

  /// Requests a rebuild, unless this state is no longer mounted.
  void _updateTree(_) {
    if (!mounted) return;
    setState(() {});
  }

  @override
  void dispose() {
    subs.cancel();
    _observer.close();
    super.dispose();
  }

  /// Runs the widget's `build` through [RxInterface.notifyChildren] with
  /// [_observer] as the observer.
  @override
  Widget build(BuildContext context) {
    return RxInterface.notifyChildren(_observer, widget.build);
  }
}
/// Concrete notifier type: [RxInterface] with the [NotifyManager] mixin
/// applied.
class RxNotifier<T> = RxInterface<T> with NotifyManager<T>;
/// Owns a [GetStream] subject and the subscriptions this object holds on
/// other streams.
mixin NotifyManager<T> {
  /// The stream this object emits on and that callers listen to.
  GetStream<T> subject = GetStream<T>();

  /// Subscriptions held on other streams, keyed by the stream they belong to.
  final _subscriptions = <GetStream, List<StreamSubscription>>{};

  /// Whether at least one stream has been registered via [addListener].
  bool get canUpdate {
    return _subscriptions.isNotEmpty;
  }

  /// This is an internal method.
  ///
  /// Subscribes to [rxGetx] and forwards each of its events to [subject].
  /// A stream that is already registered is ignored.
  void addListener(GetStream<T> rxGetx) {
    if (_subscriptions.containsKey(rxGetx)) return;

    final forwarding = rxGetx.listen((data) {
      // Skip forwarding once our own subject has been closed.
      if (!subject.isClosed) subject.add(data);
    });
    (_subscriptions[rxGetx] ??= <StreamSubscription>[]).add(forwarding);
  }

  /// Listens to [subject]; [cancelOnError] defaults to `false` when omitted.
  StreamSubscription<T> listen(
    void Function(T) onData, {
    Function? onError,
    void Function()? onDone,
    bool? cancelOnError,
  }) {
    return subject.listen(
      onData,
      onError: onError,
      onDone: onDone,
      cancelOnError: cancelOnError ?? false,
    );
  }

  /// Closes the subscriptions for this Rx, releasing the resources.
  ///
  /// Cancels every held subscription, clears the registry, and closes
  /// [subject].
  void close() {
    for (final held in _subscriptions.values) {
      for (final subscription in held) {
        subscription.cancel();
      }
    }
    _subscriptions.clear();
    subject.close();
  }
}
/// Contract shared by reactive notifiers, plus the global [proxy] slot used to
/// capture which reactive values are read while a builder runs.
abstract class RxInterface<T> {
  /// The observer currently collecting dependencies, if any.
  static RxInterface? proxy;

  /// Whether this object has something to be notified about.
  bool get canUpdate;

  /// Subscribes [onData] (and the optional callbacks) to this object.
  StreamSubscription<T> listen(void Function(T event) onData,
      {Function? onError, void Function()? onDone, bool? cancelOnError});

  /// Runs [builder] with [observer] installed as [proxy] and returns its
  /// result.
  ///
  /// The previous [proxy] is restored when this method exits, whether
  /// [builder] returns normally or throws, so a failing builder cannot leave
  /// a stale observer installed.
  ///
  /// Throws a [String] message when [observer] has nothing to update after
  /// [builder] has run (`observer.canUpdate` is `false`).
  static T notifyChildren<T>(RxNotifier observer, ValueGetter<T> builder) {
    final oldObserver = RxInterface.proxy;
    RxInterface.proxy = observer;
    try {
      final result = builder();
      if (!observer.canUpdate) {
        throw """
[Get] the improper use of a GetX has been detected.
You should only use GetX or Obx for the specific widget that will be updated.
If you are seeing this error, you probably did not insert any observable variables into GetX/Obx
or insert them outside the scope that GetX considers suitable for an update
(example: GetX => HeavyWidget => variableObservable).
If you need to update a parent widget and a child widget, wrap each one in an Obx/GetX.
""";
      }
      return result;
    } finally {
      RxInterface.proxy = oldObserver;
    }
  }
}
为什么要有 RxInterface.proxy = observer; 这一步?
/// Returns the stored value.
T get value {
// Register this object's subject with whichever observer is currently set as
// the proxy (no-op when proxy is null).
RxInterface.proxy?.addListener(subject);
return _value;
}
Rx 最终是混入了 RxObjectMixin 类,即在 Rx 的获取数据中调用了 RxInterface.proxy?.addListener ,那什么时候获取 Rx 的数据呢?
// Reading `a.value` inside the builder is the point at which the value getter
// runs.
Obx(() => Text("${a.value}"));
就是在 Obx 的 builder 方法里。这就清楚了为什么 RxInterface.notifyChildren 方法要先将传入的 observer 赋值给 proxy,然后再调用 builder 方法:调用 builder 方法时会读取 Rx.value,而 get value 中调用了 RxInterface.proxy?.addListener,其中 addListener 传入的 subject 是 Rx 的 GetStream,而 proxy 是 ObxState 里创建的 RxNotifier。