iOS开发框架使用与解析

RxSwift - Suject

2023-03-09  本文已影响0人  Lcr111

前言

RxSwift中Subject是一种非常特殊的序列协议,全局搜索class Subject / struct Subject都无结果,搜索protocol Subject获取到SubjectType协议,其中有asObserver方法,并且遵循ObservableType ->遵循->ObservableConvertibleType, 也就有了asObservable方法。

SujectType
所以遵循SubjectType的序列同时有了序列和观察者相应的技能。在实际开发中应用很频繁,这样就能自己订阅自己,自己响应自己的订阅了。
下面我们就来使用及分析几种常用的Subject。
1、PublishSubject

先来看一段使用的示例代码:

      // 1:初始化序列
        let publishSub = PublishSubject<Int>() //初始化一个PublishSubject 装着Int类型的序列
        // 2:发送响应序列
        publishSub.onNext(1)
        // 3:订阅序列
        publishSub.subscribe { print("订阅到了:", $0) }
            .disposed(by: disposbag)
        // 再次发送响应
        publishSub.onNext(2)
        publishSub.onNext(3)
//输出:订阅到了: next(2)
//订阅到了: next(3)

查看输出,只打印了2和3,那为什么1没有打印呢?
上一篇文章RxSwift - 高阶函数中关于publish的使用分析,其中publish就是对PublishSubject的封装,证明了publish就是将多个观察者交由唯一的PublishSubject来管理,PublishSubject将所有观察者.on方法保存,并顺序地响应外界的订阅事件。
那么此处关于1为响应的问题,还是得分析订阅.subscribePublishSubject.on关键代码:

subscribe
PublishSubject.on
此处.onNext(1)执行时,暂未有过.subscribe订阅,所以暂不会生成任何观察者,即PublishSubject中的observers中啥也没有,也就不会有任何的observer.on响应任何的事件。那么.onNext(1)也就无从响应。
publishSub.subscribe之后,才有了观察者,observers中也就会将观察者.on保存以响应后续事件。
2、BehaviorSubject
        let behaviorSub = BehaviorSubject.init(value: 100)
        // 2:发送信号
        //behaviorSub.onNext(2)
        //behaviorSub.onNext(3)
        // 3:订阅序列
        behaviorSub.subscribe{ print("订阅到了1:", $0) }
            .disposed(by: disposbag)
        // 再次发送
        //behaviorSub.onNext(4)
        //behaviorSub.onNext(5)
        // 再次订阅
        behaviorSub.subscribe{ print("订阅到了:", $0) }
            .disposed(by: disposbag)
//输出:订阅到了1: next(100)
//订阅到了: next(100)

此时代码中的2、3、4、5都是已注释状态,那么订阅之后会响应初始值100。
打开2、3的注释结果打印:

订阅到了1: next(3)
订阅到了: next(3)

说明此时响应了最新的序列元素3。
接着打开4、5的注释结果打印:

订阅到了1: next(3)
订阅到了1: next(4)
订阅到了1: next(5)
订阅到了: next(5)

此时可以理解为,在第一次订阅之前的只响应最新的,但订阅之后的都会响应,第二次订阅响应了之前最新的序列5。
问题一:为什么会有初始值,且会保存最新的值?
问题二:相对于PublishSubject没有订阅就无法响应的问题,BehaviorSubject怎么能响应订阅之前的事件?
查看BehaviorSubject初始化方法:

BehaviorSubject初始化
发现利用一个属性变量element将初始化的值保存起来。
那这个element是怎么保存最新值的?每当.onNext()时候就会来到BehaviorSubject.on方法中,接着在synchronized_on中将最新值给element:
element赋最新值

接着我们来看订阅之后都干了什么:

.subscribe
与PublishSubject不同的是,订阅发起时候,在保存完observer.on之后,默认会执行一次observer.on即会响应之前保存最新的序列元素值。每一次.onNext的作用就是更新element的值。
3、ReplaySubject
let replaySub = ReplaySubject<Int>.create(bufferSize: 2)
        // let replaySub = ReplaySubject<Int>.createUnbounded()

        // 2:发送信号
        replaySub.onNext(1)
        replaySub.onNext(2)
        replaySub.onNext(3)
        replaySub.onNext(4)

        // 3:订阅序列
        replaySub.subscribe{ print("订阅到了:", $0) }
            .disposed(by: disposbag)
        // 再次发送
        replaySub.onNext(7)
        replaySub.onNext(8)
        replaySub.onNext(9)
//输出:订阅到了: next(3)
//订阅到了: next(4)
//订阅到了: next(7)
//订阅到了: next(8)
//订阅到了: next(9)

能打印3、4以及对应bufferSize: 2可知,会保存下最新的有效的两次序列元素。
我们直接来到.create方法:

