RxSwift 中的循环引用
内存管理总是开发过程中难以绕开的问题, 在使用 RxSwift 的过程中, 避免不了写各种 .disposed(by: disposeBag)
来管理内存的释放时机. 那么究竟管理的是哪些对象的释放, 不写又会造成什么问题呢?
在探究这个问题之前, 我们先按照 RxSwift 的接口定义实现一套事件源&观察者. 对比一下在内存管理上跟 RxSwift 有何区别.
struct Observer {
func on() {
print("Helo")
}
}
struct Observable {
let subscription: (Observer) -> ()
init(_ subscription: @escaping (Observer) -> ()) {
self.subscription = subscription
}
func subscribe(_ observer: Observer) {
subscription(observer)
}
}
func heloFunc() {
let observer = Observer()
let observable = Observable { (observer) in
observer.on()
}
observable.subscribe(observer)
}
示例代码删除了事件类型, observable 对象无参调用 observer 的 on 方法. observer 也不区分事件类型, 响应事件打印 "Helo". 我们执行 heloFunc, 控制台输出 Helo
.
结构体(跟 RxSwift 保持一致, 所以先不使用 Class)没有 deinit 方法, 为了便于观察生命周期, 引入一个 Delete 类, 该类在释放的时候输出 log 信息. 同时为 Observable 跟 Observer 添加 Delete 属性.
class Delete {
let deinitCallBack: () -> ()
init(_ onDeinit: @escaping () -> ()) {
deinitCallBack = onDeinit
}
deinit {
deinitCallBack()
}
}
struct Observer {
let delete = Delete {
print("Observer deinit")
} ...
}
struct Observable {
let delete = Delete {
print("Observable deinit")
} ...
}
这个时候再次执行 heloFunc , 可以得到如下输出: Helo
Observable deinit
Observer deinit
可以看到由于出了作用域, Observable, Observer 对象都被释放.
接下来我们再使用同样的方式对 RxSwift 进行测试, 由于 RxSwift 调用层级较深, 无法确定某些类是否只初始化一次, 因此我们在 init 方法里同样添加日志信息.
let observer = AnyObserver<Void> { (_) in
print("on")
}
let observable = Observable<Void>.create { observer in
observer.on(Event.next(()))
return Disposables.create()
}
observable.subscribe(observer)
// 省略添加 Log 的代码
运行代码, 得到输出: AnyObserver init
AnonymousObservable init
AnyObserver init
on
AnyObserver deinit
AnonymousObservable deinit
可以看到 AnyObserver 生成两次却只释放一次, 对比我们自己实现的事件源&观察者, 在出了作用域之外, 依然存在对象生命周期没有结束, 导致内存泄漏.
引用循环
示例代码中只显式生成一个 AnonymousObservable 对象以及一个 AnyObserver 对象, 所以问题应该在于 subscribe 操作生成了另一个 AnyObserver
对象, 并对两个 AnyObserver
对象其中的一个形成了循环引用.
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
CurrentThreadScheduler 是用来保证线程安全的类, 我们先忽略它.
可以看到生成了一个 SinkDisposer 对象, 并且向该对象注入 sinkAndSubscription 的两个属性. 我们暂且不管 sinkAndSubscription 的类型, 将该持有关系记录下来.
SinkDisposer -> sinkAndSubscription.sink
SinkDisposer -> sinkAndSubscription.subscription
接下来到了 run 函数的实现, 该函数的入参为观察者 observer 以及刚刚创建的 SinkDisposer.
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
可以看到 AnonymousObservableSink 持有了传入的 observer 以及 SinkDisposer, 并且作为 sinkAndSubscription.sink 返回到上层. 用有向图记录该持有关系:
Observer 持有关系SinkDisposer 与 AnonymousObservableSink 构成循环引用, 从而导致 Observer 无法释放!
解决方案
回想起 Closure 循环引用的解决方案, 无非就两种: 弱引用以及手动打破持有关系. SinkDisposer 本身是作为 subscribe 的返回值返回到上层的, 所以最简单的方式当然是持有该 SinkDisposer, 在合适的方法调用 dispose.
该对象的 dispose 方法对 AnonymousObservableSink 置 nil, 从而手动打破了持有关系.
let diposer = observable.subscribe(observer)
disposer.disposed()
DisposableBag
然而这就需要用户去关心 observer 什么时候不再需要监听事件. 其实我们的需求只是不要造成内存泄漏, 完全可以接受内存的释放不那么及时. 因此 RxSwift 还提供了一个类 DisposeBag.
DisposeBag 就是一个 Disposable 对象的集合, 可以将订阅产生的 Disposable 对象加入到 DisposeBag 中, 由其统一在特定时机调用 dispose. 通过观察代码可以发现, 假如用户不手动清理 DisposeBag, 那么它将在 deinit 的时候 对所有 Disposable 对象调用 dispose.
deinit {
dispose()
}
private func dispose() {
let oldDisposables = _dispose()
for disposable in oldDisposables {
disposable.dispose()
}
}
onError/onCompleted
此外, 事件源内部也可以通过事件来管理 observer 的生命周期, 任意 .error/.Complete 事件都将触发 dispose.
在这个例子中是通过 AnonymousObservableSink 来实现这个功能的, Sink 可以理解为事件的管道, 所有发送的事件都是通过 Sink 转发给真正的 observer. AnonymousObservableSink 只要拦截到 .error/.Complete 事件则会触发自身的 dispose, 从而触发 SinkDisposer 打破循环引用.
func on(_ event: Event<E>) {
...
case .error, .completed:
if AtomicCompareAndSwap(0, 1, &_isStopped) {
forwardOn(event)
dispose()
}
}
}
TakeUntil
TakeUntil 则是将事件的生命周期跟另一个事件源相绑定, 只要另一个事件源发出.next/.error事件, 则触发 dispose.
RxSwift 使用 TakeUntilSink 这种类型的管道实现这个功能, TakeUntilSink 在为正常事件流提供事件转发功能的同时, 创建了另一个观察者 TakeUntilSinkOther, 该观察者对 TakeUntil 的参数进行观察, 一旦接收到.next/.error事件, 则对 TakeUntilSink 进行 dispose, 从而打破循环引用.
需要注意的是另一个事件源发出的.complete 事件并不会触发 dispose, 反而会造成 TakeUntilSinkOther 被释放, 失去生命周期绑定的效果. 因此, 除非对事件源的事件产生顺序非常有把握, 或者是为了实现一些特定的需求, 一般不推荐单纯使用 TakeUntil 做内存管理.