RxSwift源码分析(11)——publish
2020-10-13 本文已影响0人
无悔zero
今天分析一下publish这个高阶函数,这个函数使用起来有点像网络请求:订阅之后并不会马上收到事件,需要调用connect启动连接,才会订阅源序列并进行发送和响应。可以直接来看下面的例子:
// Create a source sequence, then wrap it with publish(); `ob` is the value returned by publish().
let ob = Observable<Any>.create { (observer) -> Disposable in
observer.onNext("连接后才订阅、发送和响应")
observer.onCompleted()
return Disposables.create()
}.publish() // note: publish() is applied to the created sequence
// Register the handler first; the resulting disposable is handed to disposeBag.
// NOTE(review): the closure parameter is named `text`, but with this subscribe overload it is
// presumably an Event<Any> rather than the raw string — confirm.
ob.subscribe { (text) in
print(text)
}.disposed(by: disposeBag)
_ = ob.connect() // connect; the value returned by connect() is discarded here
- 我们先来看
publish()
源码:
extension ObservableType {
/// Returns the connectable sequence produced by `multicast`, using a `PublishSubject` as the subject.
public func publish() -> ConnectableObservable<Element> {
// The trailing closure is a factory handed to multicast; it is not invoked here.
return self.multicast { PublishSubject() }
}
}
-
PublishSubject
遵循了SubjectType
协议(同时继承Observable、遵循ObserverType),是一个既能作为观察者接收事件、又能作为序列向订阅者发送事件的对象。(详细可以看RxSwift源码分析(12)——Subject)
/// Declaration only (body elided in this excerpt): subclasses `Observable<Element>` and conforms to
/// `SubjectType`, `Cancelable`, `ObserverType` and `SynchronizedUnsubscribeType`.
public final class PublishSubject<Element>
: Observable<Element>
, SubjectType
, Cancelable
, ObserverType
, SynchronizedUnsubscribeType {
...
}
publish
函数内部调用的是multicast,而multicast其实就是创建了ConnectableObservableAdapter
序列返回:
extension ObservableType {
...
/// Builds a connectable sequence from this sequence and a subject factory.
/// - Parameter makeSubject: Closure that produces the subject; it is passed on to the adapter, not called here.
/// - Returns: A `ConnectableObservableAdapter` wrapping `self.asObservable()` and `makeSubject`.
public func multicast<Subject: SubjectType>(makeSubject: @escaping () -> Subject)
-> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
return ConnectableObservableAdapter(source: self.asObservable(), makeSubject: makeSubject)
}
}
ConnectableObservableAdapter
继承自ConnectableObservable
,而ConnectableObservable又继承了Observable(Observable遵循ObservableType
协议),所以它也是一个序列:
/// Connectable sequence (subclass of `ConnectableObservable`) that holds a source sequence and a subject factory.
final private class ConnectableObservableAdapter<Subject: SubjectType>
: ConnectableObservable<Subject.Element> {
...
/// Stores the source and the factory; the subject and the connection both start out as nil.
init(source: Observable<Subject.Observer.Element>, makeSubject: @escaping () -> Subject) {
self._source = source // keep the source sequence
self._makeSubject = makeSubject // keep the subject factory closure (no subject is created here)
self._subject = nil
self._connection = nil
}
...
}
/// Declaration only (body elided in this excerpt): an `Observable<Element>` subclass that also
/// conforms to `ConnectableObservableType`.
public class ConnectableObservable<Element>
: Observable<Element>
, ConnectableObservableType {
...
}
/// Declaration only (body elided in this excerpt): the sequence base class, conforming to `ObservableType`.
public class Observable<Element> : ObservableType {
...
}
- 然后序列开始订阅:
// Subscribing to the published sequence `ob` (handler body elided).
ob.subscribe { (text) in
...
}
根据RxSwift核心逻辑,来到ConnectableObservableAdapter
的subscribe
函数,可以看到有一个lazySubject
,lazySubject
只会创建一次,有利于节省内存和减少性能消耗;更重要的是,这样所有订阅者以及后面的connect用到的都是同一个subject,事件才能分发给每一个订阅者:
final private class ConnectableObservableAdapter<Subject: SubjectType>
: ConnectableObservable<Subject.Element> {
...
// Created only once: later reads return the cached instance.
/// The subject for this adapter: returns `_subject` if it is already set, otherwise calls
/// `_makeSubject()`, caches the result in `_subject` and returns it.
fileprivate var lazySubject: Subject {
if let subject = self._subject {
return subject
}
let subject = self._makeSubject()
self._subject = subject
return subject
}
/// Forwards the subscription to `lazySubject`; the source sequence is not touched here.
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
return self.lazySubject.subscribe(observer)
}
}
-
lazySubject
订阅后来到:
public final class PublishSubject<Element>
: Observable<Element>
, SubjectType
, Cancelable
, ObserverType
, SynchronizedUnsubscribeType {
public typealias SubjectObserverType = PublishSubject<Element>
typealias Observers = AnyObserver<Element>.s
...
private var _observers = Observers() // default-initialized; holds the stored observer callbacks
...
/// Subscribes `observer` while holding `_lock`; the actual work is done by `_synchronized_subscribe`.
public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
self._lock.lock()
let subscription = self._synchronized_subscribe(observer)
self._lock.unlock()
return subscription
}
/// Stores the observer's `on` function in `_observers` and returns a `SubscriptionDisposable`
/// carrying the insertion key. Called from `subscribe` with `_lock` held.
func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
...
// Key step: save observer.on and remember the key returned by insert.
let key = self._observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}
...
}
_synchronized_subscribe
函数的代码很容易迷惑人,其实它的重点在self._observers.insert(observer.on)
。
-
self._observers
是一个默认初始化的Bag,具有保存的作用,它把observer.on
保存下来了:
struct Bag<T> : CustomDebugStringConvertible {
...
/// Stores `element` and returns the key it was stored under.
/// Storage is tiered: a single first slot, then the `_pairs` array, then `_dictionary`.
mutating func insert(_ element: T) -> BagKey {
// Take the current key and advance the counter (`&+` is the overflow addition operator).
let key = _nextKey
_nextKey = BagKey(rawValue: _nextKey.rawValue &+ 1)
// First slot is free: store here. With a single subscriber, observer.on ends up in _value0.
if _key0 == nil {
_key0 = key
_value0 = element
return key
}
// More than the first slot is now in use.
_onlyFastPath = false
// A dictionary already exists: store there.
if _dictionary != nil {
_dictionary![key] = element
return key
}
// Otherwise use the pairs array while it is below arrayDictionaryMaxSize.
if _pairs.count < arrayDictionaryMaxSize {
_pairs.append((key: key, value: element)) // stores the element (observer.on in this walkthrough) in the pairs array
return key
}
// Pairs array is full: create the dictionary with this entry.
_dictionary = [key: element]
return key
}
...
}
- 保存之后,下一步就是在适当的时候调用
connect
:
// Call connect() on the published sequence.
ob.connect()
进入源码,我们可以看到如果之前已经连接过就直接返回已有的connection;否则先创建connection
,然后用self._source
源序列来进行subscribe
订阅,并把connection作为观察者传进去
:
final private class ConnectableObservableAdapter<Subject: SubjectType>
: ConnectableObservable<Subject.Element> {
...
/// Subscribes a `Connection` to the source sequence. The whole body runs inside `_lock.calculateLocked`.
/// - Returns: The stored connection if one already exists; otherwise the newly created one.
override func connect() -> Disposable {
return self._lock.calculateLocked {
// Already connected: hand back the stored connection.
if let connection = self._connection {
return connection
}
// Created before subscribing so the Connection can receive its subscription handle up front.
let singleAssignmentDisposable = SingleAssignmentDisposable()
let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
self._connection = connection
// Subscribe the connection (as the observer) to the source, then fill in the placeholder disposable.
let subscription = self._source.subscribe(connection)
singleAssignmentDisposable.setDisposable(subscription)
return connection
}
}
...
}
/// Observer and disposable in one type; created by the adapter's `connect()`.
final private class Connection<Subject: SubjectType>: ObserverType, Disposable {
...
/// Stores the owning adapter, the subject's observer, the lock passed in by the caller, and the subscription disposable.
init(parent: ConnectableObservableAdapter<Subject>, subjectObserver: Subject.Observer, lock: RecursiveLock, subscription: Disposable) {
self._parent = parent
self._subscription = subscription
self._lock = lock
self._subjectObserver = subjectObserver
}
...
}
- 根据RxSwift核心逻辑,最终来到
connection
的on
函数:
final private class Connection<Subject: SubjectType>: ObserverType, Disposable {
...
/// Forwards `event` to the stored subject observer.
func on(_ event: Event<Subject.Observer.Element>) {
// Drop the event when the disposed flag is set.
if isFlagSet(self._disposed, 1) {
return
}
// A stop event disposes this connection before the event is forwarded.
if event.isStopEvent {
self.dispose()
}
self._subjectObserver.on(event)
}
...
}
- 然后调用
self._subjectObserver.on(event)
=>
Connection._subjectObserver.on
=>
ConnectableObservableAdapter.lazySubject.on
=>
ConnectableObservableAdapter._makeSubject().on
=>
PublishSubject.on
:
public final class PublishSubject<Element>
: Observable<Element>
, SubjectType
, Cancelable
, ObserverType
, SynchronizedUnsubscribeType {
...
/// Delivers `event` to the observers returned by `_synchronized_on`.
public func on(_ event: Event<Element>) {
...
dispatch(self._synchronized_on(event), event)
}
/// Picks the observers that should receive `event`. `_lock` is held for the whole call and released by `defer`.
func _synchronized_on(_ event: Event<Element>) -> Observers {
self._lock.lock(); defer { self._lock.unlock() }
switch event {
case .next:
// No receivers once disposed or stopped.
if self._isDisposed || self._stopped {
return Observers()
}
return self._observers
case .completed, .error:
// Handling elided in this excerpt; the visible fall-through returns an empty collection.
...
return Observers()
}
}
...
}
然后通过_synchronized_on
函数拿到self._observers
后执行dispatch(self._synchronized_on(event), event)
。
- 最后遍历调用保存的
observer.on
:
/// Calls the callbacks stored in `bag` with `event`: the first slot, then the pairs array, then the dictionary.
@inline(__always)
func dispatch<Element>(_ bag: Bag<(Event<Element>) -> Void>, _ event: Event<Element>) {
// First slot, if occupied.
bag._value0?(event)
// When the fast-path flag is set, stop after the first slot.
if bag._onlyFastPath {
return
}
// Callbacks kept in the pairs array.
let pairs = bag._pairs
for i in 0 ..< pairs.count {
pairs[i].value(event)
}
// Callbacks kept in the dictionary, if it exists.
if let dictionary = bag._dictionary {
for element in dictionary.values {
element(event)
}
}
}
- 根据RxSwift核心逻辑,
observer.on
就是AnonymousObserver.on
,最终来到外面的响应闭包:
// The response closure supplied by the caller; it prints whatever it receives.
ob.subscribe { (text) in
print(text)
}
整个流程简单来说就是:
1.创建序列,用publish
包装一下;
2.然后通过subscribe
函数保存observer.on
;
3.最后调用connect
来进行源序列的订阅,以此唤起发送和响应。