Rx

RxSwift源码分析(13)——Scheduler调度者

2020-10-16  本文已影响0人  无悔zero

在前面的篇章中,多次遇见Scheduler(调度者),但都没有详细说明,因为想留到这一篇章中再详细说说。

Observable.of(1, 2, 3)
.observeOn(MainScheduler.instance)  //主线程
.subscribe(onNext: { (num) in
    print(num)
}).disposed(by: disposeBag)
Observable.of(1, 2, 3)
.observeOn(SerialDispatchQueueScheduler.init(internalSerialQueueName: "obOnSerial"))  //串行
.subscribe(onNext: { (num) in
    print(num)
}).disposed(by: disposeBag)
Observable.of(1, 2, 3)
.observeOn(ConcurrentDispatchQueueScheduler.init(qos: .background))  //并行
.subscribe(onNext: { (num) in
    print(num)
}).disposed(by: disposeBag)
  1. 我们从调度者的初始化入手,前3个都遵循了SchedulerType协议,而SchedulerType又继承自ImmediateSchedulerType:
(1)MainScheduler
public final class MainScheduler : SerialDispatchQueueScheduler {
    ...
    public init() {
        self._mainQueue = DispatchQueue.main
        super.init(serialQueue: self._mainQueue)
    }

    public static let instance = MainScheduler()
    ...
}
(2)SerialDispatchQueueScheduler
public class SerialDispatchQueueScheduler : SchedulerType {
    ...
    public convenience init(qos: DispatchQoS, internalSerialQueueName: String = "rx.global_dispatch_queue.serial", leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
        self.init(queue: DispatchQueue.global(qos: qos.qosClass), 
            internalSerialQueueName: internalSerialQueueName, 
            leeway: leeway)
    }
    ...
}
(3)ConcurrentDispatchQueueScheduler

ConcurrentDispatchQueueScheduler和SerialDispatchQueueScheduler的思路其实是一样的:

public class ConcurrentDispatchQueueScheduler: SchedulerType {
    ...
    public convenience init(qos: DispatchQoS, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
        self.init(queue: DispatchQueue(
            label: "rxswift.queue.\(qos)",
            qos: qos,
            attributes: [DispatchQueue.Attributes.concurrent], 
            target: nil),
            leeway: leeway
        )
    }
    ...
}
(4)OperationQueueScheduler

OperationQueueScheduler封装了NSOperationQueue

public class OperationQueueScheduler: ImmediateSchedulerType {
    ...
    public init(operationQueue: OperationQueue, queuePriority: Operation.QueuePriority = .normal) {
        self.operationQueue = operationQueue
        self.queuePriority = queuePriority
    }
    ...
}
  1. 我们走MainScheduler的流程,然后我们来看看下一步的函数observeOn,这里会判断是否为串行队列:
extension ObservableType {
    ...
    public func observeOn(_ scheduler: ImmediateSchedulerType)
        -> Observable<Element> {
            if let scheduler = scheduler as? SerialDispatchQueueScheduler {
                return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
            }
            else {
                return ObserveOn(source: self.asObservable(), scheduler: scheduler)
            }
    }
}
  1. 判断结果自然是SerialDispatchQueueScheduler,于是返回了ObserveOnSerialDispatchQueue序列(ObserveOnSerialDispatchQueue继承了Producer)。ObserveOnSerialDispatchQueue保存了scheduler调度者和source源序列:
final private class ObserveOnSerialDispatchQueue<Element>: Producer<Element> {
    let scheduler: SerialDispatchQueueScheduler
    let source: Observable<Element>

    init(source: Observable<Element>, scheduler: SerialDispatchQueueScheduler) {
        self.scheduler = scheduler
        self.source = source
        ...
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
        let subscription = self.source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }
    ...
}
final private class ObserveOnSerialDispatchQueueSink<Observer: ObserverType>: ObserverBase<Observer.Element> {
    ...
    init(scheduler: SerialDispatchQueueScheduler, observer: Observer, cancel: Cancelable) {
        self.scheduler = scheduler
        self.observer = observer
        self.cancel = cancel
        super.init()
        //默认初始化cachedScheduleLambda
        self.cachedScheduleLambda = { pair in
            ...
        }
    }
    ...
}

根据RxSwift核心逻辑,来到ObserveOnSerialDispatchQueue的run函数,run函数创建了ObserveOnSerialDispatchQueueSink(业务下沉),ObserveOnSerialDispatchQueueSink创建时默认初始化了self.cachedScheduleLambda。然后由保存的self.source源序列进行subscribe。

  1. 继续根据核心逻辑就会来到ObserveOnSerialDispatchQueueSink的onCore函数:
final private class ObserveOnSerialDispatchQueueSink<Observer: ObserverType>: ObserverBase<Observer.Element> {
    ...
    override func onCore(_ event: Event<Element>) {
        _ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
    }
    ...
}
  1. 接着一步步走到scheduleInternal函数:
public class SerialDispatchQueueScheduler : SchedulerType {
    ...
        public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        return self.scheduleInternal(state, action: action)
    }
    //不是它
    func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        return self.configuration.schedule(state, action: action)
    }
    ...
}
public final class MainScheduler : SerialDispatchQueueScheduler {
    ...
    override func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        let previousNumberEnqueued = increment(self.numberEnqueued)

        if DispatchQueue.isMain && previousNumberEnqueued == 0 {
            let disposable = action(state)
            decrement(self.numberEnqueued)
            return disposable
        }

        let cancel = SingleAssignmentDisposable()

        self._mainQueue.async {
            if !cancel.isDisposed {
                _ = action(state)
            }

            decrement(self.numberEnqueued)
        }

        return cancel
    }
}
MainScheduler DispatchQueueConfiguration

action便是ObserveOnSerialDispatchQueueSink初始化时保存的self.cachedScheduleLambda闭包:

final private class ObserveOnSerialDispatchQueueSink<Observer: ObserverType>: ObserverBase<Observer.Element> {
    ...
    init(scheduler: SerialDispatchQueueScheduler, observer: Observer, cancel: Cancelable) {
        self.scheduler = scheduler
        self.observer = observer
        self.cancel = cancel
        super.init()

        self.cachedScheduleLambda = { pair in
            guard !cancel.isDisposed else { return Disposables.create() }

            pair.sink.observer.on(pair.event)

            if pair.event.isStopEvent {
                pair.sink.dispose()
            }

            return Disposables.create()
        }
    }
    ...
}
  1. 最终调用了pair.sink.observer.on(pair.event),这语法让人看得有点懵,慢慢一一对应:

    pair原来是元组,pair.sink.observer就是保存的self.observer,所以
    pair.sink.observer.on=>
    self.observer.on=>
    AnonymousObserver.on
  2. 最后的最后根据RxSwift核心逻辑,便会调用外面的响应闭包:
.subscribe(onNext: { (num) in
    print(num)
})

其实调度者对GCD的封装就是这么简单,它能非常好地帮我们控制任务在相应的队列中执行。

上一篇下一篇

猜你喜欢

热点阅读