ReactiveX

RxSwift(3)-定时器

2020-04-16  本文已影响0人  xxxxxxxx_123

我们在日常使用系统的Timer类的时候,如果页面有滚动操作就会影响到定时器的正常执行。当然如果放到RunLoopcommon模式则不会。

func testTimer() {
    let _ = Timer.scheduledTimer(withTimeInterval: 1, repeats: true, block: { (timer) in
        print(timer)
    })
}

那么RxSwift封装的timer呢?会不会也是这样?下面我们来看一个例子:

func rxTimerTest() {
    let rxTimer = Observable<Int>.timer(1, period: 1, scheduler: MainScheduler.instance)
    rxTimer.subscribe(onNext: { (num) in
        print("==\(num)==")
    })
    .disposed(by: disposeBag)
}

运行上述代码,我们发现定时器并没有受到滑动事件的影响。我们知道平时使用的定时器,诸如NSTimer(Timer)CADisplayLink都是需要加入到RunLoop中的,这样的定时器都会收到RunLoop的影响;而GCD的定时器DispatchSource则和RunLoop无关,不会受到滑动事件的影响。那我们就可以大胆的猜测,RxSwift封装的定时器也是对DispatchSource的封装。

下面我们就具体来看看它的实现:

public static func timer(_ dueTime: RxTimeInterval, period: RxTimeInterval? = nil, scheduler: SchedulerType)
    -> Observable<E> {
    return Timer(
        dueTime: dueTime,
        period: period,
        scheduler: scheduler
    )
}

上述方法返回了一个可观察的序列,这个观察序列在间隔一段时间dueTime初始化之后,每隔period时间产生一个值。这个Timer是通过特殊的调度者来执行的。

TimerProducer的子类,保存了外界传入的schedulerdueTimeperiod。另外,Timer也重写了父类定义的run方法,根据我们之前讨论的RxSwift的核心事件流程,在订阅之后,最终会在调用Timerrun方法。

final private class Timer<E: RxAbstractInteger>: Producer<E> {
    fileprivate let _scheduler: SchedulerType
    fileprivate let _dueTime: RxTimeInterval
    fileprivate let _period: RxTimeInterval?

    // 保存调度者
    // 保存dueTime
    // 保存间隔时间
    init(dueTime: RxTimeInterval, period: RxTimeInterval?, scheduler: SchedulerType) {
        self._scheduler = scheduler
        self._dueTime = dueTime
        self._period = period
    }

    override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
        if self._period != nil {
            let sink = TimerSink(parent: self, observer: observer, cancel: cancel)
            let subscription = sink.run()
            return (sink: sink, subscription: subscription)
        }
        else {
            let sink = TimerOneOffSink(parent: self, observer: observer, cancel: cancel)
            let subscription = sink.run()
            return (sink: sink, subscription: subscription)
        }
    }
}

Timerrun方法中,会判断外界是否传入了_period,传入_period,就有TimerSink接管,否则由TimerOneOffSink处理。我们先看看TimerSink的处理流程。

TimerSink

final private class TimerSink<O: ObserverType> : Sink<O> where O.E : RxAbstractInteger  {
    typealias Parent = Timer<O.E>

    private let _parent: Parent
    private let _lock = RecursiveLock()

    init(parent: Parent, observer: O, cancel: Cancelable) {
        self._parent = parent
        super.init(observer: observer, cancel: cancel)
    }

    func run() -> Disposable {
        // 传过去的state为0
        return self._parent._scheduler.schedulePeriodic(0 as O.E, startAfter: self._parent._dueTime, period: self._parent._period!) { state in
            self._lock.lock(); defer { self._lock.unlock() }
            self.forwardOn(.next(state))
            return state &+ 1
        }
    }
}

TimerSink调用run方法的时候,self._parent._scheduler就是Timer_scheduler,上述例子中,我们传入的是MainScheduler,相当于``调用schedulePeriodic方法,而MainScheduler并没有对应的方法,所以我们需要去其父类SerialDispatchQueueScheduler中找。

public class SerialDispatchQueueScheduler : SchedulerType {
    public func schedulePeriodic<StateType>(_ state: StateType, startAfter: TimeInterval, period: TimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
        return self.configuration.schedulePeriodic(state, startAfter: startAfter, period: period, action: action)
    }
}
extension DispatchQueueConfiguration {

