RxSwift中的Scheduler调度者(下)

2019-08-18  本文已影响0人  简_爱SimpleLove

上一篇文章我们提到了subscribeOnobserveOn两个重要函数:

下面我们先来探究observeOn的具体调度流程,来探究调度者的原理。

observeOn的实现流程

我们先来看下面一个简单的测试代码:

        print("当前测试方法中的线程:\(Thread.current)")
Observable.of(1,2,3,4,5)
         .observeOn(SerialDispatchQueueScheduler.init(internalSerialQueueName: "observeOnSerial"))
         .subscribe{print("observeOn",$0,Thread.current)}
         .disposed(by: self.bag)
        /*
         输出结果:
         当前测试方法中的线程:<NSThread: 0x600001486800>{number = 1, name = main}
         observeOn next(1) <NSThread: 0x6000014d9a40>{number = 3, name = (null)}
         observeOn next(2) <NSThread: 0x6000014d9a40>{number = 3, name = (null)}
         observeOn next(3) <NSThread: 0x6000014d9a40>{number = 3, name = (null)}
         observeOn next(4) <NSThread: 0x6000014d9a40>{number = 3, name = (null)}
         observeOn next(5) <NSThread: 0x6000014d9a40>{number = 3, name = (null)}
         observeOn completed <NSThread: 0x6000014d9a40>{number = 3, name = (null)}
         */

上面代码表示:一个of函数创建的源序列,订阅了一个打印元素和当前线程的任务,并且要求观察者回调,也就是说打印回来,必须要在名字为observeOnSerial的串行队列里面

代码运行流程:

