iOS-RXSwift

RxSwift-Scheduler

2019-08-02  本文已影响22人  Code_人生
        let ob = Observable.of(1,2,3,4,5)
        ob.observeOn(ConcurrentDispatchQueueScheduler.init(qos: .background))
            .subscribe {
                print("observeOn",$0,Thread.current)
            }
        .disposed(by: disposeBag)
步骤1、代码改写

1.1 ob -> ObservableSequence<Array<Int>>
1.2 obOn -> ObserveOn<Int>
1.3 obOnSubscribe -> SinkDisposer

        let ob = Observable.of(1,2,3,4,5)
        let obOn = ob.observeOn(ConcurrentDispatchQueueScheduler.init(qos: .background))
        let obOnSubscribe = obOn.subscribe {
            print("observeOn",$0,Thread.current)
        }
        let obOnSubscribeDisposed = obOnSubscribe.disposed(by: disposeBag)
步骤2、点击observeOn

2.1 ObservableSequence继承Producer,所以点击observeOn来到ObservableType协议的observeOn方法,如下
2.2 scheduler -> ConcurrentDispatchQueueScheduler
2.3 self.asObservable() -> ObservableSequence<Array<Int>>

extension ObservableType {
    public func observeOn(_ scheduler: ImmediateSchedulerType)
        -> Observable<Element> {
            if let scheduler = scheduler as? SerialDispatchQueueScheduler {
                return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
            }
            else {
                return ObserveOn(source: self.asObservable(), scheduler: scheduler)
            }
    }
}
步骤3、点击ObserveOn(source: self.asObservable(), scheduler: scheduler)

3.1 ObserveOn 继承 Producer
3.2 初始化 ObserveOn
3.3 self.scheduler = scheduler 保存传进来的 ConcurrentDispatchQueueScheduler 2.2
3.4 self.source = source 保存传进来的 self.asObservable() 即 ObservableSequence<Array<Int>> 2.3

final private class ObserveOn<Element>: Producer<Element> {
    let scheduler: ImmediateSchedulerType
    let source: Observable<Element>

    init(source: Observable<Element>, scheduler: ImmediateSchedulerType) {
        self.scheduler = scheduler
        self.source = source

#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ObserveOnSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
        let subscription = self.source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }

#if TRACE_RESOURCES
    deinit {
        _ = Resources.decrementTotal()
    }
#endif
}
步骤4、外界.subscribe(on) 即ObserveOn.subscribe(on)

4.1 保存AnonymousObserver

extension ObservableType {
    public func subscribe(_ on: @escaping (Event<Element>) -> Void)
        -> Disposable {
            let observer = AnonymousObserver { e in
                on(e)
            }
            return self.asObservable().subscribe(observer)
    }
}
步骤5、 来到Producer 的 subscribe方法

5.1 CurrentThreadScheduler.isScheduleRequired 为 true,来到else ,执行CurrentThreadScheduler.instance.schedule 来到步骤6
5.2 去到步骤3中的run方法,ObservableSequence调用subscribe,把ObserveOnSink作为观察者传入进去,来到5.3
5.2 进入!CurrentThreadScheduler.isScheduleRequired 中,此时的self是 ObservableSequence

class Producer<Element> : Observable<Element> {
    override init() {
        super.init()
    }

    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }

    func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        rxAbstractMethod()
    }
}
步骤6 点击 schedule

6.1 schedule 方法
6.1.1 CurrentThreadScheduler.isScheduleRequired = false 置为false
6.1.2 let disposable = action(state) 执行闭包 回到5.1

public class CurrentThreadScheduler : ImmediateSchedulerType {
    typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>

    /// The singleton instance of the current thread scheduler.
    public static let instance = CurrentThreadScheduler()

    private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t in
        let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
        defer { key.deallocate() }
                                                               
        guard pthread_key_create(key, nil) == 0 else {
            rxFatalError("isScheduleRequired key creation failed")
        }

