RxSwift_核心原理

2019-08-16  本文已影响0人  LeeWkai

基本流程

  1. 创建序列
  2. 订阅序列
  3. 发送信号
       // 1:创建序列
        let observable = Observable<Any>.create { (obserber) -> Disposable in
            // 3:发送信号
            obserber.onNext("发送信号")
            obserber.onCompleted()
            return Disposables.create()
        }
        
        // 2:订阅信号
        _ = observable.subscribe { (event) in
            print(event)
        }

带着问题去思考底层的实现

屏幕快照 2019-08-15 下午4.33.45.png

涉及到的几个主要的类的继承关系

屏幕快照 2019-08-15 下午4.37.20.png

流程分析

RxSwift核心流程.png

源码分析

啰说一句

  1. RxSwift中经常会用父类声明方法,子类extension重写,传入当前子类的数据OC里没这么玩的
  2. 看到的subscribe不一定是当前类中的方法,也可能调的父类,父父类,父父父类已经习惯了的当我没说

当前流程所在文件Create.swift

extension ObservableType {
    public func subscribe(onNext: ((E) -> Void)? = nil, ...) -> Disposable {
    .
    .
    .
            let observer = AnonymousObserver<E> { event in                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
}

中间的流程是最后的外界闭包调用还没到

Disposables.create()是RxSwift的自己的销毁机制,不用管,先看内部

当前流程所在文件Producer.swift

override func subscribe(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            ·
            ·
            ·
            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
              ·
              ·
              ·
                return disposer
            }
        }
    }

当前流程所在文件Create.swift

final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
 ·
 ·
 ·
    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
    ·
    ·
    ·

    func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
    }
}

parent 是传过来的AnonymousObservable对象,AnonymousObservable._subscribeHandler()完成了对外部生成序列时代码块的调用 create和subscribe怎么关联的问题
然后去执行 发送响应,回到最外部 3:送onNext()信号

let observable = Observable<Any>.create { (obserber) -> Disposable in
            // 3:发送信号
            obserber.onNext("发送信号")
            obserber.onCompleted()
            return Disposables.create()
        }

当前流程所在文件Create.swift 再次回来

final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
    ·
    ·
    ·

    func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
    }
}

当前流程所在文件AnyObserver

public struct AnyObserver<Element> : ObserverType {
    ·
    ·
    /// Construct an instance whose `on(event)` calls `observer.on(event)`
    ///
    /// - parameter observer: Observer that receives sequence events.
    public init<O : ObserverType>(_ observer: O) where O.E == Element {
        self.observer = observer.on
    }
    ·
    ·
}

当obserber.onNext("发送信号")就会找到AnyObserver父类

let observable = Observable<Any>.create { (obserber) -> Disposable in
            // 3:发送信号
            obserber.onNext("发送信号")
            obserber.onCompleted()
            return Disposables.create()
        }

当前流程所在文件ObserverType.swift

public protocol ObserverType {
    associatedtype E
    func on(_ event: Event<E>)
}
extension ObserverType {
    public func onNext(_ element: E) {
        self.on(.next(element))
    }
    public func onCompleted() {
        self.on(.completed)
    }
    public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}

当前流程所在文件Create.swift

class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
    func on(_ event: Event<E>) {
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }
}

当前流程所在文件Sink.swift

class Sink<O : ObserverType> : Disposable {
    ·
    ·
    final func forwardOn(_ event: Event<O.E>) {
        if isFlagSet(self._disposed, 1) {
            return
        }
        self._observer.on(event)
    }
}

当前流程所在文件Create.swift

let observer = AnonymousObserver<E> { event in
    switch event {
    case .next(let value):
        onNext?(value)
    case .error(let error):
        if let onError = onError {
            onError(error)
        }
        else {
            Hooks.defaultErrorHandler(callStack, error)
        }
        disposable.dispose()
    case .completed:
        onCompleted?()
        disposable.dispose()
    }
}
  // 2:订阅信号
        _ = observable.subscribe { (event) in
            print(event)
        }
上一篇下一篇

猜你喜欢

热点阅读