RxSwift(7)-调度者
前言
scheduler
,也称为调度者,作为RxSwift
的四大核心之一,是RxSwift
实现多线程的核心模块,它主要用于控制任务在哪个线程或队列运行。
下面我们先来看一个例子:
func schedulerTest01() {
DispatchQueue.global().async {
self.mButton.rx.tap.subscribe { (element) in
print("==currentThread==\(Thread.current)==")
}.disposed(by: self.disposeBag)
}
}
控制台输出:
==currentThread==<NSThread: 0x600002c44000>{number = 1, name = main}==
如果没有RxSwift
,我们在子线程操作UI
就会出错。而此处我们可以看到,虽然我们在子线程订阅了信号,但是还是在主线程输出了。那么在这其中,调度者做了什么?
extension Reactive where Base: UIButton {
/// Reactive wrapper for `TouchUpInside` control event.
public var tap: ControlEvent<Void> {
return controlEvent(.touchUpInside)
}
}
extension Reactive where Base: UIControl {
public func controlEvent(_ controlEvents: UIControl.Event) -> ControlEvent<()> {
let source: Observable<Void> = Observable.create { [weak control = self.base] observer in
// 线程判断 不是主线程就报错
MainScheduler.ensureRunningOnMainThread()
......
}
.takeUntil(deallocated)
return ControlEvent(events: source)
}
}
可以看出在Observable.create
的闭包里就已经做了主线程判断,也就是说在create
的时候,就已经把线程切换到了主线程。根据我们之前分析的事件流程可知,当代码执行到create
的闭包里面,也就是观察者发送的事件的时候,是在订阅观察事件之后,所以我们可以大胆猜测,切换线程的操作是在订阅subscribe
方法中,订阅方法最终会是ControlEvent
的重写的订阅方法。
public struct ControlEvent<PropertyType> : ControlEventType {
public typealias Element = PropertyType
let _events: Observable<PropertyType>
public init<Ev: ObservableType>(events: Ev) where Ev.Element == Element {
self._events = events.subscribeOn(ConcurrentMainScheduler.instance)
// subscribeOn就是给序列指定运行的线程
}
public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
return self._events.subscribe(observer)
}
}
public final class ConcurrentMainScheduler : SchedulerType {
public typealias TimeInterval = Foundation.TimeInterval
public typealias Time = Date
private let _mainScheduler: MainScheduler
private let _mainQueue: DispatchQueue
private init(mainScheduler: MainScheduler) {
self._mainQueue = DispatchQueue.main
self._mainScheduler = mainScheduler
}
public static let instance = ConcurrentMainScheduler(mainScheduler: MainScheduler.instance)
}
public final class MainScheduler : SerialDispatchQueueScheduler {
public init() {
self._mainQueue = DispatchQueue.main
super.init(serialQueue: self._mainQueue)
}
public static let instance = MainScheduler()
}
ControlEvent
在初始化的时候调用events.subscribeOn(ConcurrentMainScheduler.instance)
方法,而ConcurrentMainScheduler.instance
就是在主队列操作。
MainScheduler
MainScheduler
,主线程调度者,它抽取了需要在DispatchQueue.main
上执行的任务,通常用于UI
操作。是SerialDispatchQueueScheduler
的一种特殊化。如果一旦从主队列调用了schedule
方法,它将立即执行操作而不进行调度。
另外,MainScheduler
是为observeOn
运算符优化的。如果要使用subscribeOn
在主线程上订阅可观察的序列,建议使用ConcurrentMainScheduler
。
public final class MainScheduler : SerialDispatchQueueScheduler {
private let _mainQueue: DispatchQueue
let numberEnqueued = AtomicInt(0)
public init() {
self._mainQueue = DispatchQueue.main
super.init(serialQueue: self._mainQueue)
}
public static let instance = MainScheduler()
public static let asyncInstance = SerialDispatchQueueScheduler(serialQueue: DispatchQueue.main)
public class func ensureExecutingOnScheduler(errorMessage: String? = nil) {
if !DispatchQueue.isMain {
rxFatalError(errorMessage ?? "Executing on background thread. Please use `MainScheduler.instance.schedule` to schedule work on main thread.")
}
}
/// In case this method is running on a background thread it will throw an exception.
public class func ensureRunningOnMainThread(errorMessage: String? = nil) {
#if !os(Linux) // isMainThread is not implemented in Linux Foundation
guard Thread.isMainThread else {
rxFatalError(errorMessage ?? "Running on background thread.")
}
#endif
}
override func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
let previousNumberEnqueued = increment(self.numberEnqueued)
if DispatchQueue.isMain && previousNumberEnqueued == 0 {
let disposable = action(state)
decrement(self.numberEnqueued)
return disposable
}
let cancel = SingleAssignmentDisposable()
self._mainQueue.async {
if !cancel.isDisposed {
_ = action(state)
}
decrement(self.numberEnqueued)
}
return cancel
}
}
CurrentThreadScheduler
当前线程调度者,是生成事件元素的默认调度者。
public class CurrentThreadScheduler : ImmediateSchedulerType {
typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>
public static let instance = CurrentThreadScheduler()
......
// 获取当前线程的队列
static var queue : ScheduleQueue? {
get {
return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance)
}
set {
Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance)
}
}
/// 该值用来判断调用者是否必须调用`schedule`方法。
public static fileprivate(set) var isScheduleRequired: Bool {
get {
return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
}
set(isScheduleRequired) {
if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
rxFatalError("pthread_setspecific failed")
}
}
}
......
}
CurrentThreadScheduler
的queue
属性就是当前线程的队列。RxSwift
给系统的Thread
添加了两个扩展方法,用来存储和获取当前线程队列。
extension Thread {
static func setThreadLocalStorageValue<T: AnyObject>(_ value: T?, forKey key: NSCopying) {
let currentThread = Thread.current
let threadDictionary = currentThread.threadDictionary
if let newValue = value {
threadDictionary[key] = newValue
}
else {
threadDictionary[key] = nil
}
}
static func getThreadLocalStorageValueForKey<T>(_ key: NSCopying) -> T? {
let currentThread = Thread.current
let threadDictionary = currentThread.threadDictionary
return threadDictionary[key] as? T
}
}
SerialDispatchQueueScheduler
串行队列调度者。SerialDispatchQueueScheduler
会抽取出需要在特定的队列上执行的工作。而且它能够确保即使是并发队列,也会被转换为串行队列。也就是说不管任务在外部是以何种形式执行的,到这里来都会转为串行执行。
public class SerialDispatchQueueScheduler : SchedulerType {
let configuration: DispatchQueueConfiguration
init(serialQueue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
self.configuration = DispatchQueueConfiguration(queue: serialQueue, leeway: leeway)
}
public convenience init(internalSerialQueueName: String, serialQueueConfiguration: ((DispatchQueue) -> Void)? = nil, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
let queue = DispatchQueue(label: internalSerialQueueName, attributes: [])
serialQueueConfiguration?(queue)
self.init(serialQueue: queue, leeway: leeway)
}
public convenience init(queue: DispatchQueue, internalSerialQueueName: String, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
let serialQueue = DispatchQueue(label: internalSerialQueueName,
attributes: [],
target: queue)
self.init(serialQueue: serialQueue, leeway: leeway)
}
public convenience init(qos: DispatchQoS, internalSerialQueueName: String = "rx.global_dispatch_queue.serial", leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
self.init(queue: DispatchQueue.global(qos: qos.qosClass), internalSerialQueueName: internalSerialQueueName, leeway: leeway)
}
}
从代码可以看出来串行队列调度者的初始化方法要么就是外界传入一个队列,要么就是自己创建一个串行队列。
ConcurrentDispatchQueueScheduler
并发队列调度者,一般适用于需要在后台执行某些任务。可以传递一个串行调度队列。
public class ConcurrentDispatchQueueScheduler: SchedulerType {
public typealias TimeInterval = Foundation.TimeInterval
public typealias Time = Date
public var now : Date {
return Date()
}
let configuration: DispatchQueueConfiguration
public init(queue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
self.configuration = DispatchQueueConfiguration(queue: queue, leeway: leeway)
}
public convenience init(qos: DispatchQoS, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
self.init(queue: DispatchQueue(
label: "rxswift.queue.\(qos)",
qos: qos,
attributes: [DispatchQueue.Attributes.concurrent],
target: nil),
leeway: leeway
)
}
}
可以看出并发队列调度者和串行队列调度者的逻辑类似,初始化的时候要么外界传入一个队列,要么就是自己创建一个并发队列。
OperationQueueScheduler
操作队列调度者,是对NSOperationQueue
的封装。可以通过设置 maxConcurrentOperationCount
来控制同时执行并发任务的最大数量。
public class OperationQueueScheduler: ImmediateSchedulerType {
public let operationQueue: OperationQueue
public let queuePriority: Operation.QueuePriority
public init(operationQueue: OperationQueue, queuePriority: Operation.QueuePriority = .normal) {
self.operationQueue = operationQueue
self.queuePriority = queuePriority
}
public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
let cancel = SingleAssignmentDisposable()
let operation = BlockOperation {
if cancel.isDisposed {
return
}
// 回调block
cancel.setDisposable(action(state))
}
operation.queuePriority = self.queuePriority
self.operationQueue.addOperation(operation)
return cancel
}
}
observeOn
observeOn
用于指定可观察序列发出通知的调度者。默认情况下,Observable
的创建,应用操作符以及发出通知都会在subscribe
方法调用的scheduler
执行,observeOn
将指定一个scheduler
来让Observable
通知观察者。
需要注意的是,一旦产生了onError
事件,observeOn
操作符将立即转发。它不会等待onError
之前的事件全部被收到。这意味着onError
事件可能会跳过一些元素提前发送出去。
串行队列调度者observeOn
func serialObserveOnTest() {
Observable.of(1, 2, 3, 4, 5)
.observeOn(SerialDispatchQueueScheduler.init(internalSerialQueueName: "serialObserveOn"))
.subscribe { (element) in
print("==\(element)==\(Thread.current)==")
}.disposed(by: disposeBag)
}
控制台输出:
==next(1)==<NSThread: 0x600001469ac0>{number = 5, name = (null)}==
==next(2)==<NSThread: 0x600001469ac0>{number = 5, name = (null)}==
==next(3)==<NSThread: 0x600001469ac0>{number = 5, name = (null)}==
==next(4)==<NSThread: 0x600001469ac0>{number = 5, name = (null)}==
==next(5)==<NSThread: 0x600001469ac0>{number = 5, name = (null)}==
==completed==<NSThread: 0x600001469ac0>{number = 5, name = (null)}==
下面我们来具体分析一下其流程:
- 首先调用
of
生成可观察序列的时候会将当前线程调度者作为参数传入
public static func of(_ elements: Element ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<Element> {
return ObservableSequence(elements: elements, scheduler: scheduler)
}
- 将元素和当前线程调度者保存在
ObservableSequence
中:
final private class ObservableSequence<Sequence: Swift.Sequence>: Producer<Sequence.Element> {
init(elements: Sequence, scheduler: ImmediateSchedulerType) {
self._elements = elements
self._scheduler = scheduler
}
}
- 调用
observeOn
方法的时候,会将我们创建的串行队列调度者传入ObserveOnSerialDispatchQueue
中保存。
public func observeOn(_ scheduler: ImmediateSchedulerType)
-> Observable<Element> {
// scheduler: SerialDispatchQueueScheduler
// self = <ObservableSequence<Array<Int>>
return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
}
final private class ObserveOnSerialDispatchQueue<Element>: Producer<Element> {
init(source: Observable<Element>, scheduler: SerialDispatchQueueScheduler) {
self.scheduler = scheduler
self.source = source
}
}
- 在调用
subscribe
方法的时候就会进行一系列的判断,首先是ObserveOnSerialDispatchQueue.subscribe
,然后会转交给ObservableSequence.subscribe
。
// ObserveOnSerialDispatchQueue
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
// self.scheduler = SerialDispatchQueueScheduler
// observer = AnonymousObserver
// self.source = ObservableSequence
let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
let subscription = self.source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
// ObservableSequence
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
// observer = ObserveOnSerialDispatchQueueSink
let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
- 调度者的处理
// ObserveOnSerialDispatchQueueSink
override func onCore(_ event: Event<Element>) {
// self = ObserveOnSerialDispatchQueueSink
// self.scheduler = SerialDispatchQueueScheduler
_ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
}
public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.scheduleInternal(state, action: action)
}
func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.configuration.schedule(state, action: action)
}
- 在队列中异步执行保存的
block
,终极回调
// DispatchQueueConfiguration
func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
let cancel = SingleAssignmentDisposable()
// 串行队列异步执行
self.queue.async {
if cancel.isDisposed {
return
}
cancel.setDisposable(action(state))
}
return cancel
}
- 执行回调的
block
,观察者发送事件。
// ObserveOnSerialDispatchQueueSink
self.cachedScheduleLambda = { pair in
guard !cancel.isDisposed else { return Disposables.create() }
// 发送事件
// pair.sink = ObserveOnSerialDispatchQueueSink
// pair.sink.observer = AnonymousObserver
pair.sink.observer.on(pair.event)
if pair.event.isStopEvent {
pair.sink.dispose()
}
return Disposables.create()
}
并发队列调度者observeOn
func concurrentObserveOnTest() {
let ob = Observable.of(
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60,
61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80,
81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
ob.observeOn(ConcurrentDispatchQueueScheduler.init(qos: .background))
.subscribe { (element) in
print("==\(element)==\(Thread.current)==")
}.disposed(by: disposeBag)
}
控制台输出:
// 输出较长 此处就不一一罗列
.....
==next(66)==<NSThread: 0x600002454140>{number = 6, name = (null)}==
==next(67)==<NSThread: 0x60000245f400>{number = 7, name = (null)}==
==next(68)==<NSThread: 0x600002456200>{number = 8, name = (null)}==
......
可以看到的是,虽然是并发队列,也存在多个子线程,但是我们发出的事件是按顺序执行的。
源码如下:
public func observeOn(_ scheduler: ImmediateSchedulerType)
-> Observable<Element> {
return ObserveOn(source: self.asObservable(), scheduler: scheduler)
}
- 和串行队列不同的是,并发队列调用
ObserveOn
方法返回的是ObserveOn
对象,此时保存的是scheduler
是ConcurrentDispatchQueueScheduler
,source
是ObservableSequence
。
final private class ObserveOn<Element>: Producer<Element> {
let scheduler: ImmediateSchedulerType
let source: Observable<Element>
init(source: Observable<Element>, scheduler: ImmediateSchedulerType) {
self.scheduler = scheduler
self.source = source
}
}
- 调用订阅方法的时候就会执行
ObserveOn
的on
方法,此时会再次调用ObservableSequence
的subscribe
,也就是Producer
的方法。
// ObserveOn
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = ObserveOnSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
let subscription = self.source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
- 调用
ObservableSequence
的run
方法:
// ObservableSequence
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
// ObservableSequenceSink
func run() -> Disposable {
// self._parent = ObservableSequence
// self._parent._scheduler = CurrentThreadScheduler
// of函数默认的scheduler
return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
var mutableIterator = iterator
if let next = mutableIterator.next() {
self.forwardOn(.next(next))
recurse(mutableIterator)
}
else {
self.forwardOn(.completed)
self.dispose()
}
}
}
- 队列的相关操作:
public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> Void) -> Void) -> Disposable {
let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
recursiveScheduler.schedule(state)
return Disposables.create(with: recursiveScheduler.dispose)
}
func schedule(_ state: State) {
let d = self._scheduler.schedule(state) { state -> Disposable in
......
if let action = action {
action(state, self.schedule)
}
......
}
......
}
- 在调用
action
的时候,会进入ObservableSequenceSink
中run
方法的闭包执行forwardOn
方法,接着调用Sink
的forwardOn
方法,接着ObserverBase
的on
,接着ObserveOnSink
的onCore
方法。此时会把所有的事件都装入_queue
中,而在条件允许的情况就会调用scheduleRecursive
方法,并将ObserveOnSink.run
作为闭包传入,一旦执行到这个闭包,就会把_queue
中的事件发出,然后接着递归。
final private class ObserveOnSink<Observer: ObserverType>: ObserverBase<Observer.Element> {
.....
override func onCore(_ event: Event<Element>) {
let shouldStart = self._lock.calculateLocked { () -> Bool in
self._queue.enqueue(event)
switch self._state {
case .stopped:
self._state = .running
return true
case .running:
return false
}
}
if shouldStart {
self._scheduleDisposable.disposable = self._scheduler.scheduleRecursive((), action: self.run)
}
}
func run(_ state: (), _ recurse: (()) -> Void) {
let (nextEvent, observer) = self._lock.calculateLocked { () -> (Event<Element>?, Observer) in
if !self._queue.isEmpty {
// 取出一个事件
return (self._queue.dequeue(), self._observer)
}
}
if let nextEvent = nextEvent, !self._cancel.isDisposed {
// 观察者发送事件
observer.on(nextEvent)
if nextEvent.isStopEvent {
self.dispose()
}
}
......
let shouldContinue = self._shouldContinue_synchronized()
if shouldContinue {
// 接着递归
recurse(())
}
}
......
}
_queue
是RxSwift
自己封装的一个队列Queue<Event<Element>>(capacity: 10)
,由于队列先进先出的特性,所以虽然是并发事件,入队出队都有锁的保护,所以会按顺序入队、按顺序出队,只是入队和出队的操作是并发的,比如可以在4入队的时候1出队。
可以看出串行队列和并发队列还是有很多相似点的。
subscribeOn
subscribeOn
操作符指定Observable
在哪个scheduler
开始执行。默认情况下,Observable
创建,应用操作符以及发出通知都会在subscribe
方法调用的scheduler
执行。subscribeOn
可以指定一个不同的scheduler
来让Observable
执行。
源码如下:
public func subscribeOn(_ scheduler: ImmediateSchedulerType)
-> Observable<Element> {
return SubscribeOn(source: self, scheduler: scheduler)
}
可以看到SubscribeOn
是继承自Producer
类的,这也和我们之前分析过的流程一样。经过订阅方法之后,会调用SubscribeOn.on
方法,然后会走到SubscribeOnSink.run()
方法。
final private class SubscribeOn<Ob: ObservableType>: Producer<Ob.Element> {
let source: Ob
let scheduler: ImmediateSchedulerType
init(source: Ob, scheduler: ImmediateSchedulerType) {
self.source = source
self.scheduler = scheduler
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Ob.Element {
let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
func run() -> Disposable {
let disposeEverything = SerialDisposable()
let cancelSchedule = SingleAssignmentDisposable()
disposeEverything.disposable = cancelSchedule
let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
// 订阅
let subscription = self.parent.source.subscribe(self)
disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
return Disposables.create()
}
cancelSchedule.setDisposable(disposeSchedule)
return disposeEverything
}
而self.parent.scheduler.schedule(()){}
就是线程调度的方法,最终会到DispatchQueueConfiguration.schedule()
:
extension DispatchQueueConfiguration {
func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
let cancel = SingleAssignmentDisposable()
self.queue.async {
if cancel.isDisposed {
return
}
// 回调
cancel.setDisposable(action(state))
}
return cancel
}
}
后面的逻辑就和ObserveOn
的逻辑类似,这里就不赘述了。
总结
RxSwift
的调度者就是其实现多线程的核心模块,是对GCD
和OperationQueue
的封装,主要用于控制任务在哪个线程或队列运行。其串行、并发队列的流程大致相同:
- 针对源序列进行处理、订阅
- 调度环境、观察者传值、保存
- 转移到子序列订阅、调度
- 队列异步回调,观察者响应、发送事件
此处附上一张总结的串行ObserveOn
流程图: