RxSwiftRx

03. RxSwift源码解读:Sink 和 Queue Sch

2021-05-26  本文已影响0人  Oceanj

今天要解读的源码是队列调度,同时探讨下Sink的设计思想,我们可以指定订阅和序列发送操作在哪个队列上执行,以observe(on:)subscribe(on:)两个操作符为例,探究一下内部原理。
observe指定在哪个队列接受序列,而subscribe是指定创建序列的闭包在哪个队列执行。下面给出一个例子:

示例

        Observable<Int>.create { (anyObserver) -> Disposable in
            print("Subscribe Thread:", Thread.current)
            anyObserver.onNext(1)
            anyObserver.onCompleted()
            return Disposables.create()
        }
        .observe(on: MainScheduler.instance)
        .subscribe(on: SerialDispatchQueueScheduler(qos: .background))
        .subscribe(onNext: { ele in
            print("Observe Thread:", Thread.current)
            print(ele)
        }, onDisposed: {
            print("disposed2")
        })
        .disposed(by: bag)

打印结果:

Subscribe Thread: <NSThread: 0x600003c45380>{number = 5, name = (null)}
Observe Thread: <NSThread: 0x600003c047c0>{number = 1, name = main}
1
disposed2

可见因为指定subscribeSerialDispatchQueueScheduler(串行队列执行),所以第一条打印的线程不是主线程,而observe指定在主队列执行,所以第二条打印的线程是主线程。

源码解读

我们看一下.observe(on: MainScheduler.instance)的内部实现,在ObservableType的extension中可以找到代码:

    public func observe(on scheduler: ImmediateSchedulerType)
        -> Observable<Element> {
        guard let serialScheduler = scheduler as? SerialDispatchQueueScheduler else {
            return ObserveOn(source: self.asObservable(), scheduler: scheduler)
        }

        return ObserveOnSerialDispatchQueue(source: self.asObservable(),
                                            scheduler: serialScheduler)
    }

如果是同步队列调度者,则返回ObserveOnSerialDispatchQueue对象,将当前Observablescheduler传入,否则返回ObserveOn对象,这两个类都是Observable,它们都继承了Producer,通过这种方式实现链式调用,可以继续调用其他操作符,每个操作符都内部都有对应的ObservableType实现类,它们一般会重写run方法,而且还有对应的Sink类,用来实现操作符的功能,比如ObserveOn类有一个对应的ObserveOnSink类,ObserveOnSink有自己的run方法,同样ObserveOnSerialDispatchQueue类有对应的ObserveOnSerialDispatchQueueSink类,这些Sink类继承自ObserverBase类,最终都实现了ObserverType,能发送序列。
在看看subscribe(on:)

    public func subscribe(on scheduler: ImmediateSchedulerType)
        -> Observable<Element> {
        SubscribeOn(source: self, scheduler: scheduler)
    }

同理返回一个SubscribeOn对象,将当前对象和scheduler传入。SubscribeOn继承自Producer,对应有一个SubscribeOnSink类,它继承自Sink类;因为它需要使用SinkforwardOn方法。

当我们调用.subscribe(onNext方法时,程序依然会走到Producersubscribe方法,这个流程没有变,然后调用当前对象run方法,因为当前对象已经再是AnonymousObservable对象了,而是 SubscribeOn, 而且SubscribeOn重写了run方法,所以会调用SubscribeOnrun方法,这是面向对象的多态,然后我们进入SubscribeOnrun方法看看:

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Ob.Element {
        let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }

这里依然会创建sink对象,但是不再是AnonymousObservableSink,而是SubscribeOnSink,它来完成这个操作符的功能;这个Sink保存了SubscribeOn对象,observercancelobserver依然还是刚开始创建的AnonymousObserver对象,然后调用了SubscribeOnSinkrun方法:

     func run() -> Disposable {
        let disposeEverything = SerialDisposable()
        let cancelSchedule = SingleAssignmentDisposable()
        
        disposeEverything.disposable = cancelSchedule
        
        let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
            let subscription = self.parent.source.subscribe(self)
            disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
            return Disposables.create()
        }

        cancelSchedule.setDisposable(disposeSchedule)
    
        return disposeEverything
    }

SerialDisposable: 表示一个可释放资源,其底层可释放资源可被另一个可释放资源替换,从而导致前一个底层可释放资源的自动释放。
SingleAssignmentDisposable: 表示只允许对其底层可释放资源进行一次赋值的可释放资源。如果已经设置了底层可释放资源,那么将来尝试设置底层可释放资源将抛出异常。
ScheduledDisposable:释放资源时会在对应的队列中调度执行。
关键代码:self.parent.scheduler.schedule, 这里parent是SubscribeOn对象,scheduler是调度者SerialDispatchQueueScheduler,然后调用它的schedule方法,去看看:

    public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        self.scheduleInternal(state, action: action)
    }

    func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        self.configuration.schedule(state, action: action)
    }

