RxSwift<6>—— 调度者scheduler源码解析

2019-10-15  本文已影响0人  Gollum_

RxSwift体系中,有四种角色不可或缺:

今天来说说RxSwift的一个重要角色:调度者 scheduler。

RxSwift 针对 GCD 进行了一套 scheduler 的封装。

/// Scheduler bound to the main dispatch queue: a `SerialDispatchQueueScheduler`
/// whose serial queue is `DispatchQueue.main`.
public final class MainScheduler : SerialDispatchQueueScheduler {
    // The main queue; the same reference is handed to the superclass in `init()`.
    private let _mainQueue: DispatchQueue
    // Atomic counter starting at 0.
    // NOTE(review): presumably tracks how many actions are currently enqueued —
    // its use is not shown in this excerpt; confirm against the full source.
    let numberEnqueued = AtomicInt(0)

    /// Creates a scheduler that uses `DispatchQueue.main` as its serial queue.
    public init() {
        self._mainQueue = DispatchQueue.main
        super.init(serialQueue: self._mainQueue)
    }
    /// Shared instance, created once through `init()`.
    public static let instance = MainScheduler()
}

/// Scheduler built around a single `DispatchQueue`, kept in a `DispatchQueueConfiguration`.
/// NOTE: this is an abridged excerpt — parameter types and some call arguments are elided,
/// so the signatures below are not complete Swift.
public class SerialDispatchQueueScheduler : SchedulerType {
    // Configuration object constructed in the initializer from the queue and leeway.
    let configuration: DispatchQueueConfiguration
    // Designated initializer. The arguments of the `DispatchQueueConfiguration` call are
    // elided here; presumably `serialQueue` and `leeway` are passed straight through — TODO confirm.
    init(serialQueue: DispatchQueue, leeway:) {
        self.configuration = DispatchQueueConfiguration(queue: leeway:)
    }

// Convenience initializer: builds its own queue from a label instead of receiving one.
public convenience init(internalSerialQueueName: serialQueueConfiguration: leeway: ) {
        // New queue labelled with the supplied name; `attributes: []` means no
        // `.concurrent` attribute is requested.
        let queue = DispatchQueue(label: internalSerialQueueName, attributes: [])
        // Optional hook: lets the caller customise the queue before it is used.
        serialQueueConfiguration?(queue)
        // Delegate to the designated initializer with the freshly created queue.
        self.init(serialQueue: queue, leeway: leeway)
    }
 }

调度执行
调度器(Schedulers)是 RxSwift 实现多线程的核心模块,它主要用于控制任务在哪个线程或队列运行。

observeOn&subscribeOn
看一下这段代码:

// Demo: the subscription to the button's tap sequence is created inside a block
// dispatched to a global queue; the handler prints the thread it is called on.
DispatchQueue.global().async {
    self.actionBtn.rx.tap
        .subscribe(onNext: { () in
            print("点击了按钮 --- \(Thread.current)")
        })
        .disposed(by: self.bag)
}

线程打印情况:
点击了按钮 --- <NSThread: 0x600000c2d980>{number = 1, name = main}

WHY????
看这里:

/// Builds a `ControlEvent<()>` for the given `UIControl.Event`s.
/// NOTE: abridged excerpt — only the main-thread check inside the `Observable.create`
/// closure is shown; the rest of the closure body is elided.
/// - Parameter controlEvents: The control events to observe.
/// - Returns: A `ControlEvent` wrapping the created source observable.
public func controlEvent(_ controlEvents: UIControl.Event) -> ControlEvent<()> {
    // `self.base` is captured weakly as `control`.
    let source: Observable<Void> = Observable.create { [weak control = self.base] observer in
            // Main-thread check performed when the source is subscribed to.
            // NOTE(review): presumably fails if called off the main thread — confirm
            // against `MainScheduler.ensureRunningOnMainThread()`'s implementation.
            MainScheduler.ensureRunningOnMainThread()
        }
    return ControlEvent(events: source)
}
/// Wraps `events`, applying `subscribeOn(ConcurrentMainScheduler.instance)` before storing it,
/// so the stored sequence is the `subscribeOn`-wrapped one rather than `events` itself.
/// - Parameter events: Source sequence whose `Element` matches this type's `Element`.
public init<Ev: ObservableType>(events: Ev) where Ev.Element == Element {
    self._events = events.subscribeOn(ConcurrentMainScheduler.instance)
}

/// Subscribes `observer` by forwarding it to the stored `_events` sequence.
/// - Parameter observer: The observer to attach.
/// - Returns: The `Disposable` returned by `_events.subscribe(observer)`.
public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable {
    return self._events.subscribe(observer)
}
/// Wraps this sequence in a `SubscribeOn` producer together with the given scheduler.
/// - Parameter scheduler: Scheduler handed to the `SubscribeOn` wrapper.
/// - Returns: A `SubscribeOn` instance (typed as `Observable<Element>`) whose source is `self`.
public func subscribeOn(_ scheduler: ImmediateSchedulerType)
    -> Observable<Element> {
    return SubscribeOn(source: self, scheduler: scheduler)
}
/// Producer returned by `subscribeOn(_:)`: holds the source sequence and the scheduler.
/// NOTE: abridged excerpt — the parameter and return types of `run` are elided.
final private class SubscribeOn<Ob: ObservableType>: Producer<Ob.Element> {
    // The wrapped (upstream) sequence.
    let source: Ob
    // Scheduler supplied by the caller of `subscribeOn(_:)`.
    let scheduler: ImmediateSchedulerType
    
    /// Stores the source sequence and scheduler without doing any work.
    init(source: Ob, scheduler: ImmediateSchedulerType) {
        self.source = source
        self.scheduler = scheduler
    }
    
    /// Creates a `SubscribeOnSink` for `observer`, runs it, and returns the sink together
    /// with the subscription produced by `sink.run()`.
    override func run(_ observer: cancel:) -> (sink:subscription:) {
        // The sink receives this producer as `parent`, plus the downstream observer and `cancel`.
        let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

总结
整个流程比较复杂,但如果这个时候从整体上看源码,不难得出:

就是两层序列的订阅与响应:第二层的 sink 就是源序列的观察者。

上一篇 下一篇

猜你喜欢

热点阅读