Rx

RxSwift源码分析(12)——Subject

2020-10-15  本文已影响0人  无悔zero

今天说说上篇提到的Subject,这是一个特别的序列,它既是发送者也是响应者。其中常用的类型有:PublishSubjectBehaviorSubjectReplaySubjectAsyncSubjectBehaviorRelay等等。

(一)PublishSubject

PublishSubject订阅前不会响应:

let publishSub = PublishSubject<Int>()
        
publishSub.onNext(1)   //订阅之前的接收不到
 
publishSub.subscribe { (num) in
    print(num)
}.disposed(by: disposbag)
        
publishSub.onNext(2) //订阅之后正常接收
publishSub.onNext(3)

打印结果为:2,3。

  1. 具体流程就只看一下这个PublishSubjectPublishSubject继承了ObservableObserverType,所以既能发送又能响应:
public final class PublishSubject<Element>
    : Observable<Element>
    , SubjectType
    , Cancelable
    , ObserverType
    , SynchronizedUnsubscribeType {

    typealias Observers = AnyObserver<Element>.s
    ...
    private var _observers = Observers()  //默认初始化
    ...
    public override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        self._lock.lock()
        let subscription = self._synchronized_subscribe(observer)
        self._lock.unlock()
        return subscription
    }

    func _synchronized_subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
        ...
        let key = self._observers.insert(observer.on)
        return SubscriptionDisposable(owner: self, key: key)
    }
    ...
}

根据RxSwift核心逻辑PublishSubject订阅后会来到subscribe函数,接着下一步来到_synchronized_subscribe函数,这里很容易迷惑人,其重点在self._observers.insert(observer.on)

  1. self._observers是默认初始化的,具有保存的作用,它把observer.on保存下来了:
struct Bag<T> : CustomDebugStringConvertible {
    ...
    mutating func insert(_ element: T) -> BagKey {
        let key = _nextKey

        _nextKey = BagKey(rawValue: _nextKey.rawValue &+ 1)

        if _key0 == nil {
            _key0 = key
            _value0 = element
            return key
        }

        _onlyFastPath = false

        if _dictionary != nil {
            _dictionary![key] = element
            return key
        }

        if _pairs.count < arrayDictionaryMaxSize {
            _pairs.append((key: key, value: element))
            return key
        }
        
        _dictionary = [key: element]
        
        return key
    }
    ...
}
  1. 然后外面调用publishSub.onNext()时,根据RxSwift核心逻辑,来到PublishSubjecton函数:
public final class PublishSubject<Element>
    : Observable<Element>
    , SubjectType
    , Cancelable
    , ObserverType
    , SynchronizedUnsubscribeType {
    ...
    public func on(_ event: Event<Element>) {
        ...
        dispatch(self._synchronized_on(event), event)
    }

    func _synchronized_on(_ event: Event<E>) -> Observers {
        self._lock.lock(); defer { self._lock.unlock() }
        switch event {
        case .next:
            if self._isDisposed || self._stopped {
                return Observers()
            }
            
            return self._observers
        case .completed, .error:  
            ...
            return Observers()
        }
    }
    ...
}

然后通过_synchronized_on函数拿到self._observers后执行dispatch(self._synchronized_on(event), event)

  1. 最后遍历调用保存的observer.on
@inline(__always)
func dispatch<E>(_ bag: Bag<(Event<E>) -> Void>, _ event: Event<E>) {
    bag._value0?(event)

    if bag._onlyFastPath {
        return
    }

    let pairs = bag._pairs
    for i in 0 ..< pairs.count {
        pairs[i].value(event)
    }

    if let dictionary = bag._dictionary {
        for element in dictionary.values {
            element(event)
        }
    }
}
  1. 最后根据RxSwift核心逻辑完成发送响应流程:
publishSub.subscribe { (num) in
    print(num)
}
(二)BehaviorSubject

BehaviorSubjec会保存最新一个元素,订阅时会响应:

let behaviorSub = BehaviorSubject.init(value: 100)
        
behaviorSub.onNext(1)
behaviorSub.onNext(2)  //保存订阅前最新的一个元素,订阅时响应
        
behaviorSub.subscribe { (num) in
    print(num)
}.disposed(by: disposbag)
        
behaviorSub.onNext(3)  //订阅之后正常接收
behaviorSub.onNext(4)  

打印结果为:2,3,4。

BehaviorSubject BehaviorSubject BehaviorSubject
(三)ReplaySubject

ReplaySubject可以保存n个元素,订阅时都会响应:

let replaySub = ReplaySubject<Int>.create(bufferSize: 2)

replaySub.onNext(1)
replaySub.onNext(2)  
replaySub.onNext(3)  //保存订阅之前最新的2个元素,订阅时响应

replaySub.subscribe { (num) in
    print(num)
}.disposed(by: disposbag)

replaySub.onNext(4)  //订阅之后正常接收
replaySub.onNext(5)
replaySub.onNext(6)

打印结果为:2,3,4,5,6。

ReplaySubject ReplayBufferBase 1个元素 多个元素
(四)AsyncSubject

AsyncSubject发送完成时,才会响应最新的一个元素:

let asyncSub = AsyncSubject<Int>.init()
        
asyncSub.onNext(1)

asyncSub.subscribe { (num) in
    print(num)
}.disposed(by: disposbag)

asyncSub.onNext(2)
asyncSub.onNext(3)  //保存最新一个元素

asyncSub.onCompleted()  //只有发送完成时才会响应

打印结果为:3。

AsyncSubject AsyncSubject
(五)BehaviorRelay

用来代替原来的Variable,可以直接使用保存的最新一个元素:

let behaviorRelay = BehaviorRelay(value: 1)

print(behaviorRelay.value) //直接获取最新的元素

behaviorRelay.subscribe(onNext: { (num) in
    print(num)
}).disposed(by: disposbag)

behaviorRelay.accept(2) 

print(behaviorRelay.value)

打印结果为:1,1,2,2。

BehaviorRelay BehaviorSubject BehaviorSubject

具体流程分析都差不多,除了PublishSubject其他就不多啰嗦了。

上一篇下一篇

猜你喜欢

热点阅读