Rx

RxSwift源码分析(9)——combineLatest

2020-10-12  本文已影响0人  无悔zero

今天开始一起分析一些常用的高阶函数,今天先分析combineLatest。有时候我们需要同时监听多个响应,或者合并多个响应时,就可以用combineLatest
我们先看下面的例子,在登录界面,只有同时输入了账号和密码才能点击登录:

let userNameVaild = userNameTf.rx.text.orEmpty
.map { (text) -> Bool in
     return text.count > 0
}
let passwordVaild = passwordTf.rx.text.orEmpty
.map { (text) -> Bool in
     return text.count > 0
}
Observable.combineLatest(userNameVaild,passwordVaild) { $0 && $1 }
.bind(to: loginBtn.rx.isEnabled)
.disposed(by: disposeBag)
没有密码不能点击 没有账号不能点击 可以点击
  1. 前面只是创建序列,我们直接来看combineLatest,返回CombineLatest2序列(CombineLatest2继承了Producer):
extension ObservableType {
    public static func combineLatest<O1: ObservableType, O2: ObservableType>
        (_ source1: O1, _ source2: O2, resultSelector: @escaping (O1.E, O2.E) throws -> E)
            -> Observable<E> {
        return CombineLatest2(
            source1: source1.asObservable(), source2: source2.asObservable(),
            resultSelector: resultSelector
        )
    }
}
final class CombineLatest2<E1, E2, R> : Producer<R> {
    ...
    init(source1: Observable<E1>, source2: Observable<E2>, resultSelector: @escaping ResultSelector) {
        self._source1 = source1  //保存两个源序列
        self._source2 = source2

        self._resultSelector = resultSelector
    }
    
    override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == R {
        let sink = CombineLatestSink2_(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}
  1. CombineLatest2run函数里创建CombineLatestSink2_(业务下沉,CombineLatestSink2_继承了CombineLatestSink),调用了非常熟悉的sink.run(),我们马上进入CombineLatestSink2_里看看:
final class CombineLatestSink2_<E1, E2, O: ObserverType> : CombineLatestSink<O> {
    ...
    func run() -> Disposable {
        let subscription1 = SingleAssignmentDisposable()
        let subscription2 = SingleAssignmentDisposable()
        //分别创建观察者
        let observer1 = CombineLatestObserver(lock: self._lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1)
        let observer2 = CombineLatestObserver(lock: self._lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)
        //先订阅再setDisposable
        subscription1.setDisposable(self._parent._source1.subscribe(observer1))
        subscription2.setDisposable(self._parent._source2.subscribe(observer2))

        return Disposables.create([
                subscription1,
                subscription2
        ])
    }
}
  1. CombineLatestSink2_创建了两个CombineLatestObserver观察者分别用源序列进行订阅,根据RxSwift核心逻辑,最终来到CombineLatestObserveron函数里:
final class CombineLatestObserver<ElementType>
    : ObserverType
    , LockOwnerType
    , SynchronizedOnType {
    ...
    init(lock: RecursiveLock, parent: CombineLatestProtocol, index: Int, setLatestValue: @escaping ValueSetter, this: Disposable) {
        self._lock = lock
        self._parent = parent //保存CombineLatestSink2_
        self._index = index
        self._this = this
        self._setLatestValue = setLatestValue
    }
    
    func on(_ event: Event<Element>) {
        self.synchronizedOn(event)
    }

    func _synchronized_on(_ event: Event<Element>) {
        switch event {
        case .next(let value):
            self._setLatestValue(value)
            self._parent.next(self._index)
        case .error(let error):
            self._this.dispose()
            self._parent.fail(error)
        case .completed:
            self._this.dispose()
            self._parent.done(self._index)
        }
    }
}
extension SynchronizedOnType {
    func synchronizedOn(_ event: Event<E>) {
        self.lock(); defer { self.unlock() }
        self._synchronized_on(event)
    }
}

CombineLatestObserver.on调用后,最终调用了self._parent.next(self._index)self._parent就是初始化保存下来的CombineLatestSink2_

  1. CombineLatestSink2_没有实现具体的方法,我们看父类CombineLatestSink
class CombineLatestSink<O: ObserverType>
    : Sink<O>
    , CombineLatestProtocol {
    ...
    func next(_ index: Int) {
        if !self._hasValue[index] {
            self._hasValue[index] = true
            self._numberOfValues += 1  //标记
        }
        //判断是否能够响应
        if self._numberOfValues == self._arity {
            do {
                let result = try self.getResult()
                self.forwardOn(.next(result))
            }
            catch let e {
                self.forwardOn(.error(e))
                self.dispose()
            }
        }
        else {
            var allOthersDone = true

            for i in 0 ..< self._arity {
                if i != index && !self._isDone[i] {
                    allOthersDone = false
                    break
                }
            }
            
            if allOthersDone {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }
if !self._hasValue[index] {
    self._hasValue[index] = true
    self._numberOfValues += 1  //标记
}
//判断是否能够响应
if self._numberOfValues == self._arity {
    ...
}
else {
    ...
}

首先将序列进行标记self._numberOfValues += 1,当两个序列或n个序列都进行了订阅时,self._numberOfValues等于self._arity,便可执行下一步self.getResult()获取结果。

  1. self.getResult()里面调用了self._parent._resultSelector
final class CombineLatestSink2_<E1, E2, O: ObserverType> : CombineLatestSink<O> {
    ...
    override func getResult() throws -> R {
        return try self._parent._resultSelector(self._latestElement1, self._latestElement2)
    }
}

self._parent._resultSelector是一开始就保存的外面的闭包,然后返回一个值:

Observable.combineLatest(userNameVaild,passwordVaild) { $0 && $1 }
  1. 最后才调用了self.forwardOn(.next(result))发送响应,实现通过两个序列来控制loginBtn是否能够点击。
上一篇下一篇

猜你喜欢

热点阅读