Rx

RxSwift源码分析(7)——定时器

2020-10-04  本文已影响0人  无悔zero

RxSwift还封装了定时器,下面是一个每隔1秒在主线程执行回调的定时器:

Observable.interval(1, scheduler: MainScheduler.instance)
.subscribe(onNext: { (num) in
    print("定时器执行")
}).disposed(by: disposbag)

第一个参数是时间间隔,第二个参数是执行回调的调用环境。

  1. 我们马上进入源码看一看这个定时器是怎么封装的:
extension ObservableType where Element : RxAbstractInteger {
    public static func interval(_ period: Foundation.TimeInterval, scheduler: SchedulerType)
        -> Observable<Element> {
        return interval(.milliseconds(Int(period * 1000.0)), scheduler: scheduler)
    }
}
extension ObservableType where Element : RxAbstractInteger {
    public static func interval(_ period: RxTimeInterval, scheduler: SchedulerType)
        -> Observable<Element> {
        return Timer(
            dueTime: period,
            period: period,
            scheduler: scheduler
        )
    }
}

在这个Timer内部可以看到它继承了Producer,所以Timer也是一个序列

final private class Timer<Element: RxAbstractInteger>: Producer<Element> {
    ...
    init(dueTime: RxTimeInterval, period: RxTimeInterval?, scheduler: SchedulerType) {
        self._scheduler = scheduler
        self._dueTime = dueTime
        self._period = period
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        if self._period != nil {
            let sink = TimerSink(parent: self, observer: observer, cancel: cancel)
            let subscription = sink.run()
            return (sink: sink, subscription: subscription)
        }
        else {
            let sink = TimerOneOffSink(parent: self, observer: observer, cancel: cancel)
            let subscription = sink.run()
            return (sink: sink, subscription: subscription)
        }
    }
}
  1. 根据RxSwift核心逻辑,定时器序列在subscribe订阅之后便会来到run函数。在run函数里面得知,下沉业务由TimerSinkTimerOneOffSink负责,我们先看TimerSink,看懂了TimerSink自然能懂TimerOneOffSink
final private class TimerSink<Observer: ObserverType> : Sink<Observer> where Observer.Element : RxAbstractInteger  {
    ...
    init(parent: Parent, observer: Observer, cancel: Cancelable) {
        self._parent = parent
        super.init(observer: observer, cancel: cancel)
    }

    func run() -> Disposable {
        return self._parent._scheduler.schedulePeriodic(0 as Observer.Element, startAfter: self._parent._dueTime, period: self._parent._period!) { state in
          ...
        }
    }
}
  1. _scheduler是一个调度者,以后会详细说说。先看后面的执行函数:
public class SerialDispatchQueueScheduler : SchedulerType {
    ...
    init(serialQueue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
        self.configuration = DispatchQueueConfiguration(queue: serialQueue, leeway: leeway)
    }

    public func schedulePeriodic<StateType>(_ state: StateType, startAfter: RxTimeInterval, period: RxTimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
        return self.configuration.schedulePeriodic(state, startAfter: startAfter, period: period, action: action)
    }
}
  1. 最后来到重点,schedulePeriodic函数里封装了GCD定时器,所以这个RxSwift定时器不会受到滑动屏幕的影响而停止执行。
extension DispatchQueueConfiguration {
    ...
    func schedulePeriodic<StateType>(_ state: StateType, startAfter: RxTimeInterval, period: RxTimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
        let initial = DispatchTime.now() + startAfter

        var timerState = state
        //真正作用的定时器
        let timer = DispatchSource.makeTimerSource(queue: self.queue)
        timer.schedule(deadline: initial, repeating: period, leeway: self.leeway)
        
        var timerReference: DispatchSourceTimer? = timer
        let cancelTimer = Disposables.create {
            timerReference?.cancel()//停止定时器
            timerReference = nil
        }
        //定时执行
        timer.setEventHandler(handler: {
            if cancelTimer.isDisposed {
                return
            }
            timerState = action(timerState)//执行外面的尾随闭包
        })
        timer.resume()//定时器启动
        
        return cancelTimer
    }
}
  1. 定时器启动后,便定时执行action(timerState)回到外面的尾随闭包,接着执行self.forwardOn(.next(state))发送响应:
final private class TimerSink<Observer: ObserverType> : Sink<Observer> where Observer.Element : RxAbstractInteger  {
    ...
    func run() -> Disposable {
        return self._parent._scheduler.schedulePeriodic(0 as Observer.Element, startAfter: self._parent._dueTime, period: self._parent._period!) { state in
            self._lock.lock(); defer { self._lock.unlock() }
            self.forwardOn(.next(state))//发送响应
            return state &+ 1
        }
    }
}

最后根据RxSwift核心逻辑,最终实现定时响应

.subscribe(onNext: { (num) in
    print("定时器执行")
}).disposed(by: disposbag)
上一篇下一篇

猜你喜欢

热点阅读