.create
其中还对缓存个数是否为1做了区分。ReplayOne与ReplayMany有共同的祖先类ReplayBufferBase,所以我们来到ReplayBufferBase:
ReplayBufferBase
所以查看订阅以及响应可直接看ReplayBufferBase中的实现方法。重点看ReplayBufferBase中的onsubscribe两个方法实现:
synchronized_on
addValueToBuffer字面意思就是将值存入缓存,重点!!!
分别查看在子类ReplayOne和ReplayManyBase中的实现:
override func addValueToBuffer(_ value: Element) {
        self.value = value
    }
override func addValueToBuffer(_ value: Element) {
        self.queue.enqueue(value)
    }

ReplayOne只是将当前值保存在属性self.value中,当订阅发起时就会调用.replayBuffer方法,便会执行当先观察者.on触发事件,并将当前保存的value发射出去:

ReplayOne

ReplayManyBase则是将值保存在值保存在一个集合属性中,通过添加及更新集合中的元素来达到只缓存相应个数的元素。当订阅发起时,就会从集合中将观察者一个一个拿出来触发.on方法。

ReplayManyBase.replayBuffer
缓存机制分析
当订阅发起之前,ReplayManyBase将值保存到集合中,集合中的queue容量大小是缓存大小加1,订阅前一开始能存下1、2、3,此时会触发trim(),即会调用删除queue中元素queue为nil、2、3,当要存4时候,按照替换index为0->1->2->0这样的循环顺序将后续数值更新进queue中,所以此时的queue由是4、2、3,此时也会触发trim(),删除旧的值2,queue此时为4、nil、3,此时订阅发起,执行self.replayBuffer(anyObserver),所以会遍历queue中的item用于响应事件,断点调试为逆序遍历即先发射3再发射4。
当后续7来到时直接走的.on方法,当7来时,此时queue为4、7、3,会直接走dispatch(self.synchronized_on(event), event)即: dispatch
接着会queue为4、7、nil,然后发射当前的7出去。
当8来时,queue为4、7、8,然后调用dequeue()后queue为nil、7、8,也只会响应oberver.on(event),那么也只会发射8出去。
当9来时,queue为9、7、8,然后调用dequeue(),最后queue为9、nil、8。
所以queue变化为:( 1, 2, 3) -> ( nil, 2, 3) -> ( 4, 2, 3) -> ( 4, nil, 3) -> ( 4, 7, 3) -> ( 4, 7, nil) -> ( 4, 7, 8) -> ( nil, 7, 8) -> ( 9, 7, 8) -> ( 9, nil, 8)
发射的顺序还是按响应的先后顺序发射。
4、AsyncSubject
// 1:创建序列
        let asynSub = AsyncSubject<Int>.init()
        // 2:发送信号
        asynSub.onNext(1)
        asynSub.onNext(2)
        // 3:订阅序列
        asynSub.subscribe{ print("订阅到了:", $0) }
            .disposed(by: disposbag)
        // 再次发送
        asynSub.onNext(3)
        asynSub.onNext(4)
        asynSub.onError(NSError.init(domain: "LcrError", code: 10085, userInfo: nil))
        asynSub.onCompleted()
//输出:订阅到了: error(Error Domain=LcrError Code=10085 "(null)")

发现当有错误响应时,前面的响应都无效了。
将错误注释,再次打印结果为:

订阅到了: next(4)
订阅到了: completed

然后再给onCompleted添加注释,发现没有打印任何东西。
里面的工作原理是怎样的呢?
我们把目光聚集在.on方法上:

.on
synchronized_on
.onNext来时,synchronized_on中会将当前值设为最新值,意思就是只保留最新的值,然后判断.completed有没有来。没有的话就返回(Observers(),.completed)到.on中,即此时没有有效观察者,即使dispatch进去了也啥也不做。
.onError来时,会将前面保存的观察者全部删除,即使回到.on中也只会发射错误信息。
.completed来时,会将保存的最新的值返回放入.next返回到.on去的.next:中去执行,最终也会调用.completed。

所以一定需要有.completed来且不能发射错误,才能正常响应订阅信息。

5、BehaviorRelay

Variable : 5.0已经废弃(BehaviorRelay 替换)

// 1:创建序列
        let variableSub = BehaviorRelay.init(value: 1)
        // 2:发送信号
        variableSub.accept(100)
        variableSub.accept(10)
        // 3:订阅信号
        variableSub.asObservable().subscribe{ print("订阅到了:", $0) }
            .disposed(by: disposbag)
        print("打印:\(variableSub.value)")
        // 再次发送
        variableSub.accept(1000)
//输出:订阅到了: next(10)
//打印:10
//订阅到了: next(1000)

BehaviorRelay不仅可以缓存最新的元素,还可以通过属性访问到最新缓存的元素。


BehaviorRelay

源码可知,BehaviorRelay就是内部封装了一个BehaviorSubject类型的序列subject,利用BehaviorSubject能缓存以及能访问属性value的特性来实现属性访问。

上一篇下一篇

猜你喜欢

热点阅读