        return key.pointee
    }()

    private static var scheduleInProgressSentinel: UnsafeRawPointer = { () -> UnsafeRawPointer in
        return UnsafeRawPointer(UnsafeMutablePointer<Int>.allocate(capacity: 1))
    }()

    static var queue : ScheduleQueue? {
        get {
            return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance)
        }
        set {
            Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance)
        }
    }

    /// Gets a value that indicates whether the caller must call a `schedule` method.
    public static fileprivate(set) var isScheduleRequired: Bool {
        get {
            return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
        }
        set(isScheduleRequired) {
            if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
                rxFatalError("pthread_setspecific failed")
            }
        }
    }

    public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        if CurrentThreadScheduler.isScheduleRequired {
            CurrentThreadScheduler.isScheduleRequired = false

            let disposable = action(state)

            defer {
                CurrentThreadScheduler.isScheduleRequired = true
                CurrentThreadScheduler.queue = nil
            }

            guard let queue = CurrentThreadScheduler.queue else {
                return disposable
            }

            while let latest = queue.value.dequeue() {
                if latest.isDisposed {
                    continue
                }
                latest.invoke()
            }

            return disposable
        }

        let existingQueue = CurrentThreadScheduler.queue

        let queue: RxMutableBox<Queue<ScheduledItemType>>
        if let existingQueue = existingQueue {
            queue = existingQueue
        }
        else {
            queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
            CurrentThreadScheduler.queue = queue
        }

        let scheduledItem = ScheduledItem(action: action, state: state)
        queue.value.enqueue(scheduledItem)

        return scheduledItem
    }
}

步骤6 来到 ObservableSequence的run 方法

6.1 run方法中的observer -> <ObserveOnSink<AnonymousObserver<Int>>

final private class ObservableSequence<Sequence: Swift.Sequence>: Producer<Sequence.Element> {
    fileprivate let _elements: Sequence
    fileprivate let _scheduler: ImmediateSchedulerType

    init(elements: Sequence, scheduler: ImmediateSchedulerType) {
        self._elements = elements
        self._scheduler = scheduler
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}
步骤7 来到ObservableSequenceSink的 run方法

7.1 self._parent -> ObservableSequence
7.2 self._parent._scheduler -> CurrentThreadScheduler
7.3 self._parent._element -> (1,2,3,4,5)
7.4 CurrentThreadScheduler类继承ImmediateSchedulerType协议
7.4 self._parent._scheduler.scheduleRecursive -> CurrentThreadScheduler.scheduleRecursive 来到 步骤8

final private class ObservableSequenceSink<Sequence: Swift.Sequence, Observer: ObserverType>: Sink<Observer> where Sequence.Element == Observer.Element {
    typealias Parent = ObservableSequence<Sequence>

    private let _parent: Parent

    init(parent: Parent, observer: Observer, cancel: Cancelable) {
        self._parent = parent
        super.init(observer: observer, cancel: cancel)
    }

    func run() -> Disposable {
        return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
            var mutableIterator = iterator
            if let next = mutableIterator.next() {
                self.forwardOn(.next(next))
                recurse(mutableIterator)
            }
            else {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }
}
步骤8 点击scheduleRecursive

8.1 self -> CurrentThreadScheduler
8.2 recursiveScheduler -> RecursiveImmediateScheduler
8.3 初始化 RecursiveImmediateScheduler
8.4 执行 schedule 来到 步骤9

extension ImmediateSchedulerType {
    public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> Void) -> Void) -> Disposable {
        let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
        
        recursiveScheduler.schedule(state)
        
        return Disposables.create(with: recursiveScheduler.dispose)
    }
}
步骤9 点击RecursiveImmediateScheduler

9.1 let d = self._scheduler.schedule(state),self._scheduler是CurrentThreadScheduler,即CurrentThreadScheduler执行schedule,来到步骤6.1