1、来到Observable.of(1,2,3,4,5)序列的创建方法:

    public static func of(_ elements: Element ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<Element> {
        return ObservableSequence(elements: elements, scheduler: scheduler)
    }

2、ObservableSequence序列的初始化:

final private class ObservableSequence<Sequence: Swift.Sequence>: Producer<Sequence.Element> {
    fileprivate let _elements: Sequence
    fileprivate let _scheduler: ImmediateSchedulerType
    init(elements: Sequence, scheduler: ImmediateSchedulerType) {
        self._elements = elements    // 元素 1 2 3 4 5 6 7 8 9 10
        self._scheduler = scheduler  // 主队列 初始化时的默认值ImmediateSchedulerType = CurrentThreadScheduler.instance
    }
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

3、序列创建完成过后,就来到最外层的observeOn方法:

.observeOn(SerialDispatchQueueScheduler.init(internalSerialQueueName: "observeOnSerial"))

4、然后来到参数中SerialDispatchQueueScheduler的初始化

    init(serialQueue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
        // 初始化的时候,保存了一个串行队列
        self.configuration = DispatchQueueConfiguration(queue: serialQueue, leeway: leeway)
    }
    public convenience init(internalSerialQueueName: String, serialQueueConfiguration: ((DispatchQueue) -> Void)? = nil, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
        let queue = DispatchQueue(label: internalSerialQueueName, attributes: [])
        serialQueueConfiguration?(queue)
        self.init(serialQueue: queue, leeway: leeway)
    }

5、点击observeOn函数进去:

public func observeOn(_ scheduler: ImmediateSchedulerType)
        -> Observable<Element> {
            // 判断是串行队列,还是并行队列
            if let scheduler = scheduler as? SerialDispatchQueueScheduler {
                // 传进去两个参数 : 源序列和队列
                return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
            }
            else {
                return ObserveOn(source: self.asObservable(), scheduler: scheduler)
            }
    }

6、点击ObserveOnSerialDispatchQueue进去:

final private class ObserveOnSerialDispatchQueue<Element>: Producer<Element> {
    let scheduler: SerialDispatchQueueScheduler
    let source: Observable<Element>

    init(source: Observable<Element>, scheduler: SerialDispatchQueueScheduler) {
        self.scheduler = scheduler  // 保存传进来的规定串行队列
        self.source = source        // 保存源序列 ObservableSequence
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
        let subscription = self.source.subscribe(sink) // 源序列订阅 ObserveOnSerialDispatchQueueSink
        return (sink: sink, subscription: subscription)
    }
}

7、然后就来到了最外层的订阅:

            .subscribe{print("observeOn",$0,Thread.current)}

8、来到了ObservableTypesubscribe方法的实现:

    public func subscribe(_ on: @escaping (Event<Element>) -> Void)
        -> Disposable {
            let observer = AnonymousObserver { e in
                on(e)
            }
            return self.asObservable().subscribe(observer)
    }

9、由第6步知道:ObserveOnSerialDispatchQueue继承自Producer,所以来到Producer的订阅方法:

override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
    // 当前线程被调度过,也就是被赋过值,isScheduleRequired属性里面的key不为nil
    // 所以就在当前调度环境,也就是当前线程中执行括号中的代码
    if !CurrentThreadScheduler.isScheduleRequired {
        // The returned disposable needs to release all references once it was disposed.
        let disposer = SinkDisposer()
        let sinkAndSubscription = self.run(observer, cancel: disposer)
        disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

        return disposer
    }
    else {
        // 没有被调度过,就初始化当前线程,并在这里面调度
        return CurrentThreadScheduler.instance.schedule(()) { _ in
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
    }
}

10、CurrentThreadSchedulerschedule方法

/**
    Schedules an action to be executed as soon as possible on current thread.

    If this method is called on some thread that doesn't have `CurrentThreadScheduler` installed, scheduler will be
    automatically installed and uninstalled after all work is performed.
    */
    public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        // 没有被调度过,也就是没有被赋值过
        if CurrentThreadScheduler.isScheduleRequired {
            // 这次被调度过了,所以赋值为false,在Producer中就不需要走CurrentThreadScheduler的初始化了
            CurrentThreadScheduler.isScheduleRequired = false

            // 走前面传进来的闭包
            let disposable = action(state)

            defer {
                // 延迟执行  也就是说在 return scheduledItem 返回事物过后,即所有工作都完成过后,才会走这里,清空当前调度环境,因为下次任务的调度环境可能会不同
                CurrentThreadScheduler.isScheduleRequired = true
                CurrentThreadScheduler.queue = nil
            }

            guard let queue = CurrentThreadScheduler.queue else {
                return disposable
            }

            // 事物排队出队列
            while let latest = queue.value.dequeue() {
                if latest.isDisposed {
                    continue
                }
                latest.invoke() // 销毁
            }

            return disposable
        }

        let existingQueue = CurrentThreadScheduler.queue

        let queue: RxMutableBox<Queue<ScheduledItemType>>
        if let existingQueue = existingQueue {  // 判断 CurrentThreadScheduler 有队列,就赋值给queue
            queue = existingQueue
        }
        else {
            // 要是CurrentThreadScheduler 没有队列,就将赋值
            queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
            CurrentThreadScheduler.queue = queue
        }

        let scheduledItem = ScheduledItem(action: action, state: state)
        queue.value.enqueue(scheduledItem) // 事物排队进入队列

        return scheduledItem
    }
}

11、执行action(state)闭包

      return CurrentThreadScheduler.instance.schedule(()) { _ in
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }

12、ObserveOnSerialDispatchQueuerun方法

 override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
        let subscription = self.source.subscribe(sink) // 源序列订阅 ObserveOnSerialDispatchQueueSink
        return (sink: sink, subscription: subscription)
    }

13、由第2步知道,源序列ObservableSequence也继承自Producer,所以又来到Producer的订阅方法

if !CurrentThreadScheduler.isScheduleRequired {
        // The returned disposable needs to release all references once it was disposed.
        let disposer = SinkDisposer()
        let sinkAndSubscription = self.run(observer, cancel: disposer)
        disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

        return disposer
    }

14、ObservableSequencerun方法

override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }

15、ObservableSequenceSinkrun方法

    func run() -> Disposable {
        // scheduleRecursive 调度递归  self._parent._elements.makeIterator() 用iterator来装这些元素
        return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
            var mutableIterator = iterator   // 迭代器,里面包含了源序列的所以元素
            if let next = mutableIterator.next() { // 迭代器中还有下一个元素就递归
                // 源序列发送响应 源序列发送出来的时候,都是为主队列
                // 源序列发送响应 最后肯定去到ObserveOnSerialDispatchQueueSink的on方法
                self.forwardOn(.next(next))
                recurse(mutableIterator)     // 将迭代器传进去,再重新走这个闭包
            }
            else {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }

16、先进入scheduleRecursive方法

public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> Void) -> Void) -> Disposable {
        let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
        recursiveScheduler.schedule(state)  // 里面肯定有闭包action的执行,然后action闭包中源序列才能发送事件
        return Disposables.create(with: recursiveScheduler.dispose)
    }

