Swift学习

RxSwift核心逻辑的理解

2019-07-29  本文已影响211人  iOS猿_员

原文地址:https://www.jianshu.com/p/0e0703466483

作为ReactiveX家族之一的RxSwiftGithub截止现在Star:16K.为什么这个框架如此受欢迎,作为函数响应式框架典型代表,底层实现又是如何实现的呢?这一篇文章全面解密

RxSwift核心流程

RxSwift这个优秀的框架,设计的api也是非常精简,让陌生的用户也能非常快速上手

// 1: 创建序列
_ = Observable<String>.create { (obserber) -> Disposable in
    // 3:发送信号
    obserber.onNext("Cooci -  框架班级")
    return Disposables.create()  // 这个销毁不影响我们这次的解读
    // 2: 订阅序列
    }.subscribe(onNext: { (text) in
        print("订阅到:\(text)")
    })

// 控制台打印:“订阅到:Cooci -  框架班级”

我刚开始在探索的时候,我是比较好奇的:为什么我们的Cooci - 框架班级这个字符串会在订阅序列的subscribe的闭包打印。下面是我的代码分析

分析代码:

PS: 说实话 RxSwift框架的源码的确比较复杂并且很多,很多基础薄弱或者耐性不够的小伙伴很容易放弃。但是你看到这篇博客,你有福了:我会快速简短给你介绍,在最后面会附上我绘制的思维导图!

RxSwift核心逻辑

创建序列

extension ObservableType {
    // MARK: create
    public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
        return AnonymousObservable(subscribe)
    }
}

大家可以很清晰看到我们的 可观察序列 的创建是利用协议拓展功能的create方法实现的,里面创建了一个 AnonymousObservable(匿名可观察序列) 命名还是体现了作者的思维 :这个类就是一个内部类,具备一些通用特性(具有自己功能的类才会命名) 下面我贴出这个类的继承关系

从上面的图,我们可以清晰的看到的继承关系。那么这么多的内容还有那么多层嵌套,这个地方我们需要掌握什么:

订阅序列

这里说明这个订阅方法 subscribe 和我们上面所说的 subscribe 不是同一个方法

来自于对 ObservableType 的拓展功能

extension ObservableType {
    public func subscribe(onNext: ((E) -> Void)? = nil, ...) -> Disposable {
           // 因为篇幅 省略不影响我们探索的代码
            let observer = AnonymousObserver<E> { event in                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
}

代码说明:

public class Observable<Element> : ObservableType {
    /// Type of elements in sequence.
    public typealias E = Element

override func subscribe(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // 篇幅原因,我们省略一些代码,方便我们理解
            ...
            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
              // 篇幅原因,我们省略一些代码,方便我们理解
              ...
                return disposer
            }
        }
    }

override func run (...) {
    let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
    let subscription = sink.run(self)
    return (sink: sink, subscription: subscription)
}

func run(_ parent: Parent) -> Disposable {
    return parent._subscribeHandler(AnyObserver(self))
}

public init<O : ObserverType>(_ observer: O) where O.E == Element {
    self.observer = observer.on
}

发送响应

我们从上面的分析,非常清晰:
obserber.onNext("Cooci - 框架班级") 的本质是: AnyObserver.onNext("Cooci - 框架班级")

这时候发现我们的AnyObserver 是没有这个方法,这很正常!一般思路,找父类,找协议

extension ObserverType {
    public func onNext(_ element: E) {
        self.on(.next(element))
    }
}

public struct AnyObserver<Element> : ObserverType {
    public init<O : ObserverType>(_ observer: O) where O.E == Element {
        self.observer = observer.on
    }
    public func on(_ event: Event<Element>) {
        return self.observer(event)
    }
}

class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
    func on(_ event: Event<E>) {
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }
}

class Sink<O : ObserverType> : Disposable {
    final func forwardOn(_ event: Event<O.E>) {
        if isFlagSet(self._disposed, 1) {
            return
        }
        self._observer.on(event)
    }
}

let observer = AnonymousObserver<E> { event in
    switch event {
    case .next(let value):
        onNext?(value)
    case .error(let error):
        if let onError = onError {
            onError(error)
        }
        else {
            Hooks.defaultErrorHandler(callStack, error)
        }
        disposable.dispose()
    case .completed:
        onCompleted?()
        disposable.dispose()
    }
}

总结:RxSwift的结构

对于RxSwift,有更好的见解,想要更好的探讨,可以进入iOS技术群,一起探讨交流

上一篇 下一篇

猜你喜欢

热点阅读