final class RecursiveImmediateScheduler<State> {
    typealias Action =  (_ state: State, _ recurse: (State) -> Void) -> Void
    private var _lock = SpinLock()
    private let _group = CompositeDisposable()
    private var _action: Action?
    private let _scheduler: ImmediateSchedulerType
    init(action: @escaping Action, scheduler: ImmediateSchedulerType) {
        self._action = action
        self._scheduler = scheduler
    }
    func schedule(_ state: State) {
        var scheduleState: ScheduleState = .initial
        let d = self._scheduler.schedule(state) { state -> Disposable in
            // best effort
            if self._group.isDisposed {
                return Disposables.create()
            }            
            let action = self._lock.calculateLocked { () -> Action? in
                switch scheduleState {
                case let .added(removeKey):
                    self._group.remove(for: removeKey)
                case .initial:
                    break
                case .done:
                    break
                }
                scheduleState = .done
                return self._action
            }
            if let action = action {
                action(state, self.schedule)
            }
            return Disposables.create()
        }
        self._lock.performLocked {
            switch scheduleState {
            case .added:
                rxFatalError("Invalid state")
            case .initial:
                if let removeKey = self._group.insert(d) {
                    scheduleState = .added(removeKey)
                }
                else {
                    scheduleState = .done
                }
            case .done:
                break
            }
        }
    }
    func dispose() {
        self._lock.performLocked {
            self._action = nil
        }
        self._group.dispose()
    }
}
步骤10 self.forwardOn(.next(next)) 此时的观察者是ObserveOnSink

10.1 来到 ObserveOnSink 的 onCore
10.2 self._scheduler -> ConcurrentDispatchQueueScheduler,ConcurrentDispatchQueueScheduler类继承SchedulerType协议,SchedulerType协议继承ImmediateSchedulerType协议

final private class ObserveOnSink<Observer: ObserverType>: ObserverBase<Observer.Element> {
    typealias Element = Observer.Element 

    let _scheduler: ImmediateSchedulerType

    var _lock = SpinLock()
    let _observer: Observer

    // state
    var _state = ObserveOnState.stopped
    var _queue = Queue<Event<Element>>(capacity: 10)

    let _scheduleDisposable = SerialDisposable()
    let _cancel: Cancelable

    init(scheduler: ImmediateSchedulerType, observer: Observer, cancel: Cancelable) {
        self._scheduler = scheduler
        self._observer = observer
        self._cancel = cancel
    }

    override func onCore(_ event: Event<Element>) {
        let shouldStart = self._lock.calculateLocked { () -> Bool in
            self._queue.enqueue(event)

            switch self._state {
            case .stopped:
                self._state = .running
                return true
            case .running:
                return false
            }
        }

        if shouldStart {
            self._scheduleDisposable.disposable = self._scheduler.scheduleRecursive((), action: self.run)
        }
    }

    func run(_ state: (), _ recurse: (()) -> Void) {
        let (nextEvent, observer) = self._lock.calculateLocked { () -> (Event<Element>?, Observer) in
            if !self._queue.isEmpty {
                return (self._queue.dequeue(), self._observer)
            }
            else {
                self._state = .stopped
                return (nil, self._observer)
            }
        }

        if let nextEvent = nextEvent, !self._cancel.isDisposed {
            observer.on(nextEvent)
            if nextEvent.isStopEvent {
                self.dispose()
            }
        }
        else {
            return
        }

        let shouldContinue = self._shouldContinue_synchronized()

        if shouldContinue {
            recurse(())
        }
    }

    func _shouldContinue_synchronized() -> Bool {
        self._lock.lock(); defer { self._lock.unlock() } // {
            if !self._queue.isEmpty {
                return true
            }
            else {
                self._state = .stopped
                return false
            }
        // }
    }

    override func dispose() {
        super.dispose()

        self._cancel.dispose()
        self._scheduleDisposable.dispose()
    }
}

有点晕了,先放一放

上一篇 下一篇

猜你喜欢

热点阅读