17、RecursiveImmediateSchedulerschedule

 if let action = action {
                action(state, self.schedule)  // 执行外界递归调度那里的闭包
            }

18、来到ObserveOnSerialDispatchQueueSinkon方法,也就来到onCore方法

override func onCore(_ event: Event<Element>) {
        // self.scheduler 规定的串行队列  用这个队列去调度这个事件,就保证了observeOn的观察者回调是在特定线程
        _ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
    }

19、SerialDispatchQueueSchedulerschedule方法

 public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        return self.scheduleInternal(state, action: action)
    }

    func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        return self.configuration.schedule(state, action: action)
    }

20、self.configurationschedule方法

extension DispatchQueueConfiguration {
    func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        let cancel = SingleAssignmentDisposable()

        // 传进来队列(可以是串行队列,也可以是并行队列)的异步执行
        // 将传进来的闭包即action异步执行
        // 就是说在特定的队列中执行闭包操作,也就是:observeOn的观察者回调,以及subscribeOn的订阅
        self.queue.async {
            if cancel.isDisposed {
                return
            }
            // action(state) 调用外面传进来的闭包
            // action 是ObserveOnSerialDispatchQueueSink中的cachedScheduleLambda
            cancel.setDisposable(action(state))
        }

        return cancel
    }
}

21、action闭包执行,也就是cachedScheduleLambda闭包执行

self.cachedScheduleLambda = { pair in
            guard !cancel.isDisposed else { return Disposables.create() }

            pair.sink.observer.on(pair.event) // 调用ObserveOnSerialDispatchQueueSink中的observer.on方法,即最外层的打印

            if pair.event.isStopEvent {
                pair.sink.dispose()
            }
            return Disposables.create()
        }

22、方法回收到源序列发送事件

return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
            var mutableIterator = iterator   // 迭代器,里面包含了源序列的所以元素
            if let next = mutableIterator.next() { // 迭代器中还有下一个元素就递归
                // 源序列发送响应 源序列发送出来的时候,都是为主队列
                // 源序列发送响应 最后肯定去到ObserveOnSerialDispatchQueueSink的on方法
                self.forwardOn(.next(next))
                recurse(mutableIterator)     // 将迭代器传进去,再重新走这个闭包
            }
            else {
                self.forwardOn(.completed)
                self.dispose()
            }
        }

至此流程分析结束

observeOn和subscribeOn的区别

上面我们知道observeOn有两种情况:

传进来的是串行队列:

传进来的是并发队列:

subscribeOn

subscribeOn的流程和observeOn基本上一样,只是它并没有区分传进来的队列是串行的还是并发的,都统一只是创建了一个中间序列subscribeOn

然后来到关键地方:subscribeOn并没有在中间序列subscribeOnrun放中进行源序列订阅SubscribeOnSink,而是在SubscribeOnSinkrun方法中订阅,代码如下:

 func run() -> Disposable {
        let disposeEverything = SerialDisposable()
        let cancelSchedule = SingleAssignmentDisposable()
        
        disposeEverything.disposable = cancelSchedule
        
        let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
            // 在这个闭包中进行 源序列订阅sink
            // SubscribeOn这样就保证了订阅在特定线程
            let subscription = self.parent.source.subscribe(self)
            disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
            return Disposables.create()
        }

        cancelSchedule.setDisposable(disposeSchedule)

        return disposeEverything
    }
self.queue.async {
            if cancel.isDisposed {
                return
            }
            cancel.setDisposable(action(state))
        }

特定队列下异步执行的闭包中执行:
self.parent.source.subscribe(self)源序列订阅SubscribeOnSink操作,就保证了订阅在特定线程。

总结:

调度者Scheduler就是对线程进行的一次封装。
subscribeOnobserveOn和前面的map函数类似,不是源序列直接订阅,而是源序列先订阅的中间层sink,在这层sink里面进行了在特定的队列中,用源序列响应给sink的元素来执行事件的响应event
即订阅里面又套了一个订阅,在套的那个订阅里面进行特定队列的处理

上一篇 下一篇

猜你喜欢

热点阅读