RxSwift中的Scheduler调度者(上)
初识
为了初步认识调度者,我们先来看下面一段代码的打印:
DispatchQueue.global().async {
print("global --- \(Thread.current)")
self.actionBtn.rx.tap
.subscribe(onNext: { () in
// 订阅回来打印的线程在主线程,是因为订阅就是订阅在主队列的
print("tap --- \(Thread.current)")
})
}
/*
打印结果:
global --- <NSThread: 0x600002696380>{number = 3, name = (null)}
tap --- <NSThread: 0x6000026f9900>{number = 1, name = main}
*/
从上面可以看出来,在全局并发线程number=3的线程中订阅的button点击事件,但是订阅回来的打印却在主线程。
所以到底发生了什么呢?为什么订阅回来会回到主线程了呢?
我们猜想RxSwift中肯定做了什么操作,才会使button点击的订阅事件回来在主线程了。于是我们点击tap
进去:
public var tap: ControlEvent<Void> {
return controlEvent(.touchUpInside)
}
发现tap
其实就是controlEvent
的一个点击事件,然后再点击controlEvent
进去:
public func controlEvent(_ controlEvents: UIControl.Event) -> ControlEvent<()> {
let source: Observable<Void> = Observable.create { [weak control = self.base] observer in
// 走这个代码的时候,就是发送事件的时候,上一个流程应该是订阅
MainScheduler.ensureRunningOnMainThread() // 确保在主线程,已经在主线程了
guard let control = control else {
observer.on(.completed)
return Disposables.create()
}
let controlTarget = ControlTarget(control: control, controlEvents: controlEvents) { _ in
observer.on(.next(()))
}
return Disposables.create(with: controlTarget.dispose)
}
.takeUntil(deallocated)
return ControlEvent(events: source)
}
发现里面有句关键代码MainScheduler.ensureRunningOnMainThread()
,这句代码确保了当前环境是主线程环境(注意现在已经是主线了),如果不是主线程,就会抛出错误。
因为这是用户交互的事件,也就是说是UI交互,所以必须要在主线程,可以理解。
但是是在哪一步切换到主线程中去的呢?
分析:通过前面的文章,我们已经知道了这个大括号里面,即序列创建的这个逃逸闭包里面是发送事件的流程,所以切换到主线程的操作,只可能是上一步订阅流程切换的(因为这里就是序列的创建,并没有在主线程中创建序列,所以只有可能是在订阅流程中切换到主线程的)。
点击进入返回的结构体ControlEvent
中:
public init<Ev: ObservableType>(events: Ev) where Ev.Element == Element {
self._events = events.subscribeOn(ConcurrentMainScheduler.instance)
}
- 发现在
ControlEvent
初始化的时候,它先让它的事件订阅在了主线程ConcurrentMainScheduler.instance
,即是说,在events
订阅上面的打印事件之前,先订阅了一个在主线程这个事件,然后返回的序列才订阅的上面打印事件。 - 这里有两个关键词
subscribeOn
和ConcurrentMainScheduler
。 - 其实除了
subscribeOn
,还有一个关键方法observeOn
,下一篇文章再具体讲。 - 这篇文章我们先具体认识和
ConcurrentMainScheduler
一样的,一些关键类。
ConcurrentMainScheduler
并发主线程调度者:当一些操作需要在主线程执行,如果它的schedule
是在主线程调用的,那它就会立即执行,而不需要调度。
public final class ConcurrentMainScheduler : SchedulerType {
private let _mainScheduler: MainScheduler
private let _mainQueue: DispatchQueue
private init(mainScheduler: MainScheduler) {
self._mainQueue = DispatchQueue.main // 主队列
self._mainScheduler = mainScheduler
}
/// Singleton instance of `ConcurrentMainScheduler`
public static let instance = ConcurrentMainScheduler(mainScheduler: MainScheduler.instance)
}
- 初始化的时候
_mainQueue
绑定了主队列 - 传进来
MainScheduler
参数来初始化
MainScheduler
主线程调度者:主要用来处理UI相关的任务。
public final class MainScheduler : SerialDispatchQueueScheduler {
private let _mainQueue: DispatchQueue
let numberEnqueued = AtomicInt(0)
/// Initializes new instance of `MainScheduler`.
public init() {
self._mainQueue = DispatchQueue.main // 绑定主队列
super.init(serialQueue: self._mainQueue)
}
/// Singleton instance of `MainScheduler`
public static let instance = MainScheduler()
}
- 绑定了主队列
DispatchQueue.main
- 主队列其实就是一种串行队列,继承自串行调度者
SerialDispatchQueueScheduler
SerialDispatchQueueScheduler
串行调度者:封装了 GCD 的串行队列。在特定的队列执行,并且保证即使传进来的是一个并发队列,也会被转换成一个串行队列。
public class SerialDispatchQueueScheduler : SchedulerType {
let configuration: DispatchQueueConfiguration
// leeway :延迟执行时间
init(serialQueue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
// 初始化的时候,保存了一个串行队列
self.configuration = DispatchQueueConfiguration(queue: serialQueue, leeway: leeway)
}
public convenience init(internalSerialQueueName: String, serialQueueConfiguration: ((DispatchQueue) -> Void)? = nil, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
let queue = DispatchQueue(label: internalSerialQueueName, attributes: [])
serialQueueConfiguration?(queue)
self.init(serialQueue: queue, leeway: leeway)
}
}
- 如果在使用前需要对其进行一些定制,内部串行队列可以使用
serialQueueConfiguration
回调来进行定制
ConcurrentDispatchQueueScheduler
串行调度者:封装了 GCD 的并发队列。用来执行一些并发任务。
/// Abstracts the work that needs to be performed on a specific `dispatch_queue_t`. You can also pass a serial dispatch queue, it shouldn't cause any problems.
/// This scheduler is suitable when some work needs to be performed in background.
public class ConcurrentDispatchQueueScheduler: SchedulerType {
let configuration: DispatchQueueConfiguration
public init(queue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
self.configuration = DispatchQueueConfiguration(queue: queue, leeway: leeway)
}
}
- 在特定的队列执行,并且即使传进来的是一个串行队列,也不会产生错误
- 可以用来执行在后台进行的任务
OperationQueueScheduler
Operation调度者:封装了NSOperationQueue
。有操作队列和操作优先级的属性
/// Abstracts the work that needs to be performed on a specific `NSOperationQueue`.
/// This scheduler is suitable for cases when there is some bigger chunk of work that needs to be performed in background and you want to fine tune concurrent processing using `maxConcurrentOperationCount`.
public class OperationQueueScheduler: ImmediateSchedulerType {
public let operationQueue: OperationQueue // 封装的Operation
public let queuePriority: Operation.QueuePriority // 优先级
/// Constructs new instance of `OperationQueueScheduler` that performs work on `operationQueue`.
/// - parameter operationQueue: Operation queue targeted to perform work on.
/// - parameter queuePriority: Queue priority which will be assigned to new operations.
public init(operationQueue: OperationQueue, queuePriority: Operation.QueuePriority = .normal) {
self.operationQueue = operationQueue
self.queuePriority = queuePriority
}
}
- 用于在特定的
NSOperationQueue
上执行 - 适合用于在后台处理大量任务的时候
- 适合用于设置最大并发数
CurrentThreadScheduler
当前线程调度者:表示当前线程的调度者,默认使用这个(当很多时候参数里面没有传调度者的时候,给的默认值就是这个)
public class CurrentThreadScheduler : ImmediateSchedulerType {
typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>
public static let instance = CurrentThreadScheduler()
// isScheduleRequiredKey 就是一个默认的key值,字符串
private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t in
let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
defer { key.deallocate() }
guard pthread_key_create(key, nil) == 0 else {
rxFatalError("isScheduleRequired key creation failed")
}
return key.pointee
}()
private static var scheduleInProgressSentinel: UnsafeRawPointer = { () -> UnsafeRawPointer in
return UnsafeRawPointer(UnsafeMutablePointer<Int>.allocate(capacity: 1))
}()
// 直接调CurrentThreadScheduler.queue方法,获取出来的,一般都是主线程
static var queue : ScheduleQueue? {
get {
// CurrentThreadSchedulerQueueKey.instance 也是一个key值字符串
// 里面是一个字典,获取这个key对应的value
return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance)
}
set {
// 将newValue和key,对应起来,存储在字典里
Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance)
}
}
/// Gets a value that indicates whether the caller must call a `schedule` method.
// 判断当前线程是否需要调度
public static fileprivate(set) var isScheduleRequired: Bool {
get {
// 判断key是否为nil,如果是nil,说明没有被调度过
return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
}
set(isScheduleRequired) {
// 三目运算,isScheduleRequired为false的时候,就是取后面的那个
// 也就是给isScheduleRequiredKey赋值
if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
rxFatalError("pthread_setspecific failed")
}
}
}
}
- 声明了一个默认的key值字符串
isScheduleRequiredKey
- 根据
isScheduleRequiredKey
是否有值,即isScheduleRequired
,来判断当前线程是否被调度过 -
queue
的set
,get
方法中通过扩展Thread
的方法,来获取当前线程,或者赋值一个新的线程
Thread的扩展
// 给Thread扩展了一个set和get方法
extension Thread {
static func setThreadLocalStorageValue<T: AnyObject>(_ value: T?, forKey key: NSCopying) {
let currentThread = Thread.current
let threadDictionary = currentThread.threadDictionary
if let newValue = value {
threadDictionary[key] = newValue // 赋值新的线程
}
else {
threadDictionary[key] = nil
}
}
static func getThreadLocalStorageValueForKey<T>(_ key: NSCopying) -> T? {
let currentThread = Thread.current // 获取当前线程
let threadDictionary = currentThread.threadDictionary
return threadDictionary[key] as? T
}
}
关于上面介绍的类的继承关系图:
继承关系图上面我们具体了解了一些关键的和重要的类,这些类里面其实有个重要的方法schedule
,这个方法和调度密切相关。由于篇幅原因,我并没有粘贴出来。但是具体是怎么调度的呢,我们就下一篇文章根据流程来具体分析。