RxSwift Observable可观察序列
squence序列
- 有限序列
let ob = Observable.just([1, 2, 3, 4])
ob.subscribe(onNext: { (num) in
print(num)
}, onCompleted: {
print("完成")
})
/*
输出结果:
[1, 2, 3, 4]
完成
*/
- 无限序列
URLSession.shared.rx.response(request: URLRequest.init(url: URL.init(string: "http://www.baidu.xx")!))
.subscribe(onNext: { (response, data) in
}, onError: { (error) in
print(error)
})
/*
输出结果:
错误,Error Domain=NSURLErrorDomain Code=-1022 "The resource could not be loaded because the App Transport Security policy requires the use of a secure connection."
*/
核心逻辑
- 序列的产生
- 序列的订阅
- 序列的销毁
//1、创建序列
let ob = Observable<Any>.create { (observer) -> Disposable in
//3、发送序列
observer.onNext("订阅中心发送消息")
//complete和error不会同时发生
// observer.onCompleted()
observer.onError(NSError.init(domain: "error", code: 1002, userInfo: nil))
return Disposables.create()
}
//2、订阅序列
let _ = ob.subscribe(onNext: { (text) in
print("订阅到:\(text)")
}, onError: { (error) in
print("error:\(error)")
}, onCompleted: {
print("completed")
}) {
print("disposed")
}
响应式核心逻辑
类似于UIControl类调用addTarget响应#selector定义的事件,RxSwift将响应事件留在rx内部类处理,在内部类由订阅中心执行onNext发送消息,消息传送到subscribe接收
代码执行顺序
-
Observable<Any>.create
创建一个闭包 -
ob.subscribe
,需要取得text值才能继续下一步闭包内打印 - 执行
observer.onNext("订阅中心发送消息")
取text的值 - 返回text的值执行
print("订阅到:\(text)")
,打印结果
源码分析
RxSwift+Observable- create方法
create方法声明,从注释和查找得到create方法在Create类中实现
/**
Creates an observable sequence from a specified subscribe method implementation.
- seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)
- parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
- returns: The observable sequence with the specified implementation for the `subscribe` method.
*/
public static func create(_ subscribe: @escaping (RxSwift.AnyObserver<Self.Element>) -> Disposable) -> RxSwift.Observable<Self.Element>
create方法实现,从RxSwift源码查找Create类的create方法实现,create方法返回了一个带subscribe参数的AnonymousObservable匿名可观察序列
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
return AnonymousObservable(subscribe)
}
AnonymousObservable匿名内部类,继承于Producer;
内部类执行init初始化,_subscribeHandler保存create闭包,使用函数式编程,可以在使用的地方随时调用_subscribeHandler
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
//init初始化
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler //保存闭包
}
//run方法,Producer类内部调用子类AnonymousObservable的run方法,执行sink.run(self)
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
sink.run实现,AnonymousObservableSink拥有了订阅者、销毁者、序列+调度环境,调用了_subscribeHandler将执行create闭包内代码
typealias Parent = AnonymousObservable<Element>
//parent为AnonymousObservable的别名创建的对象
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
AnyObserver实现,AnyObserver是一个结构体,执行init方法,调用observer.on,observer是AnonymousObservableSink类型
public struct AnyObserver<Element> : ObserverType {
...
public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
self.observer = observer.on
}
...
}
observer.on的实现,on方法的代码块会保存在observer函数中,然后observer函数调用ObserverType协议的onNext方法执行发送,其中forwardOn调用了observer.on,observer是AnonymousObserver订阅者,on方法传递发送的消息到订阅闭包里
func on(_ event: Event<Element>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
onNext方法的实现,内部调用了on方法,也就是sink.on方法
public func onNext(_ element: Element) {
self.on(.next(element))
}
- subscribe方法
subscribe方法声明,Observable继承于ObservableType协议,需要找协议里的subscribe方法实现
public func subscribe(onNext: ((Self.Element) -> Void)? = nil, onError: ((Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable
subscribe方法实现,创建了AnonymousObserver,参数是闭包,event类型决定执行闭包里的对应方法->next、error、complete,最终返回销毁者,Disposables闭包里将self转化成序列执行subscribe
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable { //销毁
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif
//调用堆栈
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
//创建AnonymousObserver,参数是闭包,包含next、error、complete信息
let observer = AnonymousObserver<Element> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
//返回销毁者,其中回调subscribe(observer),subscribe是在AnonymousObservable的父类Producer中实现
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
Producer实现,继承于Observable,内部执行了子类AnonymousObservable 的run方法
class Producer<Element> : Observable<Element> {
override init() {
super.init()
}
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
rxAbstractMethod()
}
}
AnonymousObserver类的实现,继承于ObserverBase,类内部初始化创建了_eventHandler保存所有事件->next、error、complete信息
final class AnonymousObserver<Element>: ObserverBase<Element> {
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
//初始化,eventHandler保存所有事件
init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()//计数增加
#endif
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
#if TRACE_RESOURCES
deinit {
_ = Resources.decrementTotal()//计数减少
}
#endif
}
ObserverBase类的实现,继承于ObserverType协议,定义了on、onCore、dispose方法供子类实现,on可以发送next、error、completed信号
swift是强类型语言,协议里定义由具体类来赋值属性实现方法以确定类型和具体实现方法
class ObserverBase<Element> : Disposable, ObserverType {
private let _isStopped = AtomicInt(0)
func on(_ event: Event<Element>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
func onCore(_ event: Event<Element>) {
rxAbstractMethod()
}
func dispose() {
fetchOr(self._isStopped, 1)
}
}
核心逻辑