调到self.configuration.schedule:

    func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        let cancel = SingleAssignmentDisposable()

        self.queue.async {
            if cancel.isDisposed {
                return
            }


            cancel.setDisposable(action(state))
        }

        return cancel
    }

终于看到队列调用了self.queue.async 将action异步派发到quene中执行,而queue是在创建SerialDispatchQueueScheduler时创建的, SerialDispatchQueueScheduler明显会创建串行队列。这里有个小细节,如果资源已经被释放了则不执行。
cancel.setDisposable(action(state)) 设置释放资源对象,如果已经设置则抛出异常。
action 在哪里, 回到action定义的地方?:

        let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
            let subscription = self.parent.source.subscribe(self)
            disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
            return Disposables.create()
        }

接着执行self.parent.source.subscribe(self), 这个source是subscibe(:on,的调用者,即在链式调用序列中,先调用observe(:on)再调用subscibe(:on) 所以sourceObserveOnSerialDispatchQueue对象。

很多observable都会保存它的上一个observable,即source, 以此实现链式调用
ObserveOnSerialDispatchQueue对象现在要执行subscribe方法,进入看看:

    return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }

又到这里来了,然又进入run,这里会进入ObserveOnSerialDispatchQueue的run方法:

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
        let subscription = self.source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }

这个地方代码不太一样了,不再是调用sinkrun,因为ObserveOnSerialDispatchQueueSink只能处理observe而不能处理subscribe,只能转发给source处理subscribe,所以调用self.source.subscribe, 这里的source是最初创建的未变形的AnonymousObservable对象,这相当于绕了一圈又回到了原来的流程,调用subscribe(sink),但是这里的sink是ObserveOnSerialDispatchQueueSink对象,它是Observer,看看这个会怎么影响后面的流程:

if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }

这里调到了前面的if分支:因为isScheduleRequired用来标示在当前线程是否正在通过schedule执行action,如果action内部又在当前线程执行了subscribe,则无需再调度到当前线程执行,即CurrentThreadScheduler.instance.schedule不会也没必要嵌套调用。

继续调用AnonymousObservablerun方法,然后调用创建AnonymousObservableSink,调用run,最后执行subscribeHandler(AnyObserver(self)),这里回到了熟悉的流程,不过因为ObserverObserveOnSerialDispatchQueueSink对象,所以发送序列时会调用ObserveOnSerialDispatchQueueSinkonCore方法:

    override func onCore(_ event: Event<Element>) {
        _ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
    }

这里会做队列调度,在对应的队列(主队列)中执行action :

        self.cachedScheduleLambda = { pair in
            guard !cancel.isDisposed else { return Disposables.create() }

            pair.sink.observer.on(pair.event)

            if pair.event.isStopEvent {
                pair.sink.dispose()
            }

            return Disposables.create()
        }

接着调用observer.on(event),这里的sink就是当前这个ObserveOnSerialDispatchQueueSink对象,而observer是在创建ObserveOnSerialDispatchQueueSink对象时传入的SubscribeOnSinkSubscribeOnSinkobserver对象是AnonymousObserver,因为它们都遵循了ObserverType,所以都可以作为ObserverType被其他Sink持有。

各类Sink遵循了ObserverType又持有了ObserverType,这样Sink之间可以相互持有,调用协议方法时又可以调用observer的相同协议方法,这样可以一直调用下去,跟装饰器模式很像,当我需要在现有操作符基础上再增加操作,无需修改原有操作符的逻辑代码,通过扩展方式增加新的功能,不过这里的设计更复杂。

我们继续看代码,SubscribeOnSink实现了on方法,进去看看:

    func on(_ event: Event<Element>) {
        self.forwardOn(event)
        
        if event.isStopEvent {
            self.dispose()
        }
    }

然后进入Sink类的forwardOn:在forwardOn又又调用了observer.on(event), 这里observerAnonymousObserver,最终调用AnonymousObserveronCore,完成最后一击:
调用self.eventHandler(event)

如果将observe(:on)subscribe(:on) 互换位置,subscribe(:on)先调用,observe(:on)后调用结果会怎么样? 结果依然不变,但是内部调用流程会不一样,大家可以自己试试!

Scheduler

上面的例子中已经见到了两个Scheduler:

Scheduler包含一个DispatchQueueConfiguration对象,DispatchQueueConfiguration对象持有队列,当执行调度方法时,会转发到这个类的schedule进行实际的调度。

上一篇下一篇

猜你喜欢

热点阅读