RxSwift高阶函数之publish
publish 会将 Observable 转换为 可被连接的Observable。可被连接的Observable 和 普通的 Observable 十分相似,不过在被订阅后不会发出元素,直到 connect 操作符被应用为止。这样,你就可以 Observable 在何时开始发出元素。
![]()
示例:
let netOb = Observable<String>.create { (observer) -> Disposable in
print("--模拟请求网络开始--")
sleep(2)// 模拟网络延迟
print("--模拟网络请求成功--")
observer.onNext("请求到网络数据")
return Disposables.create()
}
.publish()
_ = netOb.subscribe({ print("1-订阅到:\($0)") })
.disposed(by: disposeBag)
_ = netOb.subscribe({ print("2-订阅的:\($0)") })
.disposed(by: disposeBag)
_ = netOb.connect()
打印结果:
--模拟请求网络开始--
--模拟网络请求成功--
1-订阅到:next(请求到网络数据)
2-订阅的:next(请求到网络数据)
分析打印结果,可以发现,虽然对序列订阅了两次,但是只发送了一次信号,并且两个观察都有响应。如果没有对 Observable 执行 publish()
和 最后的 netOb.connect()
,那么按照RxSwift核心逻辑中的分析,就会在两个观察者订阅的时候,都会发送信号,就会发送多次信号。
那么我们就来分析一下,publish
是如何让多次订阅只发送一次信号的。
查看 publish
源码:

publish
函数内部,调用了 multicast
函数,并且有一个尾随闭包作为函数参数。而尾随闭包中的代码块作用是初始化一个 PublishSubject
实例对象。
再查看 multicast
函数源码。

ConnectableObservableAdapter
对象。
ConnectableObservableAdapter
初始化时仅将源序列保存在 self._source
和闭包保存在 self._makeSubject
中。
订阅信号 subscribe
对源序列执行 publish()
后,源序列变为 ConnectableObservableAdapter
,再对其进行订阅,会执行 ConnectableObservableAdapter.subscribe
函数。

subscribe
函数会执行 self.lazySubject.subscribe(observer)
,即会对 self.lazySubject
进行订阅。而
self.lazySubject
从命名上看就知道,这个一个懒加载属性。 因为初始化时 self._subject
为nil,所以会调用 self._makeSubject
闭包。回调结果是初始化一个 PublishSubject
的实例。
因此,self.lazySubject
其实是 PublishSubject
对象。 也即是对 PublishSubject
对象进行订阅 subscribe
。 又因为 PublishSubject
仅初始化,并没有创建序列,所以,并不会发出信号。同时,由于 PublishSubject
的特性,会把 观察者 保存起来,所以并不会丢失源序列的观察者。PublishSubject
具体分析请参阅RxSwift之Subject。
连接 connect
在需要发送信号的时候,则执行 connect()
函数开始发送信号。查看 connect
源码。

self._lock.calculateLocked
会直接执行闭包,所以会创建 Connection
对象,并对源序列 self._source
进行订阅。
根据 RxSwift核心逻辑简介 中的分析可以知道调用 self._source.subscribe(connection)
最终调用 connection.on
函数。

self._subjectObserver.on(event)
,分析可知,即调用 PublishSubject.on(event)
,实现对所有观察者发送信号。详细分析请参阅RxSwift之Subject。
以上即为 publish 的底层分析,其内部主要使用了 PublishSubject 保存多个观察者,实现多次订阅,一次发送的目的。
上面的分析中若有不足,请评论指正。