RxSwift源码分析(12)——Subject
2020-10-15 本文已影响0人
无悔zero
今天说说上篇提到的Subject,这是一个特别的序列,它既是发送者也是响应者。其中常用的类型有:PublishSubject、BehaviorSubject、ReplaySubject、AsyncSubject、BehaviorRelay等等。
(一)PublishSubject
PublishSubject
订阅前不会响应:
let publishSub = PublishSubject<Int>()
publishSub.onNext(1) //订阅之前的接收不到
publishSub.subscribe { (num) in
print(num)
}.disposed(by: disposbag)
publishSub.onNext(2) //订阅之后正常接收
publishSub.onNext(3)
打印结果为:2,3。
- 源码分析
- 具体流程就只看一下这个
PublishSubject
,PublishSubject
继承了Observable
和ObserverType
,所以既能发送又能响应:
public final class PublishSubject<Element>
: Observable<Element>
, SubjectType
, Cancelable
, ObserverType
, SynchronizedUnsubscribeType {
typealias Observers = AnyObserver<Element>.s
...
private var _observers = Observers() //默认初始化
...
public override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
self._lock.lock()
let subscription = self._synchronized_subscribe(observer)
self._lock.unlock()
return subscription
}
func _synchronized_subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
...
let key = self._observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}
...
}
根据RxSwift核心逻辑,PublishSubject
订阅后会来到subscribe
函数,接着下一步来到_synchronized_subscribe
函数,这里很容易迷惑人,其重点在self._observers.insert(observer.on)
。
-
self._observers
是默认初始化的,具有保存的作用,它把observer.on
保存下来了:
struct Bag<T> : CustomDebugStringConvertible {
...
mutating func insert(_ element: T) -> BagKey {
let key = _nextKey
_nextKey = BagKey(rawValue: _nextKey.rawValue &+ 1)
if _key0 == nil {
_key0 = key
_value0 = element
return key
}
_onlyFastPath = false
if _dictionary != nil {
_dictionary![key] = element
return key
}
if _pairs.count < arrayDictionaryMaxSize {
_pairs.append((key: key, value: element))
return key
}
_dictionary = [key: element]
return key
}
...
}
- 然后外面调用
publishSub.onNext()
时,根据RxSwift核心逻辑,来到PublishSubject
的on
函数:
public final class PublishSubject<Element>
: Observable<Element>
, SubjectType
, Cancelable
, ObserverType
, SynchronizedUnsubscribeType {
...
public func on(_ event: Event<Element>) {
...
dispatch(self._synchronized_on(event), event)
}
func _synchronized_on(_ event: Event<E>) -> Observers {
self._lock.lock(); defer { self._lock.unlock() }
switch event {
case .next:
if self._isDisposed || self._stopped {
return Observers()
}
return self._observers
case .completed, .error:
...
return Observers()
}
}
...
}
然后通过_synchronized_on
函数拿到self._observers
后执行dispatch(self._synchronized_on(event), event)
。
- 最后遍历调用保存的
observer.on
:
@inline(__always)
func dispatch<E>(_ bag: Bag<(Event<E>) -> Void>, _ event: Event<E>) {
bag._value0?(event)
if bag._onlyFastPath {
return
}
let pairs = bag._pairs
for i in 0 ..< pairs.count {
pairs[i].value(event)
}
if let dictionary = bag._dictionary {
for element in dictionary.values {
element(event)
}
}
}
- 最后根据RxSwift核心逻辑完成发送响应流程:
publishSub.subscribe { (num) in
print(num)
}
(二)BehaviorSubject
BehaviorSubjec
会保存最新一个元素,订阅时会响应:
let behaviorSub = BehaviorSubject.init(value: 100)
behaviorSub.onNext(1)
behaviorSub.onNext(2) //保存订阅前最新的一个元素,订阅时响应
behaviorSub.subscribe { (num) in
print(num)
}.disposed(by: disposbag)
behaviorSub.onNext(3) //订阅之后正常接收
behaviorSub.onNext(4)
打印结果为:2,3,4。
- 源码重点
(三)ReplaySubject
ReplaySubject
可以保存n个元素,订阅时都会响应:
let replaySub = ReplaySubject<Int>.create(bufferSize: 2)
replaySub.onNext(1)
replaySub.onNext(2)
replaySub.onNext(3) //保存订阅之前最新的2个元素,订阅时响应
replaySub.subscribe { (num) in
print(num)
}.disposed(by: disposbag)
replaySub.onNext(4) //订阅之后正常接收
replaySub.onNext(5)
replaySub.onNext(6)
打印结果为:2,3,4,5,6。
ReplaySubject ReplayBufferBase 1个元素 多个元素(四)AsyncSubject
AsyncSubject
发送完成时,才会响应最新的一个元素:
let asyncSub = AsyncSubject<Int>.init()
asyncSub.onNext(1)
asyncSub.subscribe { (num) in
print(num)
}.disposed(by: disposbag)
asyncSub.onNext(2)
asyncSub.onNext(3) //保存最新一个元素
asyncSub.onCompleted() //只有发送完成时才会响应
打印结果为:3。
- 源码重点
(五)BehaviorRelay
用来代替原来的Variable
,可以直接使用保存的最新一个元素:
let behaviorRelay = BehaviorRelay(value: 1)
print(behaviorRelay.value) //直接获取最新的元素
behaviorRelay.subscribe(onNext: { (num) in
print(num)
}).disposed(by: disposbag)
behaviorRelay.accept(2)
print(behaviorRelay.value)
打印结果为:1,1,2,2。
- 源码重点
具体流程分析都差不多,除了
PublishSubject
其他就不多啰嗦了。