走进 RxSwift 之观察者模式
RxSwift简介和吐槽
RxSwift 是 ReactiveX 系列的 Swift 版本,如果你之前用过 ReactiveCocoa(RAC) 的话,想必对 Functional Reactive Programming(FRP,函数响应式编程)这个概念不会陌生,是的,RxSwift 同样是一个 FRP 框架。值得一提的是,RAC 的README 里有这么几句话:
ReactiveCocoa was originally inspired, and therefore heavily influenced, by Microsoft’s Reactive Extensions (Rx) library.
Although ReactiveCocoa was started as an Objective-C framework, as of version 3.0, all major feature development is concentrated on the Swift API.
第一句是说,RAC 是受微软的 Reactive Extensions 启发的,所以也受了 Reactive Extensions 很大的影响(当然,之后它罗列了 RAC 跟 Rx 的一些差别,并且安利 iOS 开发者来使用 RAC)。第二句是说,虽然 RAC 是作为一个 OC 框架出道的,但是从3.0版本开始,所有主要特性的开发都已经以 Swift API 为重心了。也就是说,今后不管是不是下雨天,RAC 都跟 Swift 更配哦。
如果你是一个使用 Swift 的 iOS 开发者,并且对无处不在的 OOP 感到了厌倦,想要打开新世界的大门看看的话,这两个框架都是可以选择的。不过由于我感兴趣的是框架的具体实现,相比于 OC 我又更喜欢 Swfit,所以挑了纯 Swift 实现的 RxSwift 来看。
An API for asynchronous programming
with observable streams
上面这句话来自 Rx 的官网,看到streams
我立马就想到了《 SICP》第三章的“流”,加之 Swift 对函数式编程的支持又很好,所以我原以为 RxSwift 的内部实现会用延迟的表来作为信号流,用流来表示某个对象顺序状态的时间史,这样一切都是函数,没有状态变化,也就不需要同步机制来保证线程安全了。事实证明我还是图样!RxSwift 内部还是有各种类各种继承,当然也有各种同步机制:自旋锁、递归锁、原子操作……说好的 functional 呢?只有暴露给使用者的 API 是functional 么?这一开始让我有些失望,不过后来发现整个框架还是用到了大量函数式特性的,只是不像我所想的那么纯粹(一个 pure functional 的框架大概也很难真正流行起来……)。
好了吐槽完毕,我们再看一句官网的介绍:
ReactiveX is a combination of the best ideas from
the Observer pattern, the Iterator pattern, and functional programming
这句话的意思是说 Rx 是结合了观察者模式、迭代器模式和函数式编程这三种最佳思维的产物。虽然它没有如我所想用纯函数式的代码实现,不过用到了“流”的思想倒也是实实在在的。目前,我只看了一小部分代码,大致清楚了观察者模式部分的实现,下面就跟大家分享一下。
Observable 和 Observer
RxSwift 项目内部有个 Rx.playground,在介绍页面上有这么一句话:
The key to understanding RxSwift is in understanding the notion of Observables. Creating them, manipulating them, and subscribing to them in order to react to changes.
这句话是说,理解 RxSwfit 的关键是理解“被观察者”这个概念,创造它们,操纵它们,然后订阅它们来响应变化。Observable
的重要性可见一斑。让我们来看一个使用Observable
的实例:
empty
empty creates an empty sequence. The only message it sends is the .Completed message.
介绍了一个 empty 函数,它会创建一个空的 sequence(翻译成序列的话总感觉会引起误会),这个 sequence 只会发送 .Completed 这一个消息,示例代码如下:
let emptySequence: Observable<Int> = empty()
let subscription = emptySequence
.subscribe { event in
print(event)
}
上述代码通过empty
函数得到了一个Observable<Int>
,好现在去看看empty
:
public func empty<E>() -> Observable<E> {
return Empty<E>()
}
果然是 OOP 外面套 FP 的皮啊,没有这个 empty 函数我们照样可以直接let emptySequence = Empty<Int>()
来得到一个Observable<Int>
嘛,那现在就去看看这个Empty
是个什么鬼:
class Empty<Element> : Producer<Element> {
override init() {
}
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
observer.on(.Completed)
return NopDisposable.instance
}
}
乍一看这个类还是比较容易懵的。这个空构造器是什么意思?好吧大概是为了初始化的时候避免调用父类构造器,就是确保什么都不做。然后下面这个 run 函数就令人费解了,这一堆参数,还有这个Disposable
是什么?其实如果是写过 C# 的朋友,一定觉得这个Disposable
非常熟悉,没错,它是一个协议(似乎微软系的接口比较喜欢用形容词,用able结尾的很多),跟 C# 中用来显式释放资源的IDisposable
接口类似:
/**
类似 C# 中的 IDisposable 接口,用来释放资源。
由于 Swift 使用 ARC,所以 dispose 方法大部分时候只是取消对某个资源的引用,
譬如 resource = nil
*/
public protocol Disposable {
/**
Dispose resource.
*/
func dispose()
}
由于这篇文章重点在于观察者模式,所以我想先放下Disposable
相关的东西不谈,因为涉及资源的保存释放有一些线程相关的操作,挺麻烦的,但这些跟观察者模式并没有什么关系。基于此,我把 RxSwfit 中跟empty
、just
相关的一些类稍微简化了一下,去掉了Disposable
相关的一些内容,然后加了点注释,放到一起之后empty
、just
这几个例子还是都能正常运行。
好的,简化后Empty
类变成了这样:
class Empty<Element> : Producer<Element> {
override func run<O : ObserverType where O.E == Element>(observer: O) {
// 观察者订阅了一个完成信号
observer.on(.Completed)
}
}
好,我们已经得到一个Empty
的实例,接下来我们要调用它的subscribe
方法,这个subscribe
方法的参数类型是(Event<E>) -> Void
,是一个闭包类型。我们在ObservableType
协议的扩展里找到了符合条件的subscribe
方法:
extension ObservableType {
func subscribe(on: (event: Event<E>) -> Void) {
// 构造一个匿名观察者,把参数 on 赋值给这个匿名观察者的 eventHandler,
// 相当于 let observer = AnonymousObserver(on)
let observer = AnonymousObserver { e in
on(event: e)
}
self.subscribeSafe(observer)
}
subscribe
方法接受了闭包之后,先构造了一个匿名观察者,event
这个闭包作为构造器的参数传给了observer
。看一下AnonymousObserver
:
class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType
typealias EventHandler = Event<Element> -> Void
private let eventHandler : EventHandler
init(_ eventHandler: EventHandler) {
// 资源情况追踪(为了开发期解决内存泄漏问题吧)
#if TRACE_RESOURCES
// 原子操作:resourceCount 加1
OSAtomicIncrement32(&resourceCount)
#endif
self.eventHandler = eventHandler
}
// onCore 会被 on 调用(on 继承自父类)
override func onCore(event: Event<Element>) {
return self.eventHandler(event)
}
#if TRACE_RESOURCES
deinit {
// 原子操作:resourceCount 减1
OSAtomicDecrement32(&resourceCount)
}
#endif
}
忽略追踪内存情况的代码不看,这个类主要就是在构造器中接受一个闭包,然后赋值给私有属性eventHandler
,然后在onCore
方法中,eventHandler
会被调用。可是我们之前看Empty
类的时候已经知道,观察者的on
方法会在run
中被调用,并不是这个onCore
啊,看来还得到父类ObserverBase
中看看:
class ObserverBase<ElementType>: ObserverType {
typealias E = ElementType
var isStopped: Int32 = 0
init() {
}
func on(event: Event<E>) {
switch event {
case .Next:
if isStopped == 0 {
onCore(event)
}
// 一旦出现一次 Error 或 Completed 事件,之后也不会再执行 onCore 了
case .Error, .Completed:
// OSAtomicCompareAndSwap32:比较和交换的原子操作,如果 isStopped == 0,则 isStoppend = 1,返回 true,否则返回 false
if !OSAtomicCompareAndSwap32(0, 1, &isStopped) {
return
}
onCore(event)
}
}
// 会在子类中重写
func onCore(event: Event<E>) {
abstractMethod()
}
}
好了,这下清楚了,onCore
会被on
调用。回到subscribe
中继续往下走,得到了observer
这个实例之后,它将会一路被作为参数传递。先是调用self.subscribeSafe(observer)
,observer
被传递给subscribeSafe
方法,这个方法同样在ObserverType
的extension
中:
func subscribeSafe<O: ObserverType where O.E == E>(observer: O) {
// 会调用被子类实现的的 subscribe 方法
self.subscribe(observer)
}
在subscribeSafe
中最后又会调用subscribe
方法,不过这个subscribe
的参数是ObserverType
的实现类,不是闭包,所以这是一个重载方法。它的声明在协议ObservableType
中:
protocol ObservableType {
/**
hack: 因为 Swift 中没有范型协议,只能在协议中声明一个别名,
然后将实现类声明为范型类,再将传入的范型名命名为 E(如 typealias E = Element)
在接受范型参数的地方这样使用:
func demo<O : ObservableType where O.E == Int>(ob: O)
大致与 C# 中 void demo(ObservableType<int> ob) 作用相同
*/
typealias E
func subscribe<O: ObserverType where O.E == E>(observer: O)
}
我们发现这个方法没有出现在Empty
类中,只能沿着Empty
的继承树往上找,在Empty
的父类Producer
中可以找到它的实现:
class Producer<Element> : Observable<Element> {
// 会被 ObserverType 的 extension 方法 subscribeSafe 调用
override func subscribe<O : ObserverType where O.E == E>(observer: O) {
// 会有一些关于资源释放以及线程相关的操作
// ……
run(observer)
}
func run<O : ObserverType where O.E == Element>(observer: O) {
abstractMethod()
}
}
subscribe
方法会调用run
方法,但是这个run
方法里面调用了abstractMethod
,我们来看看它是什么:
@noreturn func abstractMethod() -> Void {
fatalError("Abstract method")
}
一旦调用这个方法就会触发致命错误fatalError
,所以run
必须被子类重写,否则程序会终止。我猜是因为 Swift 中没有抽象类和抽象方法的概念,不能在函数前加 abstract 强制子类重写该方法,只能用这种不重写就终止的方式来模拟抽象方法。既然这样我们就来看看子类中的run
方法:
class Empty<Element> : Producer<Element> {
// run 会在父类中被 subscribe 方法调用
override func run<O : ObserverType where O.E == Element>(observer: O) {
// 观察者订阅了一个完成信号
observer.on(.Completed)
}
}
class Just<Element>: Producer<Element> {
let element: Element
init(element: Element) {
self.element = element
}
override func run<O : ObserverType where O.E == Element>(observer: O) {
observer.on(.Next(element))
observer.on(.Completed)
}
}
如上是Empty
和Just
的两个run
实现,在Empty
中,会调用传递过来的observer
的on
方法一次,并将.Completed
作为参数。我们知道这个on
方法其实就是最开始我们调用subscribe
方法时传入的那个闭包,即{event in print(event)}
,所以最后就会打印出.Completed
。至于这个.Completed
么,显然是个枚举,它是一个Event
类型:
enum Event<Element> {
case Next(Element)
case Error(ErrorType)
case Completed
}
而Just
的初始化函数会接受一个值并将其赋值给实例属性element
,然后调用run
方法的时候,会调用传递过来的observer
的on
方法两次,一次以.Next(element)
为参数,一次以.Completed
为参数表示结束。就像这样:
// MARK: - 调用
print("just observable demo:")
let justObservable = just(1)
justObservable.subscribe { event in
print(event)
}
print("----")
print("empty observable demo:")
let emptyObservable: Observable<Int> = empty()
emptyObservable.subscribe { event in
print(event)
}
输出:
just observable demo:
Next(1)
Completed
----
empty observable demo:
Completed
有点绕对不对,主要是因为继承太多,很多方法都要到父类中去找。我简化后的版本在这里,可能我说这么多还不如大家自己 clone 下来看一眼来得明白。
小结
因为代码只看了个开头,所以我暂时还不能理解 RxSwift 中继承层级这么多的必要性。主要这毕竟是个重型的框架,我必须读一点记录一点,不然看到后面就忘了前面。要说目前为止有什么收获么,大抵是如下几点:
- 观察者模式的 Swift 实现。
- 借助 typealias 模拟范型协议的具体做法。
- 借助 fatalError 模拟抽象方法的具体做法。
- 自旋锁、递归锁和两种原子操作的用法。