RxSwift高阶函数之combineLatest
combineLatest 操作符将多个
Observables
中最新的元素通过一个函数组合起来,然后将这个组合的结果发出来。这些源Observables
中任何一个发出一个元素,他都会发出一个元素(前提是,这些Observables
曾经发出过元素)。
示例:
let first = PublishSubject<String>()
let second = PublishSubject<String>()
Observable.combineLatest(first, second) {$0 + $1}
.subscribe(onNext: { print($0)})
.disposed(by: disposeBag)
first.onNext("0")
first.onNext("1")
second.onNext("A")
first.onNext("2")
second.onNext("B")
second.onNext("C")
second.onNext("D")
first.onNext("3")
first.onNext("4")
first.onNext("5")
打印结果:
1A
2A
2B
2C
2D
3D
4D
5D
结果分析:
combineLatest函数需要两个序列都有信号,
并且将两个序列最近的的信号通过组合方法组合成一个信号,再发送给订阅者。
combineLatest 源码分析
查看combineLatest源码。
发现这只有一个协议函数的声明。老规矩,根据注释查找到函数的真正实现。因为示例中使用的是两个序列的组合,所以应该查看CombineLatest+arity.swift文件。
找到 combineLatest
函数。
combineLatest
函数是初始化了 CombineLatest2
对象。
查看
CombineLatest2
对象的初始化方法,其中保存了作为参数传递过来的两个序列和组合方法。
因为 CombineLatest2
对象继承自 Producer
。所以按照之前RxSwift核心逻辑简介的分析,最终会执行 run
函数。
在 run
函数中,又初始化了 CombineLatestSink2_
对象。
arity
参数传值为 2,表示有 2 个可观察序列。
在 CombineLatest2.run
函数中初始化CombineLatestSink2_
成功后,调用了 CombineLatestSink2_
的 run
函数。
run
函数中,初始化了两个组合观察者 CombineLatestObserver
。同时让两个源序列 subscribe
两个观察者。
参照RxSwift核心逻辑简介中的分析,将CombineLatestObserver
类比为 AnyObserver
对象。 最终会调用 CombineLatestObserver
的 on
函数。
最后会调用 self._parent.next(self._index)
。 就会调用到 CombineLatestSink2_
对象的 next
函数。但是通过源码可以看到,CombineLatestSink2_
对象并没有 next
函数,所以我们查找其父类 CombineLatestSink
。 在父类中找到 next
函数。
func next(_ index: Int) {
if !self._hasValue[index] {
self._hasValue[index] = true
self._numberOfValues += 1
}
if self._numberOfValues == self._arity {
do {
let result = try self.getResult()
self.forwardOn(.next(result))
}
catch let e {
self.forwardOn(.error(e))
self.dispose()
}
}
else {
var allOthersDone = true
for i in 0 ..< self._arity {
if i != index && !self._isDone[i] {
allOthersDone = false
break
}
}
if allOthersDone {
self.forwardOn(.completed)
self.dispose()
}
}
}
这其中,就蕴含着 combineLatest
函数的关键。
- self._arity 的值为2
- self._hasValue 是在初始化时创建的一个2个元素均为false的数组。
- 当第0个序列发送第1个信号时。
self._hasValue[0]
为false。self._hasValue[0]
赋值为true,同时self._numberOfValues
的值加1。当前self._numberOfValues
值为1。判断self._numberOfValues == self._arity
值为false,所以不会调用self.forwardOn
发送组合信号。 - 当第0个序列发送第2个信号时。
self._hasValue[0]
为true。self._numberOfValues
的值不会加1。当前self._numberOfValues
值为1。判断self._numberOfValues == self._arity
值为false,所以仍然不会调用self.forwardOn
发送组合信号。 - 当发送第1个序列的第1个信号时。
self._hasValue[1]
为false。self._hasValue[1]
赋值为true,同时self._numberOfValues
的值加1。当前self._numberOfValues
值为2。判断self._numberOfValues == self._arity
值为true,所以会调用getResult()
函数,并且得到组合后的信号作为self.forwardOn
函数的参数发送出去。 所以会发送第一个组合信号。 - 当发送第0个序列的第3个信号时。
self._hasValue[0]
为true。self._numberOfValues
的值不会加1。当前self._numberOfValues
值为2。判断self._numberOfValues == self._arity
值为true,所以会调用getResult()
函数,并且得到组合后的信号作为self.forwardOn
函数的参数发送出去。 所以会发送第一个组合信号。
以此类推。直到所有序列发送完所有的信号。
以上分析则是 combineLatest
函数的底层原理。若有不足之处,请评论指正。
最后,因为文字叙述可能不是很直观,所以附上图解,小伙伴们可以参照图解更直观的理解 combineLatest
函数。