RxSwift-Subject
2020-03-28 本文已影响0人
lmfei
Subject是Observable,也是Observer,所以它可以在有新值时发送消息,也可以订阅这些消息。
Subject的子类
Subject子类有PublishSubject、BehaviorSubject、ReplaySubject、AsyncSubject
- PublishSubject - 只会打印订阅后发送的消息
使用
let publishSub = PublishSubject<Int>()
publishSub.onNext(1)
_ = publishSub
.subscribe{ print("订阅到了:\($0)")}
.disposed(by: disposeBag)
publishSub.onNext(2)
publishSub.onNext(3)
打印
订阅到了:next(2)
订阅到了:next(3)
- BehaviorSubject - 会打印出来订阅前最后的一个消息,以及订阅后发送的消息
使用
let behaviorSub = BehaviorSubject<Int>(value: 0)
behaviorSub.onNext(1)
behaviorSub.onNext(2)
behaviorSub.subscribe(onNext: {
print("订阅了:\($0)")
}).disposed(by: disposeBag)
behaviorSub.onNext(3)
behaviorSub.onNext(4)
behaviorSub.onNext(5)
打印
订阅了:2
订阅了:3
订阅了:4
订阅了:5
- ReplaySubject - 会打印出来订阅前bufferSize个消息,以及订阅后发送的消息
使用
let replaySub = ReplaySubject<Int>.create(bufferSize: 2)
replaySub.onNext(0)
replaySub.onNext(1)
replaySub.onNext(2)
replaySub.subscribe(onNext: {
print("ReplaySubject:订阅了\($0)")
}).disposed(by: disposeBag)
replaySub.onNext(4)
replaySub.onNext(5)
打印
ReplaySubject:订阅了1
ReplaySubject:订阅了2
ReplaySubject:订阅了4
ReplaySubject:订阅了5
- AsyncSubject - 在收到completed消息后,会打印最后一个消息,在收到error消息后,会移除所有观察者,并返回错误
使用
let asyncSub = AsyncSubject<Int>()
asyncSub.onNext(0)
asyncSub.onNext(1)
asyncSub.subscribe(onNext: {
print("AsyncSubject:订阅了\($0)")
}, onError:{
print("AsyncSubject:订阅error:\($0)")
}).disposed(by: disposeBag)
asyncSub.onNext(2)
asyncSub.onNext(3)
asyncSub.onCompleted()
//asyncSub.onError(NSError.init(domain: "haha", code: -4, userInfo: nil))
打印
AsyncSubject:订阅了3
其他的既是Observable又是Observer的类
Variable、BehaviorRelay也有同样的性质
- Variable - 已废弃
代码
let variable = Variable<Int>.init(0)
variable.value = 1
variable.asObservable().subscribe(onNext: {
print("Variable订阅到\($0)")
})
variable.value = 2
variable.value = 3
打印
Variable订阅到1
Variable订阅到2
Variable订阅到3
代码
let behaviorRelay = BehaviorRelay<Int>.init(value: 0)
behaviorRelay.accept(1)
print("BehaviorRelay.value:\(behaviorRelay.value)")
behaviorRelay.subscribe(onNext: {
print("BehaviorRelay 订阅到\($0)")
}).disposed(by: disposeBag)
behaviorRelay.accept(2)
print("BehaviorRelay.value:\(behaviorRelay.value)")
behaviorRelay.accept(3)
打印
BehaviorRelay.value:1
BehaviorRelay 订阅到1
BehaviorRelay 订阅到2
BehaviorRelay.value:2
BehaviorRelay 订阅到3
下面针对PublishSubject进行源码解析
PublishSubject源码解析
public final class PublishSubject<Element>
: Observable<Element>
, SubjectType
, Cancelable
, ObserverType
, SynchronizedUnsubscribeType {
public typealias SubjectObserverType = PublishSubject<Element>
typealias Observers = AnyObserver<Element>.s
typealias DisposeKey = Observers.KeyType
...
// state
private var _isDisposed = false
private var _observers = Observers()
private var _stopped = false
private var _stoppedEvent = nil as Event<Element>?
...
/// Notifies all subscribed observers about next event.
///
/// - parameter event: Event to send to the observers.
public func on(_ event: Event<Element>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
dispatch(self._synchronized_on(event), event)
}
func _synchronized_on(_ event: Event<Element>) -> Observers {
self._lock.lock(); defer { self._lock.unlock() }
switch event {
case .next:
if self._isDisposed || self._stopped {
return Observers()
}
return self._observers
case .completed, .error:
if self._stoppedEvent == nil {
self._stoppedEvent = event
self._stopped = true
let observers = self._observers
self._observers.removeAll()
return observers
}
return Observers()
}
}
/**
Subscribes an observer to the subject.
- parameter observer: Observer to subscribe to the subject.
- returns: Disposable object that can be used to unsubscribe the observer from the subject.
*/
public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
self._lock.lock()
let subscription = self._synchronized_subscribe(observer)
self._lock.unlock()
return subscription
}
func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if let stoppedEvent = self._stoppedEvent {
observer.on(stoppedEvent)
return Disposables.create()
}
if self._isDisposed {
observer.on(.error(RxError.disposed(object: self)))
return Disposables.create()
}
let key = self._observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}
/// Returns observer interface for subject.
public func asObserver() -> PublishSubject<Element> {
return self
}
}
我们在使用的时候,会先创建一个PublishSubject对象。
通过上面代码,可以看到PublishSubject遵循了Observable,即它是观察者,同时它实现了subscribe方法;也遵循了ObserverType,即也是监听者,实现了on方法。
在我们订阅PublishSubject既调用subscribe方法时,会执行Observable的subscribe方法,从而调用子类PublishSubject的subscribe,进而会执行_observers.insert(observer.on)方法,将observer.on插入_observers中。
当我们发送消息onNext时,会执行ObserverType的on,进而执行PublishSubject实现的on方法,这个方法会执行dispatch(self._synchronized_on(event), event)方法,其中_synchronized_on会返回_observers,在通过dispatch依次去执行observer去解析.next方法,进而执行subscribe的block,完成打印。