RXSwift

RxSwift---核心逻辑(二)

2023-04-21  本文已影响0人  会骑车的iOSer

在上一章中我们初步的领略了RxSwift的魅力,那么这一章我们就来一起探索RxSwift的核心逻辑

RxSwift的核心流程

RxSwift的使用其实就分为三步,也是我们通常讲的三步曲

        //1.创建序列
        let ob = Observable<Any>.create { observer in
            //3.发送信号
            observer.onNext("Rx发送信号")
            return Disposables.create()  // 这个销毁不影响我们这次的解读
        }
        
        //2.订阅信号
        ob.subscribe { text in
            print("订阅到了:\(text)")
        } onError: { error in
            print("error:\(error)")
        } onCompleted: {
            print("done")
        } onDisposed: {
            print("销毁")
        }.disposed(by: disposebag)

//打印结果:订阅到了:Rx发送信号

哎~ :为什么我们的订阅到了:Rx发送信号这个字符串会在订阅序列的subscribe的闭包打印,下面是代码分析

代码分析

猜测:这样就不能猜出要想把 observer.onNext("Rx发送信号")中的信号在闭包B中打印出来,在subscribe中必然调用了onNext,接下来我们去验证这个猜测是否正确

RxSwift的核心逻辑

创建序列

extension ObservableType {
    // MARK: create
    public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
        AnonymousObservable(subscribe)
    }
}

作者通过协议的扩展创建了一个匿名的可观察序列(AnonymousObservable) ,AnonymousObservable是一个内部类,是一个很好设计模式,只返回外部所需要的,至于内部的操作对外部完全匿名,下面我们看下这个类的继承关系

AnonymousObservable继承关系
final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        //保存外部的闭包
        self.subscribeHandler = subscribeHandler
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}

订阅序列

我们通过subscribe点进源码

public func subscribe(
        onNext: ((Element) -> Void)? = nil,
        onError: ((Swift.Error) -> Void)? = nil,
        onCompleted: (() -> Void)? = nil,
        onDisposed: (() -> Void)? = nil
    ) -> Disposable {
            let disposable: Disposable
            
            if let disposed = onDisposed {
                disposable = Disposables.create(with: disposed)
            }
            else {
                disposable = Disposables.create()
            }
            
            #if DEBUG
                let synchronizationTracker = SynchronizationTracker()
            #endif
            
            let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
            
            let observer = AnonymousObserver<Element> { event in
                
                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif
                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
class Producer<Element>: Observable<Element> {
//为了方便我们理解,这里省略了一些不必要的代码 ...
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
        //直接省略...
            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
                return disposer
            }
        }
    }
}
final private class AnonymousObservable<Element>: Producer<Element> {
    //这里同样省略了些代码...

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}
final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    //这里同样省略了些代码...

    func run(_ parent: Parent) -> Disposable {
        parent.subscribeHandler(AnyObserver(self))
    }
}
public struct AnyObserver<Element> : ObserverType {
        //这里同样省略了些代码...

    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        self.observer = observer.on
    }

    //这里同样省略了些代码...
}

在这个构造方法里面,我们创建了一个结构体 AnyObserver 保存了一个信息 AnonymousObservableSink.on 函数,不是 AnonymousObservableSink,,很重要~!!!!!!!!

发送响应

根据我们上面的分析:

observer.onNext("Rx发送信号")其实就等于AnyObserver ().onNext("Rx发送信号")
这时发现AnyObserver没有这个方法,不过按照作者的思路,我们在其父类中寻找,或者协议中寻找

extension ObserverType {
    
    /// Convenience method equivalent to `on(.next(element: Element))`
    ///
    /// - parameter element: Next element to send to observer(s)
    public func onNext(_ element: Element) {
        self.on(.next(element))
    }
    
    /// Convenience method equivalent to `on(.completed)`
    public func onCompleted() {
        self.on(.completed)
    }
    
    /// Convenience method equivalent to `on(.error(Swift.Error))`
    /// - parameter error: Swift.Error to send to observer(s)
    public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}

外界 observer.onNext("Rx发送信号") 再次变形 :AnyObserver.on(.next("Rx发送信号")) ,这里一定要主要,这个AnyObserver调用了 on 里面传的是 .next函数, .next函数带有我们最终的参数

public struct AnyObserver<Element> : ObserverType {
    /// Anonymous event handler type.
    public typealias EventHandler = (Event<Element>) -> Void

    private let observer: EventHandler

    /// Construct an instance whose `on(event)` calls `eventHandler(event)`
    ///
    /// - parameter eventHandler: Event handler that observes sequences events.
    public init(eventHandler: @escaping EventHandler) {
        self.observer = eventHandler
    }
    
    /// Construct an instance whose `on(event)` calls `observer.on(event)`
    ///
    /// - parameter observer: Observer that receives sequence events.
    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        self.observer = observer.on
    }
    
    /// Send `event` to this observer.
    ///
    /// - parameter event: Event instance.
    public func on(_ event: Event<Element>) {
        self.observer(event)
    }

    /// Erases type of observer and returns canonical observer.
    ///
    /// - returns: type erased observer.
    public func asObserver() -> AnyObserver<Element> {
        self
    }
}

总结:

1:序列概念 万物皆序列(编码统一) ,随时随地享用
2:通过函数式思想吧一些列的需求操作下沉(把开发者不关心的东西封装) - 优化代码,节省逻辑


Rxswift核心流程图 RxSwift核心详解.png

最后一句:创作不易,如果对各位大佬有帮助,请留下你们的小星星✨

上一篇 下一篇

猜你喜欢

热点阅读