RxSwift about ReplaySubject
功能说明
Broadcasts new events to all subscribers, and the specified bufferSize number of previous events to new subscribers.
看下官方示例:
example("ReplaySubject") {
let disposeBag = DisposeBag()
let subject = ReplaySubject<String>.create(bufferSize: 1)
subject.addObserver("1").disposed(by: disposeBag)
subject.onNext("🐶")
subject.onNext("🐱")
subject.addObserver("2").disposed(by: disposeBag)
subject.onNext("🅰️")
subject.onNext("🅱️")
}
// log
--- ReplaySubject example ---
Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 2 Event: next(🐱)
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)
继承体系
ReplaySubject Diagram.pngReplaySubject
: 基本上是一个空壳,等同于抽象类
ReplayOne
: 重播最近的一个信号
ReplayMany
: 重播最近的指定个数的信号
ReplayAll
: 重播所有信号,官方特意说明慎重使用
具体实现
ReplaySubject
等同于抽象类,就不讲了。
// ReplayBufferBase Class
func _synchronized_subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == E {
if _isDisposed {
observer.on(.error(RxError.disposed(object: self)))
return Disposables.create()
}
let anyObserver = observer.asObserver()
replayBuffer(anyObserver)
if let stoppedEvent = _stoppedEvent {
observer.on(stoppedEvent)
return Disposables.create()
}
else {
let key = _observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}
}
func _synchronized_on(_ event: Event<E>) -> Observers {
_lock.lock(); defer { _lock.unlock() }
if _isDisposed {
return Observers()
}
if _isStopped {
return Observers()
}
switch event {
case .next(let element):
addValueToBuffer(element)
trim()
return _observers
case .error, .completed:
_stoppedEvent = event
trim()
let observers = _observers
_observers.removeAll()
return observers
}
}
这里区别于PublishSubject的地方在于,_synchronized_on的时候多了个replayBuffer
操作, 在Onnext的时候多了addValueToBuffer 和 trim 操作。
// ReplayOne
override func addValueToBuffer(_ value: Element) {
_value = value
}
override func replayBuffer<O: ObserverType>(_ observer: O) where O.E == Element {
if let value = _value {
observer.on(.next(value))
}
}
ReplayOne 只重播最近的一个信号,它是怎么实现的呢? Onnext 的时候会触发addValueToBuffer,然后使用_value 存储事件信息, 一旦ReplayOne 被新的Observer订阅则会触发replayBuffer, 立马为新的observer 发送OnNext事件,值得注意的是这里重播存储的都是next事件,complete 和error事件不重播也不存储
// ReplayManyBase
override func addValueToBuffer(_ value: Element) {
_queue.enqueue(value)
}
override func replayBuffer<O: ObserverType>(_ observer: O) where O.E == Element {
for item in _queue {
observer.on(.next(item))
}
}
ReplayManyBase 使用Queue存储容器,Queue在之前介绍过,遵循先进先出可扩展的循环队列。 一旦被订阅,会立马向新的observer, 重放所有存储的事件
fileprivate final class ReplayMany<Element> : ReplayManyBase<Element> {
private let _bufferSize: Int
init(bufferSize: Int) {
_bufferSize = bufferSize
super.init(queueSize: bufferSize)
}
override func trim() {
while _queue.count > _bufferSize {
_ = _queue.dequeue()
}
}
}
ReplayMany 继承于ReplayManyBase, 只存储指定个数的信号,一旦超过存储的数量会将最先入列的事件,从队列中移除,以达到保留最近指定个数信号的目的,结合ReplayBufferBase._synchronized_on 阅读该段代码。
fileprivate final class ReplayAll<Element> : ReplayManyBase<Element> {
init() {
super.init(queueSize: 0)
}
override func trim() {
}
}
ReplayAll 对存储的事件数量,没有做任何限制。
其他
BehaviorSubject
:必须给一个初始值,重播最近的一个next事件
Variable
被弃用, 官方推荐使用 RxCocoa.BehaviorRelay,
内部封装了BehaviorSubject, 在设置属性的时候,会触发BehaviorSubject.on事件