RxSwift CombineLatest 解析

2019-08-02  本文已影响0人  zhongxiaoyue

CombineLatest

CombineLatest 方法可以将多个Observables的信号合并成一个信号然后发送给订阅者.因此CombineLatest 序列发送信号的前提条件是每个Observables都发出过信号.

当多个 Observables 中任何一个发出一个信号,就发出一个信号。这个信号是由这些Observables 中最新的信号,通过一个函数组合起来的

combineLatest.png

combineLatest 操作符将多个 Observables 中最新的信号通过一个函数组合起来,然后将这个组合的结果发出来。这些源 Observables 中任何一个发出一个信号,他都会发出一个信号(前提是,这些 Observables 曾经发出过信号)。

Demo

let disposeBag = DisposeBag()

let first = PublishSubject<String>()
let second = PublishSubject<String>()

Observable.combineLatest(first, second) { $0 + $1 }
          .subscribe(onNext: { print($0) })
          .disposed(by: disposeBag)

first.onNext("1")
second.onNext("A")
first.onNext("2")
second.onNext("B")
second.onNext("C")
second.onNext("D")
first.onNext("3")
first.onNext("4")

1A
2A
2B
2C
2D
3D
4D

核心逻辑

注: 因为逻辑相似,这里只以合并2个序列信号,序列A和序列B为例.

    public static func combineLatest<O1: ObservableType, O2: ObservableType>
        (_ source1: O1, _ source2: O2)
            -> Observable<(O1.E, O2.E)> {
        return CombineLatest2(
            source1: source1.asObservable(), source2: source2.asObservable(),
            resultSelector: { ($0, $1) }
        )
    }
    override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == R {
        let sink = CombineLatestSink2_(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
    func run() -> Disposable {
        let subscription1 = SingleAssignmentDisposable()
        let subscription2 = SingleAssignmentDisposable()

        let observer1 = CombineLatestObserver(lock: self._lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1)
        let observer2 = CombineLatestObserver(lock: self._lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)

         subscription1.setDisposable(self._parent._source1.subscribe(observer1))
         subscription2.setDisposable(self._parent._source2.subscribe(observer2))

        return Disposables.create([
                subscription1,
                subscription2
        ])
    }
    func on(_ event: Event<Element>) {
        self.synchronizedOn(event)
    }

    func _synchronized_on(_ event: Event<Element>) {
        switch event {
        case .next(let value):
            self._setLatestValue(value)
            self._parent.next(self._index)
        case .error(let error):
            self._this.dispose()
            self._parent.fail(error)
        case .completed:
            self._this.dispose()
            self._parent.done(self._index)
        }
    }
    func next(_ index: Int) {
        if !self._hasValue[index] {
            self._hasValue[index] = true
            self._numberOfValues += 1
        }

        if self._numberOfValues == self._arity {
            do {
                let result = try self.getResult()
                self.forwardOn(.next(result))
            }
            catch let e {
                self.forwardOn(.error(e))
                self.dispose()
            }
        }
        else {
            var allOthersDone = true

            for i in 0 ..< self._arity {
                if i != index && !self._isDone[i] {
                    allOthersDone = false
                    break
                }
            }
            
            if allOthersDone {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }

思维导图

上一篇下一篇

猜你喜欢

热点阅读