RxSwift---核心逻辑(二)
在上一章中我们初步的领略了RxSwift的魅力,那么这一章我们就来一起探索RxSwift的核心逻辑
RxSwift的核心流程
RxSwift的使用其实就分为三步,也是我们通常讲的三步曲
- 1.创建序列
- 2.订阅序列
- 3.发送信号
//1.创建序列
let ob = Observable<Any>.create { observer in
//3.发送信号
observer.onNext("Rx发送信号")
return Disposables.create() // 这个销毁不影响我们这次的解读
}
//2.订阅信号
ob.subscribe { text in
print("订阅到了:\(text)")
} onError: { error in
print("error:\(error)")
} onCompleted: {
print("done")
} onDisposed: {
print("销毁")
}.disposed(by: disposebag)
//打印结果:订阅到了:Rx发送信号
哎~ :为什么我们的订阅到了:Rx发送信号这个字符串会在订阅序列的subscribe的闭包打印,下面是代码分析
代码分析
- 1:代码的执行顺序是 1 -> 2 -> 3
- 2:整个代码只有一个输入口
observer.onNext("Rx发送信号") - 3:在1中创建了一个
闭包A里面有3:发送信号,如果执行发送信号必然要来到闭包A中 - 4:执行2:订阅 创建了
闭包B - 5:通过打印结果知道:先执行
闭包A然后传给了闭包B
猜测:这样就不能猜出要想把 observer.onNext("Rx发送信号")中的信号在闭包B中打印出来,在subscribe中必然调用了onNext,接下来我们去验证这个猜测是否正确
RxSwift的核心逻辑
创建序列
extension ObservableType {
// MARK: create
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
AnonymousObservable(subscribe)
}
}
作者通过协议的扩展创建了一个匿名的可观察序列(AnonymousObservable) ,AnonymousObservable是一个内部类,是一个很好设计模式,只返回外部所需要的,至于内部的操作对外部完全匿名,下面我们看下这个类的继承关系
AnonymousObservable继承关系
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
//保存外部的闭包
self.subscribeHandler = subscribeHandler
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
-
create创建了一个匿名对象AnonymousObservable
-
-
AnonymousObservable匿名对象保存了外部的闭包
-
-
AnonymousObservable继承自Producer,Producer有一个很重要的方法subscribe------ (函数式编程思想)
-
订阅序列
我们通过subscribe点进源码
public func subscribe(
onNext: ((Element) -> Void)? = nil,
onError: ((Swift.Error) -> Void)? = nil,
onCompleted: (() -> Void)? = nil,
onDisposed: (() -> Void)? = nil
) -> Disposable {
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
let observer = AnonymousObserver<Element> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
- 1.
subscribe方法中传入了onNext,onError,onCompleted以及onDisposed4个闭包,并且返回Disposable 对象 - 2.
let observer = AnonymousObserver<Element>创建了一个AnonymousObserver匿名的观察者对象,并且把这里传入的闭包给保存起来 - 3.通过
Disposables.create创建了一个销毁者并且返回 - 4.注意⚠️
self.asObservable().subscribe(observer),这里的self.asObservable()其实就是外部的我们通过create方法创建的ob序列,这个是我们的RxSwift为了保持一致性的写法,subscribe方法就是创建序列流程中我们看到的Producer的subscribe方法,并且把我们创建的AnonymousObserver<Element>对象observer传入了进去
class Producer<Element>: Observable<Element> {
//为了方便我们理解,这里省略了一些不必要的代码 ...
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
//直接省略...
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
}
- 1.关于销毁的内容在这里不做分析,只关注与我们三步曲中的关键步骤
- 2.
self.run这个代码最终由Producer延伸到我们具体的事务代码AnonymousObservable.run
final private class AnonymousObservable<Element>: Producer<Element> {
//这里同样省略了些代码...
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
- 1.
let sink = AnonymousObservableSink()声明了一个sink对象 - 2.
sink.run(self)这个写法很好,把业务进行向下分发,使分工更加的明确
final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
//这里同样省略了些代码...
func run(_ parent: Parent) -> Disposable {
parent.subscribeHandler(AnyObserver(self))
}
}
- 1.
parent是上一步传入的self->AnonymousObservable对象 - 2.我们非常兴奋的看到
AnonymousObservable.subscribeHandler,从这句代码我们解惑了为什么我们的序列订阅的时候流程会执行我们序列闭包,然后去执行发送响应 - 3.这里还有一个比较重要的东西
AnyObserver(self)
public struct AnyObserver<Element> : ObserverType {
//这里同样省略了些代码...
public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
self.observer = observer.on
}
//这里同样省略了些代码...
}
在这个构造方法里面,我们创建了一个结构体 AnyObserver 保存了一个信息 AnonymousObservableSink.on 函数,不是 AnonymousObservableSink,,很重要~!!!!!!!!
发送响应
根据我们上面的分析:
observer.onNext("Rx发送信号")其实就等于AnyObserver ().onNext("Rx发送信号")
这时发现AnyObserver没有这个方法,不过按照作者的思路,我们在其父类中寻找,或者协议中寻找
extension ObserverType {
/// Convenience method equivalent to `on(.next(element: Element))`
///
/// - parameter element: Next element to send to observer(s)
public func onNext(_ element: Element) {
self.on(.next(element))
}
/// Convenience method equivalent to `on(.completed)`
public func onCompleted() {
self.on(.completed)
}
/// Convenience method equivalent to `on(.error(Swift.Error))`
/// - parameter error: Swift.Error to send to observer(s)
public func onError(_ error: Swift.Error) {
self.on(.error(error))
}
}
外界 observer.onNext("Rx发送信号") 再次变形 :AnyObserver.on(.next("Rx发送信号")) ,这里一定要主要,这个AnyObserver调用了 on 里面传的是 .next函数, .next函数带有我们最终的参数
public struct AnyObserver<Element> : ObserverType {
/// Anonymous event handler type.
public typealias EventHandler = (Event<Element>) -> Void
private let observer: EventHandler
/// Construct an instance whose `on(event)` calls `eventHandler(event)`
///
/// - parameter eventHandler: Event handler that observes sequences events.
public init(eventHandler: @escaping EventHandler) {
self.observer = eventHandler
}
/// Construct an instance whose `on(event)` calls `observer.on(event)`
///
/// - parameter observer: Observer that receives sequence events.
public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
self.observer = observer.on
}
/// Send `event` to this observer.
///
/// - parameter event: Event instance.
public func on(_ event: Event<Element>) {
self.observer(event)
}
/// Erases type of observer and returns canonical observer.
///
/// - returns: type erased observer.
public func asObserver() -> AnyObserver<Element> {
self
}
}
- 我们分析过
self.observer = observer.on=self.observer = AnonymousObservableSink.on,那么这里的self.observer(event)=AnonymousObservableSink.on(event),其中event = .next("Rx发送信号"),看到这里不禁对RxSwift说一句 “卧槽~牛逼!”
总结:
1:序列概念 万物皆序列(编码统一) ,随时随地享用
2:通过函数式思想吧一些列的需求操作下沉(把开发者不关心的东西封装) - 优化代码,节省逻辑
Rxswift核心流程图
RxSwift核心详解.png
最后一句:创作不易,如果对各位大佬有帮助,请留下你们的小星星✨