03. RxSwift源码解读:Sink 和 Queue Sch
今天要解读的源码是队列调度,同时探讨下Sink的设计思想,我们可以指定订阅和序列发送操作在哪个队列上执行,以observe(on:)
和subscribe(on:)
两个操作符为例,探究一下内部原理。
observe
指定在哪个队列接受序列,而subscribe
是指定创建序列的闭包在哪个队列执行。下面给出一个例子:
示例
Observable<Int>.create { (anyObserver) -> Disposable in
print("Subscribe Thread:", Thread.current)
anyObserver.onNext(1)
anyObserver.onCompleted()
return Disposables.create()
}
.observe(on: MainScheduler.instance)
.subscribe(on: SerialDispatchQueueScheduler(qos: .background))
.subscribe(onNext: { ele in
print("Observe Thread:", Thread.current)
print(ele)
}, onDisposed: {
print("disposed2")
})
.disposed(by: bag)
打印结果:
Subscribe Thread: <NSThread: 0x600003c45380>{number = 5, name = (null)}
Observe Thread: <NSThread: 0x600003c047c0>{number = 1, name = main}
1
disposed2
可见因为指定subscribe
在SerialDispatchQueueScheduler
(串行队列执行),所以第一条打印的线程不是主线程,而observe
指定在主队列执行,所以第二条打印的线程是主线程。
源码解读
我们看一下.observe(on: MainScheduler.instance)
的内部实现,在ObservableType的extension中可以找到代码:
public func observe(on scheduler: ImmediateSchedulerType)
-> Observable<Element> {
guard let serialScheduler = scheduler as? SerialDispatchQueueScheduler else {
return ObserveOn(source: self.asObservable(), scheduler: scheduler)
}
return ObserveOnSerialDispatchQueue(source: self.asObservable(),
scheduler: serialScheduler)
}
如果是同步队列调度者,则返回ObserveOnSerialDispatchQueue
对象,将当前Observable
和scheduler
传入,否则返回ObserveOn
对象,这两个类都是Observable
,它们都继承了Producer
,通过这种方式实现链式调用,可以继续调用其他操作符,每个操作符都内部都有对应的ObservableType实现类,它们一般会重写run方法,而且还有对应的Sink
类,用来实现操作符的功能,比如ObserveOn
类有一个对应的ObserveOnSink
类,ObserveOnSink
有自己的run
方法,同样ObserveOnSerialDispatchQueue
类有对应的ObserveOnSerialDispatchQueueSink
类,这些Sink
类继承自ObserverBase
类,最终都实现了ObserverType
,能发送序列。
在看看subscribe(on:)
public func subscribe(on scheduler: ImmediateSchedulerType)
-> Observable<Element> {
SubscribeOn(source: self, scheduler: scheduler)
}
同理返回一个SubscribeOn
对象,将当前对象和scheduler
传入。SubscribeOn
继承自Producer
,对应有一个SubscribeOnSink
类,它继承自Sink
类;因为它需要使用Sink
的forwardOn
方法。
当我们调用.subscribe(onNext
方法时,程序依然会走到Producer
的subscribe
方法,这个流程没有变,然后调用当前对象run
方法,因为当前对象已经再是AnonymousObservable
对象了,而是 SubscribeOn
, 而且SubscribeOn
重写了run
方法,所以会调用SubscribeOn
的run
方法,这是面向对象的多态,然后我们进入SubscribeOn
的run
方法看看:
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Ob.Element {
let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
这里依然会创建sink
对象,但是不再是AnonymousObservableSink
,而是SubscribeOnSink
,它来完成这个操作符的功能;这个Sink
保存了SubscribeOn
对象,observer
和cancel
,observer
依然还是刚开始创建的AnonymousObserver
对象,然后调用了SubscribeOnSink
的run
方法:
func run() -> Disposable {
let disposeEverything = SerialDisposable()
let cancelSchedule = SingleAssignmentDisposable()
disposeEverything.disposable = cancelSchedule
let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
let subscription = self.parent.source.subscribe(self)
disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
return Disposables.create()
}
cancelSchedule.setDisposable(disposeSchedule)
return disposeEverything
}
SerialDisposable
: 表示一个可释放资源,其底层可释放资源可被另一个可释放资源替换,从而导致前一个底层可释放资源的自动释放。
SingleAssignmentDisposable
: 表示只允许对其底层可释放资源进行一次赋值的可释放资源。如果已经设置了底层可释放资源,那么将来尝试设置底层可释放资源将抛出异常。
ScheduledDisposable
:释放资源时会在对应的队列中调度执行。
关键代码:self.parent.scheduler.schedule
, 这里parent是SubscribeOn对象,scheduler是调度者SerialDispatchQueueScheduler,然后调用它的schedule方法,去看看:
public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
self.scheduleInternal(state, action: action)
}
func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
self.configuration.schedule(state, action: action)
}
调到self.configuration.schedule
:
func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
let cancel = SingleAssignmentDisposable()
self.queue.async {
if cancel.isDisposed {
return
}
cancel.setDisposable(action(state))
}
return cancel
}
终于看到队列调用了self.queue.async
将action异步派发到quene
中执行,而queue
是在创建SerialDispatchQueueScheduler
时创建的, SerialDispatchQueueScheduler
明显会创建串行队列。这里有个小细节,如果资源已经被释放了则不执行。
cancel.setDisposable(action(state))
设置释放资源对象,如果已经设置则抛出异常。
action 在哪里, 回到action定义的地方?:
let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
let subscription = self.parent.source.subscribe(self)
disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
return Disposables.create()
}
接着执行self.parent.source.subscribe(self)
, 这个source是subscibe(:on,的调用者,即在链式调用序列中,先调用observe(:on)
再调用subscibe(:on)
所以source
是ObserveOnSerialDispatchQueue
对象。
很多observable都会保存它的上一个observable,即source, 以此实现链式调用
ObserveOnSerialDispatchQueue对象现在要执行subscribe方法,进入看看:
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
又到这里来了,然又进入run,这里会进入ObserveOnSerialDispatchQueue的run方法:
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
let subscription = self.source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
这个地方代码不太一样了,不再是调用sink
的run
,因为ObserveOnSerialDispatchQueueSink
只能处理observe
而不能处理subscribe
,只能转发给source
处理subscribe
,所以调用self.source.subscribe
, 这里的source
是最初创建的未变形的AnonymousObservable
对象,这相当于绕了一圈又回到了原来的流程,调用subscribe(sink)
,但是这里的sink是ObserveOnSerialDispatchQueueSink
对象,它是Observer
,看看这个会怎么影响后面的流程:
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
这里调到了前面的if分支:因为
isScheduleRequired
用来标示在当前线程是否正在通过schedule
执行action,如果
action内部又在当前线程执行了
subscribe,则无需再调度到当前线程执行,即CurrentThreadScheduler.instance.schedule
不会也没必要嵌套调用。
继续调用AnonymousObservable
的run
方法,然后调用创建AnonymousObservableSink
,调用run
,最后执行subscribeHandler(AnyObserver(self))
,这里回到了熟悉的流程,不过因为Observer
是ObserveOnSerialDispatchQueueSink
对象,所以发送序列时会调用ObserveOnSerialDispatchQueueSink
的onCore
方法:
override func onCore(_ event: Event<Element>) {
_ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
}
这里会做队列调度,在对应的队列(主队列)中执行action :
self.cachedScheduleLambda = { pair in
guard !cancel.isDisposed else { return Disposables.create() }
pair.sink.observer.on(pair.event)
if pair.event.isStopEvent {
pair.sink.dispose()
}
return Disposables.create()
}
接着调用observer.on(event)
,这里的sink
就是当前这个ObserveOnSerialDispatchQueueSink
对象,而observer
是在创建ObserveOnSerialDispatchQueueSink
对象时传入的SubscribeOnSink
,SubscribeOnSink
的observer
对象是AnonymousObserver
,因为它们都遵循了ObserverType
,所以都可以作为ObserverType
被其他Sink
持有。
各类
Sink
遵循了ObserverType
又持有了ObserverType
,这样Sink
之间可以相互持有,调用协议方法时又可以调用observer的相同协议方法,这样可以一直调用下去,跟装饰器模式很像,当我需要在现有操作符基础上再增加操作,无需修改原有操作符的逻辑代码,通过扩展方式增加新的功能,不过这里的设计更复杂。
我们继续看代码,SubscribeOnSink
实现了on
方法,进去看看:
func on(_ event: Event<Element>) {
self.forwardOn(event)
if event.isStopEvent {
self.dispose()
}
}
然后进入Sink
类的forwardOn
:在forwardOn
中又又调用了observer.on(event)
, 这里observer
是AnonymousObserver
,最终调用AnonymousObserver
的onCore
,完成最后一击:
调用self.eventHandler(event)
如果将
observe(:on)
和subscribe(:on)
互换位置,subscribe(:on)
先调用,observe(:on)
后调用结果会怎么样? 结果依然不变,但是内部调用流程会不一样,大家可以自己试试!
Scheduler
上面的例子中已经见到了两个Scheduler:
-
SerialDispatchQueueScheduler
: 串行队列调度者, -
MainScheduler
: 主队列调度者,它继承自SerialDispatchQueueScheduler
还有一个并行队列: -
ConcurrentDispatchQueueScheduler
: 并行队列调度者
每个调度者都会维护自己的队列,MainScheduler
维护一个主队列DispatchQueue.main
,SerialDispatchQueueScheduler
维护一个串行队列,ConcurrentDispatchQueueScheduler
维护一个并行队列; 当需要执行action时,调度对应的队列执行action即可。
这三个调度者都遵循了SchedulerType协议,SchedulerType
继承ImmediateSchedulerType
,ImmediateSchedulerType
有一个协议方法:
func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable
用来执行队列调度。
Scheduler包含一个DispatchQueueConfiguration对象,DispatchQueueConfiguration对象持有队列,当执行调度方法时,会转发到这个类的schedule进行实际的调度。