RxSwift-Subject

2020-03-28  本文已影响0人  lmfei

Subject是Observable,也是Observer,所以它可以在有新值时发送消息,也可以订阅这些消息。

Subject的子类

Subject子类有PublishSubject、BehaviorSubject、ReplaySubject、AsyncSubject

let publishSub = PublishSubject<Int>()
publishSub.onNext(1)
_ = publishSub
    .subscribe{ print("订阅到了:\($0)")}
    .disposed(by: disposeBag)
publishSub.onNext(2)
publishSub.onNext(3)

打印

订阅到了:next(2)
订阅到了:next(3)
let behaviorSub = BehaviorSubject<Int>(value: 0)
behaviorSub.onNext(1)
behaviorSub.onNext(2)
behaviorSub.subscribe(onNext: {
    print("订阅了:\($0)")
}).disposed(by: disposeBag)
behaviorSub.onNext(3)
behaviorSub.onNext(4)
behaviorSub.onNext(5)

打印

订阅了:2
订阅了:3
订阅了:4
订阅了:5
let replaySub = ReplaySubject<Int>.create(bufferSize: 2)
replaySub.onNext(0)
replaySub.onNext(1)
replaySub.onNext(2)
replaySub.subscribe(onNext: {
    print("ReplaySubject:订阅了\($0)")
}).disposed(by: disposeBag)
replaySub.onNext(4)
replaySub.onNext(5)

打印

ReplaySubject:订阅了1
ReplaySubject:订阅了2
ReplaySubject:订阅了4
ReplaySubject:订阅了5
let asyncSub = AsyncSubject<Int>()
asyncSub.onNext(0)
asyncSub.onNext(1)
asyncSub.subscribe(onNext: {
    print("AsyncSubject:订阅了\($0)")
}, onError:{
    print("AsyncSubject:订阅error:\($0)")
}).disposed(by: disposeBag)
asyncSub.onNext(2)
asyncSub.onNext(3)
asyncSub.onCompleted()
//asyncSub.onError(NSError.init(domain: "haha", code: -4, userInfo: nil))

打印

AsyncSubject:订阅了3

其他的既是Observable又是Observer的类

Variable、BehaviorRelay也有同样的性质

let variable = Variable<Int>.init(0)
variable.value = 1
variable.asObservable().subscribe(onNext: {
    print("Variable订阅到\($0)")
})
variable.value = 2
variable.value = 3

打印

Variable订阅到1
Variable订阅到2
Variable订阅到3

代码

let behaviorRelay = BehaviorRelay<Int>.init(value: 0)
behaviorRelay.accept(1)
print("BehaviorRelay.value:\(behaviorRelay.value)")
behaviorRelay.subscribe(onNext: {
    print("BehaviorRelay 订阅到\($0)")
}).disposed(by: disposeBag)
behaviorRelay.accept(2)
print("BehaviorRelay.value:\(behaviorRelay.value)")
behaviorRelay.accept(3)

打印

BehaviorRelay.value:1
BehaviorRelay 订阅到1
BehaviorRelay 订阅到2
BehaviorRelay.value:2
BehaviorRelay 订阅到3

下面针对PublishSubject进行源码解析

PublishSubject源码解析

public final class PublishSubject<Element>
    : Observable<Element>
    , SubjectType
    , Cancelable
    , ObserverType
    , SynchronizedUnsubscribeType {
    public typealias SubjectObserverType = PublishSubject<Element>

    typealias Observers = AnyObserver<Element>.s
    typealias DisposeKey = Observers.KeyType
    ...    
    // state
    private var _isDisposed = false
    private var _observers = Observers()
    private var _stopped = false
    private var _stoppedEvent = nil as Event<Element>? 
    ...
    /// Notifies all subscribed observers about next event.
    ///
    /// - parameter event: Event to send to the observers.
    public func on(_ event: Event<Element>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        dispatch(self._synchronized_on(event), event)
    }

    func _synchronized_on(_ event: Event<Element>) -> Observers {
        self._lock.lock(); defer { self._lock.unlock() }
        switch event {
        case .next:
            if self._isDisposed || self._stopped {
                return Observers()
            }
            
            return self._observers
        case .completed, .error:
            if self._stoppedEvent == nil {
                self._stoppedEvent = event
                self._stopped = true
                let observers = self._observers
                self._observers.removeAll()
                return observers
            }

            return Observers()
        }
    }
    
    /**
    Subscribes an observer to the subject.
    
    - parameter observer: Observer to subscribe to the subject.
    - returns: Disposable object that can be used to unsubscribe the observer from the subject.
    */
    public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        self._lock.lock()
        let subscription = self._synchronized_subscribe(observer)
        self._lock.unlock()
        return subscription
    }

    func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if let stoppedEvent = self._stoppedEvent {
            observer.on(stoppedEvent)
            return Disposables.create()
        }
        
        if self._isDisposed {
            observer.on(.error(RxError.disposed(object: self)))
            return Disposables.create()
        }
        
        let key = self._observers.insert(observer.on)
        return SubscriptionDisposable(owner: self, key: key)
    }

   
    /// Returns observer interface for subject.
    public func asObserver() -> PublishSubject<Element> {
        return self
    }

}

我们在使用的时候,会先创建一个PublishSubject对象。
通过上面代码,可以看到PublishSubject遵循了Observable,即它是观察者,同时它实现了subscribe方法;也遵循了ObserverType,即也是监听者,实现了on方法。
在我们订阅PublishSubject既调用subscribe方法时,会执行Observable的subscribe方法,从而调用子类PublishSubject的subscribe,进而会执行_observers.insert(observer.on)方法,将observer.on插入_observers中。
当我们发送消息onNext时,会执行ObserverType的on,进而执行PublishSubject实现的on方法,这个方法会执行dispatch(self._synchronized_on(event), event)方法,其中_synchronized_on会返回_observers,在通过dispatch依次去执行observer去解析.next方法,进而执行subscribe的block,完成打印。

思维导图
PublishSubject
上一篇下一篇

猜你喜欢

热点阅读