RxSwift - Suject
前言
RxSwift中Subject
是一种非常特殊的序列协议,全局搜索class Subject / struct Subject都无结果,搜索protocol Subject获取到SubjectType
协议,其中有asObserver
方法,并且遵循ObservableType ->遵循->ObservableConvertibleType, 也就有了asObservable
方法。
所以遵循SubjectType的序列同时有了序列和观察者相应的技能。在实际开发中应用很频繁,这样就能自己订阅自己,自己响应自己的订阅了。
下面我们就来使用及分析几种常用的Subject。
1、PublishSubject
先来看一段使用的示例代码:
// 1:初始化序列
let publishSub = PublishSubject<Int>() //初始化一个PublishSubject 装着Int类型的序列
// 2:发送响应序列
publishSub.onNext(1)
// 3:订阅序列
publishSub.subscribe { print("订阅到了:", $0) }
.disposed(by: disposbag)
// 再次发送响应
publishSub.onNext(2)
publishSub.onNext(3)
//输出:订阅到了: next(2)
//订阅到了: next(3)
查看输出,只打印了2和3,那为什么1没有打印呢?
上一篇文章RxSwift - 高阶函数中关于publish
的使用分析,其中publish就是对PublishSubject的封装,证明了publish就是将多个观察者交由唯一的PublishSubject来管理,PublishSubject将所有观察者.on
方法保存,并顺序地响应外界的订阅事件。
那么此处关于1为响应的问题,还是得分析订阅.subscribe
及PublishSubject.on
关键代码:
PublishSubject.on
此处.onNext(1)执行时,暂未有过
.subscribe
订阅,所以暂不会生成任何观察者,即PublishSubject中的observers中啥也没有,也就不会有任何的observer.on响应任何的事件。那么.onNext(1)也就无从响应。当
publishSub.subscribe
之后,才有了观察者,observers中也就会将观察者.on保存以响应后续事件。
2、BehaviorSubject
let behaviorSub = BehaviorSubject.init(value: 100)
// 2:发送信号
//behaviorSub.onNext(2)
//behaviorSub.onNext(3)
// 3:订阅序列
behaviorSub.subscribe{ print("订阅到了1:", $0) }
.disposed(by: disposbag)
// 再次发送
//behaviorSub.onNext(4)
//behaviorSub.onNext(5)
// 再次订阅
behaviorSub.subscribe{ print("订阅到了:", $0) }
.disposed(by: disposbag)
//输出:订阅到了1: next(100)
//订阅到了: next(100)
此时代码中的2、3、4、5都是已注释状态,那么订阅之后会响应初始值100。
打开2、3的注释结果打印:
订阅到了1: next(3)
订阅到了: next(3)
说明此时响应了最新的序列元素3。
接着打开4、5的注释结果打印:
订阅到了1: next(3)
订阅到了1: next(4)
订阅到了1: next(5)
订阅到了: next(5)
此时可以理解为,在第一次订阅之前的只响应最新
的,但订阅之后的都会响应,第二次订阅响应了之前最新的序列5。
问题一
:为什么会有初始值,且会保存最新的值?
问题二
:相对于PublishSubject没有订阅就无法响应的问题,BehaviorSubject怎么能响应订阅之前的事件?
查看BehaviorSubject初始化方法:
发现利用一个属性变量
element
将初始化的值保存起来。那这个element是怎么保存最新值的?每当.onNext()时候就会来到BehaviorSubject.on方法中,接着在synchronized_on中将最新值给element:
element赋最新值
接着我们来看订阅之后都干了什么:
与PublishSubject不同的是,订阅发起时候,在保存完observer.on之后,默认会执行一次observer.on即会响应之前保存最新的序列元素值。每一次.onNext的作用就是
更新element
的值。
3、ReplaySubject
let replaySub = ReplaySubject<Int>.create(bufferSize: 2)
// let replaySub = ReplaySubject<Int>.createUnbounded()
// 2:发送信号
replaySub.onNext(1)
replaySub.onNext(2)
replaySub.onNext(3)
replaySub.onNext(4)
// 3:订阅序列
replaySub.subscribe{ print("订阅到了:", $0) }
.disposed(by: disposbag)
// 再次发送
replaySub.onNext(7)
replaySub.onNext(8)
replaySub.onNext(9)
//输出:订阅到了: next(3)
//订阅到了: next(4)
//订阅到了: next(7)
//订阅到了: next(8)
//订阅到了: next(9)
能打印3、4以及对应bufferSize: 2
可知,会保存下最新的有效的两次序列元素。
我们直接来到.create方法:
其中还对缓存个数是否为1做了区分。ReplayOne与ReplayMany有共同的祖先类
ReplayBufferBase
,所以我们来到ReplayBufferBase:ReplayBufferBase
所以查看订阅以及响应可直接看ReplayBufferBase中的实现方法。重点看ReplayBufferBase中的
on
和subscribe
两个方法实现:synchronized_on
addValueToBuffer
字面意思就是将值存入缓存,重点!!!分别查看在子类ReplayOne和ReplayManyBase中的实现:
override func addValueToBuffer(_ value: Element) {
self.value = value
}
override func addValueToBuffer(_ value: Element) {
self.queue.enqueue(value)
}
ReplayOne只是将当前值保存在属性self.value
中,当订阅发起时就会调用.replayBuffer
方法,便会执行当先观察者.on触发事件,并将当前保存的value发射出去:
ReplayManyBase则是将值保存在值保存在一个集合
属性中,通过添加及更新集合中的元素来达到只缓存相应个数的元素。当订阅发起时,就会从集合中将观察者一个一个拿出来触发.on
方法。
缓存机制分析
当订阅发起之前,ReplayManyBase将值保存到集合中,集合中的
queue
容量大小是缓存大小加1
,订阅前一开始能存下1、2、3,此时会触发trim()
,即会调用删除queue中元素queue为nil、2、3,当要存4时候,按照替换index为0->1->2->0这样的循环顺序将后续数值更新
进queue中,所以此时的queue由是4、2、3,此时也会触发trim(),删除旧的值2,queue此时为4、nil、3,此时订阅发起,执行self.replayBuffer(anyObserver),所以会遍历queue中的item用于响应事件,断点调试为逆序遍历即先发射3再发射4。当后续7来到时直接走的.on方法,当7来时,此时queue为4、7、3,会直接走
dispatch(self.synchronized_on(event), event)
即:
dispatch
接着会queue为4、7、nil,然后发射当前的7出去。
当8来时,queue为4、7、8,然后调用dequeue()后queue为nil、7、8,也只会响应oberver.on(event),那么也只会发射8出去。
当9来时,queue为9、7、8,然后调用dequeue(),最后queue为9、nil、8。
所以queue变化为:
( 1, 2, 3) -> ( nil, 2, 3) -> ( 4, 2, 3) -> ( 4, nil, 3) -> ( 4, 7, 3) -> ( 4, 7, nil) -> ( 4, 7, 8) -> ( nil, 7, 8) -> ( 9, 7, 8) -> ( 9, nil, 8)
发射的顺序还是按响应的先后顺序发射。
4、AsyncSubject
// 1:创建序列
let asynSub = AsyncSubject<Int>.init()
// 2:发送信号
asynSub.onNext(1)
asynSub.onNext(2)
// 3:订阅序列
asynSub.subscribe{ print("订阅到了:", $0) }
.disposed(by: disposbag)
// 再次发送
asynSub.onNext(3)
asynSub.onNext(4)
asynSub.onError(NSError.init(domain: "LcrError", code: 10085, userInfo: nil))
asynSub.onCompleted()
//输出:订阅到了: error(Error Domain=LcrError Code=10085 "(null)")
发现当有错误响应时,前面的响应都无效了。
将错误注释,再次打印结果为:
订阅到了: next(4)
订阅到了: completed
然后再给onCompleted
添加注释,发现没有打印任何东西。
里面的工作原理是怎样的呢?
我们把目光聚集在.on
方法上:
synchronized_on
当
.onNext
来时,synchronized_on
中会将当前值设为最新值,意思就是只保留最新的值,然后判断.completed有没有来。没有的话就返回(Observers(),.completed)
到.on中,即此时没有有效观察者,即使dispatch进去了也啥也不做。当
.onError
来时,会将前面保存的观察者全部删除
,即使回到.on中也只会发射错误信息。当
.completed
来时,会将保存的最新的值返回放入.next返回到.on去的.next:
中去执行,最终也会调用.completed。
所以一定需要有.completed来且不能发射错误,才能正常响应订阅信息。
5、BehaviorRelay
Variable : 5.0已经废弃(BehaviorRelay 替换)
// 1:创建序列
let variableSub = BehaviorRelay.init(value: 1)
// 2:发送信号
variableSub.accept(100)
variableSub.accept(10)
// 3:订阅信号
variableSub.asObservable().subscribe{ print("订阅到了:", $0) }
.disposed(by: disposbag)
print("打印:\(variableSub.value)")
// 再次发送
variableSub.accept(1000)
//输出:订阅到了: next(10)
//打印:10
//订阅到了: next(1000)
BehaviorRelay不仅可以缓存最新的元素,还可以通过属性访问到最新缓存的元素。
BehaviorRelay
源码可知,BehaviorRelay就是内部封装了一个BehaviorSubject类型的序列subject,利用BehaviorSubject能缓存以及能访问属性value的特性来实现属性访问。