RxSwift源码解析

RxSwift about ReplaySubject

2018-10-11  本文已影响11人  狼性刀锋

功能说明

Broadcasts new events to all subscribers, and the specified bufferSize number of previous events to new subscribers.

看下官方示例:

example("ReplaySubject") {
    let disposeBag = DisposeBag()
    let subject = ReplaySubject<String>.create(bufferSize: 1)
    
    subject.addObserver("1").disposed(by: disposeBag)
    subject.onNext("🐶")
    subject.onNext("🐱")
    
    subject.addObserver("2").disposed(by: disposeBag)
    subject.onNext("🅰️")
    subject.onNext("🅱️")
}

// log
--- ReplaySubject example ---
Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 2 Event: next(🐱)
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)


继承体系

ReplaySubject Diagram.png

ReplaySubject: 基本上是一个空壳,等同于抽象类

ReplayOne: 重播最近的一个信号

ReplayMany: 重播最近的指定个数的信号

ReplayAll: 重播所有信号,官方特意说明慎重使用

具体实现

ReplaySubject 等同于抽象类,就不讲了。

// ReplayBufferBase Class

    func _synchronized_subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == E {
        if _isDisposed {
            observer.on(.error(RxError.disposed(object: self)))
            return Disposables.create()
        }
     
        let anyObserver = observer.asObserver()
        
        replayBuffer(anyObserver)
        if let stoppedEvent = _stoppedEvent {
            observer.on(stoppedEvent)
            return Disposables.create()
        }
        else {
            let key = _observers.insert(observer.on)
            return SubscriptionDisposable(owner: self, key: key)
        }
    }
    
    
    func _synchronized_on(_ event: Event<E>) -> Observers {
        _lock.lock(); defer { _lock.unlock() }
        if _isDisposed {
            return Observers()
        }
        
        if _isStopped {
            return Observers()
        }
        
        switch event {
        case .next(let element):
            addValueToBuffer(element)
            trim()
            return _observers
        case .error, .completed:
            _stoppedEvent = event
            trim()
            let observers = _observers
            _observers.removeAll()
            return observers
        }
    }

这里区别于PublishSubject的地方在于,_synchronized_on的时候多了个replayBuffer操作, 在Onnext的时候多了addValueToBuffer 和 trim 操作。

// ReplayOne

    override func addValueToBuffer(_ value: Element) {
        _value = value
    }

    override func replayBuffer<O: ObserverType>(_ observer: O) where O.E == Element {
        if let value = _value {
            observer.on(.next(value))
        }
    }

ReplayOne 只重播最近的一个信号,它是怎么实现的呢? Onnext 的时候会触发addValueToBuffer,然后使用_value 存储事件信息, 一旦ReplayOne 被新的Observer订阅则会触发replayBuffer, 立马为新的observer 发送OnNext事件,值得注意的是这里重播存储的都是next事件,complete 和error事件不重播也不存储

// ReplayManyBase

    override func addValueToBuffer(_ value: Element) {
        _queue.enqueue(value)
    }

    override func replayBuffer<O: ObserverType>(_ observer: O) where O.E == Element {
        for item in _queue {
            observer.on(.next(item))
        }
    }


ReplayManyBase 使用Queue存储容器,Queue在之前介绍过,遵循先进先出可扩展的循环队列。 一旦被订阅,会立马向新的observer, 重放所有存储的事件

fileprivate final class ReplayMany<Element> : ReplayManyBase<Element> {
    private let _bufferSize: Int
    
    init(bufferSize: Int) {
        _bufferSize = bufferSize
        
        super.init(queueSize: bufferSize)
    }
    
    override func trim() {
        while _queue.count > _bufferSize {
            _ = _queue.dequeue()
        }
    }
}

ReplayMany 继承于ReplayManyBase, 只存储指定个数的信号,一旦超过存储的数量会将最先入列的事件,从队列中移除,以达到保留最近指定个数信号的目的,结合ReplayBufferBase._synchronized_on 阅读该段代码。


fileprivate final class ReplayAll<Element> : ReplayManyBase<Element> {
    init() {
        super.init(queueSize: 0)
    }
    
    override func trim() {
        
    }
}

ReplayAll 对存储的事件数量,没有做任何限制。

其他

BehaviorSubject:必须给一个初始值,重播最近的一个next事件

Variable 被弃用, 官方推荐使用 RxCocoa.BehaviorRelay,
内部封装了BehaviorSubject, 在设置属性的时候,会触发BehaviorSubject.on事件

上一篇下一篇

猜你喜欢

热点阅读