RxSwift(3)-定时器
我们在日常使用系统的Timer
类的时候,如果页面有滚动操作就会影响到定时器的正常执行。当然如果放到RunLoop
的common
模式则不会。
func testTimer() {
let _ = Timer.scheduledTimer(withTimeInterval: 1, repeats: true, block: { (timer) in
print(timer)
})
}
那么RxSwift
封装的timer
呢?会不会也是这样?下面我们来看一个例子:
func rxTimerTest() {
let rxTimer = Observable<Int>.timer(1, period: 1, scheduler: MainScheduler.instance)
rxTimer.subscribe(onNext: { (num) in
print("==\(num)==")
})
.disposed(by: disposeBag)
}
运行上述代码,我们发现定时器并没有受到滑动事件的影响。我们知道平时使用的定时器,诸如NSTimer(Timer)
、CADisplayLink
都是需要加入到RunLoop
中的,这样的定时器都会收到RunLoop
的影响;而GCD
的定时器DispatchSource
则和RunLoop
无关,不会受到滑动事件的影响。那我们就可以大胆的猜测,RxSwift
封装的定时器也是对DispatchSource
的封装。
下面我们就具体来看看它的实现:
public static func timer(_ dueTime: RxTimeInterval, period: RxTimeInterval? = nil, scheduler: SchedulerType)
-> Observable<E> {
return Timer(
dueTime: dueTime,
period: period,
scheduler: scheduler
)
}
-
dueTime
:从现在到初始化第一次的触发定时器的时间 -
period
:初始化之后每次触发的时间间隔 -
scheduler
:调度类型
上述方法返回了一个可观察的序列,这个观察序列在间隔一段时间dueTime
初始化之后,每隔period
时间产生一个值。这个Timer
是通过特殊的调度者来执行的。
Timer
是Producer
的子类,保存了外界传入的scheduler
、dueTime
和period
。另外,Timer
也重写了父类定义的run
方法,根据我们之前讨论的RxSwift
的核心事件流程,在订阅之后,最终会在调用Timer
的run
方法。
final private class Timer<E: RxAbstractInteger>: Producer<E> {
fileprivate let _scheduler: SchedulerType
fileprivate let _dueTime: RxTimeInterval
fileprivate let _period: RxTimeInterval?
// 保存调度者
// 保存dueTime
// 保存间隔时间
init(dueTime: RxTimeInterval, period: RxTimeInterval?, scheduler: SchedulerType) {
self._scheduler = scheduler
self._dueTime = dueTime
self._period = period
}
override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
if self._period != nil {
let sink = TimerSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
else {
let sink = TimerOneOffSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
}
在Timer
的run
方法中,会判断外界是否传入了_period
,传入_period
,就有TimerSink
接管,否则由TimerOneOffSink
处理。我们先看看TimerSink
的处理流程。
TimerSink
final private class TimerSink<O: ObserverType> : Sink<O> where O.E : RxAbstractInteger {
typealias Parent = Timer<O.E>
private let _parent: Parent
private let _lock = RecursiveLock()
init(parent: Parent, observer: O, cancel: Cancelable) {
self._parent = parent
super.init(observer: observer, cancel: cancel)
}
func run() -> Disposable {
// 传过去的state为0
return self._parent._scheduler.schedulePeriodic(0 as O.E, startAfter: self._parent._dueTime, period: self._parent._period!) { state in
self._lock.lock(); defer { self._lock.unlock() }
self.forwardOn(.next(state))
return state &+ 1
}
}
}
TimerSink
调用run
方法的时候,self._parent._scheduler
就是Timer
的_scheduler
,上述例子中,我们传入的是MainScheduler
,相当于``调用schedulePeriodic
方法,而MainScheduler
并没有对应的方法,所以我们需要去其父类SerialDispatchQueueScheduler
中找。
public class SerialDispatchQueueScheduler : SchedulerType {
public func schedulePeriodic<StateType>(_ state: StateType, startAfter: TimeInterval, period: TimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
return self.configuration.schedulePeriodic(state, startAfter: startAfter, period: period, action: action)
}
}
-
state
:传给action
的状态 -
startAfter
:初始化之后多久时间开始第一次传值 -
action
:每次传值之后响应的动作
extension DispatchQueueConfiguration {
func schedulePeriodic<StateType>(_ state: StateType, startAfter: TimeInterval, period: TimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
let initial = DispatchTime.now() + dispatchInterval(startAfter)
var timerState = state
let timer = DispatchSource.makeTimerSource(queue: self.queue)
timer.schedule(deadline: initial, repeating: dispatchInterval(period), leeway: self.leeway)
var timerReference: DispatchSourceTimer? = timer
let cancelTimer = Disposables.create {
timerReference?.cancel()
timerReference = nil
}
timer.setEventHandler(handler: {
if cancelTimer.isDisposed {
return
}
// 回调action
timerState = action(timerState)
})
timer.resume()
return cancelTimer
}
}
在上述方法里面,我们就可以清晰的看到,RxSwift
的定时器其实是对GCD
的DispatchSource
的封装,在GCD
的定时回调里完成了action
的回调。我们再来看看action
里的代码:
self._lock.lock(); defer { self._lock.unlock() }
self.forwardOn(.next(state))
return state &+ 1
调用了父类Sink
的forwardOn
方法,其实这里就进入了发送事件的流程,这时候外界就会接收到发送的事件。最后将state
的值加1,这也就是为什么外界接收到的值一直在加1。前面TimerSink
在调用方法传入的state
为0,定时器初始化的时候发送的值为0。
当我们需要销毁定时器的时候,调用onError
或者onComplete
方法即可,也可以直接回收disposeBag
。
至此带有事件间隔,也就是重复定时器的响应就算结束了。下面我们再来看看一次性定时器的流程。
TimerOneOffSink
final private class TimerOneOffSink<O: ObserverType>: Sink<O> where O.E: RxAbstractInteger {
typealias Parent = Timer<O.E>
private let _parent: Parent
init(parent: Parent, observer: O, cancel: Cancelable) {
self._parent = parent
super.init(observer: observer, cancel: cancel)
}
func run() -> Disposable {
return self._parent._scheduler.scheduleRelative(self, dueTime: self._parent._dueTime) { [unowned self] _ -> Disposable in
self.forwardOn(.next(0))
self.forwardOn(.completed)
self.dispose()
return Disposables.create()
}
}
}
可以看到一次性定时器和重复定时器调用的方法并不一样。它调用的是MainScheduler
的scheduleRelative
方法,也就是其父类SerialDispatchQueueScheduler
的这个方法:
public final func scheduleRelative<StateType>(_ state: StateType, dueTime: Foundation.TimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.configuration.scheduleRelative(state, dueTime: dueTime, action: action)
}
最后进入DispatchQueueConfiguration
的方法:
func scheduleRelative<StateType>(_ state: StateType, dueTime: Foundation.TimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
let deadline = DispatchTime.now() + dispatchInterval(dueTime)
let compositeDisposable = CompositeDisposable()
let timer = DispatchSource.makeTimerSource(queue: self.queue)
timer.schedule(deadline: deadline, leeway: self.leeway)
var timerReference: DispatchSourceTimer? = timer
let cancelTimer = Disposables.create {
timerReference?.cancel()
timerReference = nil
}
timer.setEventHandler(handler: {
if compositeDisposable.isDisposed {
return
}
// 回调出去之后直接销毁
_ = compositeDisposable.insert(action(state))
// 销毁定时器
cancelTimer.dispose()
})
timer.resume()
_ = compositeDisposable.insert(cancelTimer)
return compositeDisposable
}
回调的实现如下:
self.forwardOn(.next(0))
self.forwardOn(.completed)
self.dispose()
return Disposables.create()
首先给外界发送一个值为0的事件,然后又发送完成事件,接着销毁自己。所以外界只能接收到一次事件。
总结
RxSwift
的定时器其实是对GCD
的封装,订阅Timer
观察序列之后,如果是重复定时器,订阅者每隔设定的时间间隔就会收到一个值,该值从0开始每次加1;如果是单次定时器,则只会收到一个值0
,收到之后定时器就会销毁。整体流程如下: