selector

RxSwift FlatMap 解析

2019-08-02  本文已影响0人  zhongxiaoyue

flatMap

Observable 的信号转换成其他的 Observable,然后将这些 Observables 合并

flatMap 操作符将源 Observable 的每一个信号应用一个转换方法,将他们转换成 Observables。 然后将这些 Observables 的信号合并之后再发送出来。

这个操作符是非常有用的,例如,当 Observable 的信号本身拥有其他的 Observable 时,你可以将所有 Observables 的信号发送出来。

Demo

let disposeBag = DisposeBag()
let first = BehaviorSubject(value: "👦🏻")
let second = BehaviorSubject(value: "🅰️")
let variable = Variable(first)

variable.asObservable()
        .flatMap { $0 }
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)

first.onNext("🐱")
variable.value = second
second.onNext("🅱️")
first.onNext("🐶")

👦🏻
🐱
🅰️
🅱️
🐶

核心逻辑

    public func flatMap<Source: ObservableConvertibleType>(_ selector: @escaping (Element) throws -> Source)
        -> Observable<Source.Element> {
            return FlatMap(source: self.asObservable(), selector: selector)
    }
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == SourceSequence.Element {
        let sink = FlatMapSink(selector: self._selector, observer: observer, cancel: cancel)
        let subscription = sink.run(self._source)
        return (sink: sink, subscription: subscription)
    }
    func subscribeInner(_ source: Observable<Observer.Element>) {
        let iterDisposable = SingleAssignmentDisposable()
        if let disposeKey = self._group.insert(iterDisposable) {
            let iter = MergeSinkIter(parent: self, disposeKey: disposeKey)
            let subscription = source.subscribe(iter)
            iterDisposable.setDisposable(subscription)
        }
    }
    func on(_ event: Event<Element>) {
        self._parent._lock.lock(); defer { self._parent._lock.unlock() } // lock {
            switch event {
            case .next(let value):
                self._parent.forwardOn(.next(value))
            case .error(let error):
                self._parent.forwardOn(.error(error))
                self._parent.dispose()
            case .completed:
                self._parent._group.remove(for: self._disposeKey)
                self._parent._activeCount -= 1
                self._parent.checkCompleted()
            }
        // }
    }

思维导图

上一篇下一篇

猜你喜欢

热点阅读