    func schedulePeriodic<StateType>(_ state: StateType, startAfter: TimeInterval, period: TimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
        let initial = DispatchTime.now() + dispatchInterval(startAfter)

        var timerState = state

        let timer = DispatchSource.makeTimerSource(queue: self.queue)
        timer.schedule(deadline: initial, repeating: dispatchInterval(period), leeway: self.leeway)
        
        var timerReference: DispatchSourceTimer? = timer
        let cancelTimer = Disposables.create {
            timerReference?.cancel()
            timerReference = nil
        }

        timer.setEventHandler(handler: {
            if cancelTimer.isDisposed {
                return
            }
            // 回调action
            timerState = action(timerState)
        })
        timer.resume()
        
        return cancelTimer
    }
}

在上述方法里面,我们就可以清晰的看到,RxSwift的定时器其实是对GCDDispatchSource的封装,在GCD的定时回调里完成了action的回调。我们再来看看action里的代码:

self._lock.lock(); defer { self._lock.unlock() }
self.forwardOn(.next(state))
return state &+ 1

调用了父类SinkforwardOn方法,其实这里就进入了发送事件的流程,这时候外界就会接收到发送的事件。最后将state的值加1,这也就是为什么外界接收到的值一直在加1。前面TimerSink在调用方法传入的state为0,定时器初始化的时候发送的值为0。

当我们需要销毁定时器的时候,调用onError或者onComplete方法即可,也可以直接回收disposeBag

至此带有事件间隔,也就是重复定时器的响应就算结束了。下面我们再来看看一次性定时器的流程。

TimerOneOffSink

final private class TimerOneOffSink<O: ObserverType>: Sink<O> where O.E: RxAbstractInteger {
    typealias Parent = Timer<O.E>

    private let _parent: Parent

    init(parent: Parent, observer: O, cancel: Cancelable) {
        self._parent = parent
        super.init(observer: observer, cancel: cancel)
    }

    func run() -> Disposable {
        return self._parent._scheduler.scheduleRelative(self, dueTime: self._parent._dueTime) { [unowned self] _ -> Disposable in
            self.forwardOn(.next(0))
            self.forwardOn(.completed)
            self.dispose()
            return Disposables.create()
        }
    }
}

可以看到一次性定时器和重复定时器调用的方法并不一样。它调用的是MainSchedulerscheduleRelative方法,也就是其父类SerialDispatchQueueScheduler的这个方法:

public final func scheduleRelative<StateType>(_ state: StateType, dueTime: Foundation.TimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
    return self.configuration.scheduleRelative(state, dueTime: dueTime, action: action)
}

最后进入DispatchQueueConfiguration的方法:

func scheduleRelative<StateType>(_ state: StateType, dueTime: Foundation.TimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
    let deadline = DispatchTime.now() + dispatchInterval(dueTime)

    let compositeDisposable = CompositeDisposable()

    let timer = DispatchSource.makeTimerSource(queue: self.queue)
    timer.schedule(deadline: deadline, leeway: self.leeway)

    var timerReference: DispatchSourceTimer? = timer
    let cancelTimer = Disposables.create {
        timerReference?.cancel()
        timerReference = nil
    }

    timer.setEventHandler(handler: {
        if compositeDisposable.isDisposed {
            return
        }
        // 回调出去之后直接销毁
        _ = compositeDisposable.insert(action(state))
        // 销毁定时器
        cancelTimer.dispose()
    })
    timer.resume()

    _ = compositeDisposable.insert(cancelTimer)
    return compositeDisposable
}

回调的实现如下:

self.forwardOn(.next(0))
self.forwardOn(.completed)
self.dispose()
return Disposables.create()

首先给外界发送一个值为0的事件,然后又发送完成事件,接着销毁自己。所以外界只能接收到一次事件。

总结

RxSwift的定时器其实是对GCD的封装,订阅Timer观察序列之后,如果是重复定时器,订阅者每隔设定的时间间隔就会收到一个值,该值从0开始每次加1;如果是单次定时器,则只会收到一个值0,收到之后定时器就会销毁。整体流程如下:

Rx定时器.png
上一篇下一篇

猜你喜欢

热点阅读