RxSwift-TakeUntil源码分析

2019-08-10  本文已影响0人  king_jensen

上文RxSwift-deallocating,deallocated源码解析,我们提到deallocating序列和takeUntil序列经常结合起来使用,本文将分析takeUntil,探索一下takeUntil内部是如何接收deallocating发送的响应非常有必要。

takeUntil源码解析

  public func takeUntil<Source: ObservableType>(_ other: Source)
        -> Observable<Element> {
        return TakeUntil(source: self.asObservable(), other: other.asObservable())
    }

返回的是一个TakeUntil序列,TakeUntil对象依然继承自我们熟悉的Producer

final private class TakeUntil<Element, Other>: Producer<Element> {
    
    fileprivate let _source: Observable<Element>
    fileprivate let _other: Observable<Other>
    
    init(source: Observable<Element>, other: Observable<Other>) {
        self._source = source
        self._other = other
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = TakeUntilSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

TakeUntil序列保存了原始序列_source(上文的例子中,_source保存的是vc.publicOB序列)以及序列_other(上文的例子中,_other保存的就是vc.rx.deallocating序列)
外界的subscribe的是TakeUntil序列。TakeUntil序列被订阅时,会执行TakeUntil.run,TakeUntil.run调用TakeUntilSink.run

 init(parent: Parent, observer: Observer, cancel: Cancelable) {
        self._parent = parent
        super.init(observer: observer, cancel: cancel)
    }

TakeUntilSink保存TakeUntil_parent属性.

 func run() -> Disposable {
        let otherObserver = TakeUntilSinkOther(parent: self)
        let otherSubscription = self._parent._other.subscribe(otherObserver)
        otherObserver._subscription.setDisposable(otherSubscription)
        let sourceSubscription = self._parent._source.subscribe(self)
        
        return Disposables.create(sourceSubscription, otherObserver._subscription)
    }

TakeUntilSink.run中,
1.创建观察者TakeUntilSinkOther, TakeUntilSinkOther_parent保存TakeUntilSink
2.self._parent._other.subscribe(otherObserver)订阅序列,这个序列就是上文的例子的vc.rx.deallocating序列。

RxSwift-deallocating,deallocated源码解析中,我们提到当对象在dealloc时,会向DeallocatingProxy中存储ReplaySubject序列(就是vc.rx.deallocating序列)发送响应,会来到这里的观察者TakeUntilSinkOther.on(.next())

func on(_ event: Event<Element>) {
        self.synchronizedOn(event)
    }

    func _synchronized_on(_ event: Event<Element>) {
        switch event {
        case .next:
            self._parent.forwardOn(.completed)
            self._parent.dispose()
        case .error(let e):
            self._parent.forwardOn(.error(e))
            self._parent.dispose()
        case .completed:
            self._subscription.dispose()
        }
    }

TakeUntilSinkOther.on(.next())----->Sink.forwardOn(.completed)----->TakeUntilSink.observer.(.completed).
最终会给TakeUntil序列的observer发送.completed信号,序列完成并销毁。

总结

1.takeUntil是一个中间层,在takeUntil被订阅的流程中,中间层takeUntil会订阅rx.deallocating()序列.
2.当对象被销毁,对rx.deallocating()发送响应时,会调用到观察者的TakeUntilSinkOther.on(.next()),最终会给TakeUntil序列的observer发送.completed信号,序列完成并销毁。

TakeUntil.png
上一篇下一篇

猜你喜欢

热点阅读