【领略RxSwift源码】- 订阅的工作流(Subscribin
开篇
一直觉得自己似乎越来越浮躁了,可能当代的大多数年轻人都活在恐慌里,问题已经从小时候的不思进取变成了“太思进取”,似乎总是想不管什么投入都能立竿见影。但是很多时候总是事与愿违,所以这次我想试着静下心来,读一读RxSwift的源码,希望不要Give up halfway
吧。
关于RxSwift,作为ReactiveX的成员框架之一,它有着血脉相承的语法,语言和哲学上的一致性使得即便之后转向其他的平台我们也能很快的上手Rx。其次,通过阅读源码,我们也可以看到,其实有的时候大神的代码也没有那么的Clean和Prefect ,我们也可以在一些地方看到妥协和失误。
本篇目标
本篇的目标就是了解下面这段代码(来自Rx.playgroud)的实现:
example("just") {
let disposeBag = DisposeBag()
Observable.just("🔴")
.subscribe { event in
print(event)
}
.disposed(by: disposeBag)
}
这寥寥数行代码,我想但凡RxSwift入门的同学都知道它的用处:创建一个单值的可观察序列,并且打印出它的所有序列。恩...没毛病,但是本篇想要知道的是:
- 序列是如何创建的?它的结构是怎么样的?
- 序列是如何被观察者订阅的?
so...let's go !
众所周知,自Swift诞生以来,苹果爸爸就一直在推崇面向协议编程(POP) ,而RxSwift也是同样的,遵循了
从一个协议开始,而不是从一个类开始
。但是我并不想从协议讲起,因为虽然从协议讲起最具逻辑性,但是从文章的角度来说并不好理解和阅读。所以本文将以示例代码为切入点,自上而下的阅读,以求简单清晰易懂。
0x01 - Observable
-> Observable
在开篇的示例代码中,首先映入我们眼帘的是Observable
,Observable
调用了just
方法。其实Observable
是一个遵守ObservableType
的类,实现代码如下:
public class Observable<Element> : ObservableType {
/// Type of elements in sequence.
public typealias E = Element
init() {
#if TRACE_RESOURCES
let _ = Resources.incrementTotal()
#endif
}
public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
rxAbstractMethod()
}
public func asObservable() -> Observable<E> {
return self
}
deinit {
#if TRACE_RESOURCES
let _ = Resources.decrementTotal()
#endif
}
// this is kind of ugly I know :(
// Swift compiler reports "Not supported yet" when trying to override protocol extensions, so ¯\_(ツ)_/¯
/// Optimizations for map operator
internal func composeMap<R>(_ selector: @escaping (Element) throws -> R) -> Observable<R> {
return Map(source: self, transform: selector)
}
}
首先这是一个用Public
修饰的抽象类,它是直接面向RxSwift使用者的。在这个类当中,使用了E
的别名来充当序列值的泛型类型。在Init方法中我们可以看到一个Resource的结构体,顺便提一句,这是一个用来“追踪计数”RxSwift引用资源的,每当init
一个资源计数就+1,deinit
的时候就总数-1,以此来追踪全局的资源使用。
除此之外,我们还可以看到有两个方法:
public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
public func asObservable() -> Observable<E>
这两个方法都在ObservableType
这个协议中有所定义,前者定义了Observable能够被订阅的行为,后者则是定义了可以将自身“转换”成“Observable实体”的功能。这里可能会有一些Confuse,其实前面已经提到过,单纯的Observable
类并不能作为序列被直接订阅使用,只有Observable
的实体子类才可以被实例化使用。
所以,我们也可以看到,subscribe
函数的实现也只是简单的fatalError
,并没有实际的逻辑操作:
/// Swift does not implement abstract methods. This method is used as a runtime check to ensure that methods which intended to be abstract (i.e., they should be implemented in subclasses) are not called directly on the superclass.
func rxAbstractMethod(file: StaticString = #file, line: UInt = #line) -> Swift.Never {
rxFatalError("Abstract method", file: file, line: line)
}
-> Observable - > ObservableType
现在我们再来看一下ObservableType
:
public protocol ObservableType : ObservableConvertibleType {
/// Type of elements in sequence.
associatedtype E
func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
}
extension ObservableType {
/// Default implementation of converting `ObservableType` to `Observable`.
public func asObservable() -> Observable<E> {
// temporary workaround
//return Observable.create(subscribe: self.subscribe)
return Observable.create { o in
return self.subscribe(o)
}
}
}
这两个方法的作用前面已经讲过,在这里我们可以看到通过Protocol Extension
RxSwift为ObservableType
提供了默认的asObservable
的实现,那么ObservableConvertibleType
是一个什么协议呢,是不是有它从根源上定义了asObservable
方法呢?我们来看一下ObservableConvertibleType
的定义:
/// Type that can be converted to observable sequence (`Observer<E>`).
public protocol ObservableConvertibleType {
/// Type of elements in sequence.
associatedtype E
/// Converts `self` to `Observable` sequence.
///
/// - returns: Observable sequence that represents `self`.
func asObservable() -> Observable<E>
}
果然如此,那么至此为止,Observable
之前的协议继承体系我们已经明了,画成图大概是这样的:
很遗憾,至此为止我们并没有看到太多的实现逻辑,但是我们看到了一系列Observable
的Protocol
根基。那么具体的Observable
实体应该是怎么样的呢?我们从just
身上来找到答案。
在Observable+Creation.swift
文件中,我们找到了关于just
的定义:
Observable+Creation.swift
是一个Observable
的一个拓展(extension
),文件中我们可以看到很多关于构建Observable
实体的方法,诸如create
,empty
,never
等等,本篇以just
作为切入点,其实其他的公开的Creation函数也是类似的逻辑,所以不会一一介绍了。
public static func just(_ element: E) -> Observable<E> {
return Just(element: element)
}
我们可以看到,这是一个public
修饰的暴露在外的Observable
的静态方法,返回的也是Observable
类型。那么这个Just
是什么呢?
final class Just<Element> : Producer<Element> {
private let _element: Element
init(element: Element) {
_element = element
}
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
observer.on(.next(_element))
observer.on(.completed)
return Disposables.create()
}
}
我们可以看到,这是一个继承自Producer
的一个类,OK,我们先不去管这个Just
,先去看看Producer
是一个什么东西:
class Producer<Element> : Observable<Element> {
override init() {
super.init()
}
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
let disposer = SinkDisposer()
let sinkAndSubscription = run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
rxAbstractMethod()
}
}
看到这里我们清楚了,Producer
是继承自Observable
的一个抽象类,结合前面的Just
,于是我们的图可以画成这样了:
在Observable
的子类Producer
我们可以看到该基类实现subscribe
的基础方法,这里用到了RxSwift当中的另外一个概念--Scheduler
,但是它不是本文的重点,我们将在接下来的文章里面去集中讨论它,这里只是做一些简单的解读(感觉给自己埋了坑)。
在Producer
中,subscribe
主要做了以下几件事情:
- 创建一个
SinkDisposer
。 - 判断是否需要
Scheduler
来进行切换线程的调用,如果需要那么就在指定的线程中操作。 - 调用
run
方法,将observer
和刚刚创建的SinkDisposer
作为入参,得到一个Sink
和Subscription
的一个元组。这里的Sink
和Subscription
都是遵守Disposable
的类。
4.SinkDisposer
对传入之前的Sink
和Subscription
执行setSinkAndSubscription
方法。
5.将执行完setSinkAndSubscription
方法的disposer
作为返回值返回。
这里的相关操作其实都容易理解,首先看看这个run
:
func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
rxAbstractMethod()
}
好,既然这是一个抽象方法,那么我们暂且先不去管它,今晚不是它的轮次(最近🐺杀玩多了)。除去这个方法,最让人疑惑的就是这个setSinkAndSubscription
方法,那么它的作用是什么呢?
我们先来谈一谈SinkDisposer
类,但是再谈它之前我们需要先知道它所遵守的协议。SinkDisposer
是一个遵守Cancelable
协议的类,那么这个Cancelable
是何方神圣呢?
这一切都要从Disposable
说起。
0X02-Dispose
Disposable
Disposable只是一个简单的协议,其中只有一个dispose
方法,定义了释放资源的统一行为。
/// Respresents a disposable resource.
public protocol Disposable {
/// Dispose resource.
func dispose()
}
OK,这个很简单Cancelable
呢?
/// Represents disposable resource with state tracking.
public protocol Cancelable : Disposable {
/// Was resource disposed.
var isDisposed: Bool { get }
}
没错,Cancelable
只是一个继承自Disposable
的一个协议,其中定义了一个Bool
类型的isDisposed
标识,用来标识是否该序列已经被释放。
SinkDisposer
OK,现在我们终于来到了SinkDisposer
类,先上源码:
fileprivate final class SinkDisposer: Cancelable {
fileprivate enum DisposeState: UInt32 {
case disposed = 1
case sinkAndSubscriptionSet = 2
}
// Jeej, swift API consistency rules
fileprivate enum DisposeStateInt32: Int32 {
case disposed = 1
case sinkAndSubscriptionSet = 2
}
private var _state: AtomicInt = 0
private var _sink: Disposable? = nil
private var _subscription: Disposable? = nil
var isDisposed: Bool {
return AtomicFlagSet(DisposeState.disposed.rawValue, &_state)
}
func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
_sink = sink
_subscription = subscription
let previousState = AtomicOr(DisposeState.sinkAndSubscriptionSet.rawValue, &_state)
if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
rxFatalError("Sink and subscription were already set")
}
if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
sink.dispose()
subscription.dispose()
_sink = nil
_subscription = nil
}
}
func dispose() {
let previousState = AtomicOr(DisposeState.disposed.rawValue, &_state)
if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
return
}
if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
guard let sink = _sink else {
rxFatalError("Sink not set")
}
guard let subscription = _subscription else {
rxFatalError("Subscription not set")
}
sink.dispose()
subscription.dispose()
_sink = nil
_subscription = nil
}
}
}
在这里我们看到的一些以Atomic
开头的方法都是在OSAtomic.h
中所定义的自动读取和更新指定值方法,详细的使用方法可以点这里的官方文档。这里使用的Atomic
方法是为了区分以下几种可能性:
case1 - 第一次执行
- 将
sink
和subscription
赋值给自身的私有变量。 - 通过
Atmoic
方法(也就是OSAtomicOr32OrigBarrier
)方法,将_state
的值更新为2,并且返回值previousState
为0。 -
previousState
和DisposeStateInt32. sinkAndSubscriptionSet.rawValue
做逻辑与
运算,得值为0所以不执行if里面的逻辑。 -
previousState
和DisposeStateInt32.disposed.rawValue
做逻辑与
运算,的值为0,所以也不执行if里面的逻辑。 - 结束。
case2 - 再次执行
1.由于没有执行过dispose
方法,所以自从第一次执行setSinkAndSubscription
之后,_state
的值一直为2。当执行previousState
和DisposeStateInt32.disposed.rawValue
的时候,的值为2,所以执行xFatalError("Sink and subscription were already set")
程序中止运行。
case3 - 先执行过dispose
,然后第一次执行
- 由于执行过
dispose
方法,所以_state
的值为1。 - 通过
Atmoic
方法(也就是OSAtomicOr32OrigBarrier
)方法,将_state
的值更新为2,并且返回值previousState
为1。 -
previousState
和DisposeStateInt32. sinkAndSubscriptionSet.rawValue
做逻辑与
运算,得值为0所以不执行if里面的逻辑。 -
previousState
和DisposeStateInt32.disposed.rawValue
做逻辑与
运算,的值为1,所以执行if内的操作,将sink
和subscription
分别执行dispose
操作,并且将两个私有变量置nil
,打破引用环。
我们可以看到,通过一个
_state
和OSAtomic
的方法,RxSwift非常优雅的解决了上述三种场景,非常值得借鉴。而本类中的dispose
方法其实也是类似的处理方法,来保证只有一次有效的dispose
操作,本文就不再赘述。
0X03 - Observer
接下来我们来讲讲RxSwift中的另外一个角色,Observer
(观察者),这次我们从观察者的基类ObserverBase
谈起:
ObserverBase是一个遵守了Disposable
和ObserverType
协议的一个抽象类,实现了on
和dispose
。值得注意的是,在ObserverBase中有一个私有变量:
private var _isStopped: AtomicInt = 0
_isStopped
是一个哨兵,用来标记所观察的序列是否已经停止了,那么什么时候需要标记为Stop呢?我们来看这段代码:
func on(_ event: Event<E>) {
switch event {
case .next:
if _isStopped == 0 {
onCore(event)
}
case .error, .completed:
if !AtomicCompareAndSwap(0, 1, &_isStopped) {
return
}
onCore(event)
}
}
这段代码做的事情很简单:
- 只要
_isStopped
为0,那么就允许“发射”.next
事件,也就是执行onCore
方法。 - 当第一次“发射”
.error
或者.completed
时,执行一次onCore
,并且将_isStopped
设为1。
因为所有的Observer类在事件发射的逻辑上面都相同,所以统一在ObserverBase中作了处理,这也是典型的OOP思想。老铁,没毛病~
值得一提的是,我们可以看到这里使用了一个AtomicCompareAndSwap
的方法,这个方法是做什么的呢?在Platform.Darwin.swift
中,我们可以看到关于这个方法的定义:
typealias AtomicInt = Int32
let AtomicCompareAndSwap = OSAtomicCompareAndSwap32Barrier
let AtomicIncrement = OSAtomicIncrement32Barrier
let AtomicDecrement = OSAtomicDecrement32Barrier
我们可以看到,AtomicCompareAndSwap其实就是OSAtomic
库中所定义的一个全局方法:
/*! @abstract Compare and swap for 32-bit values with barrier.
@discussion
This function compares the value in <code>__oldValue</code> to the value
in the memory location referenced by <code>__theValue</code>. If the values
match, this function stores the value from <code>__newValue</code> into
that memory location atomically.
This function is equivalent to {@link OSAtomicCompareAndSwap32}
except that it also introduces a barrier.
@result Returns TRUE on a match, FALSE otherwise.
*/
@available(iOS 2.0, *)
@available(iOS, deprecated: 10.0, message: "Use atomic_compare_exchange_strong() from <stdatomic.h> instead")
public func OSAtomicCompareAndSwap32Barrier(_ __oldValue: Int32, _ __newValue: Int32, _ __theValue: UnsafeMutablePointer<Int32>!) -> Bool
简单的来说,该方法传入三个参数:__oldValue
,__newValue
和__theValue
,前两个参数都是Int32类型的,后一个是UnsafeMutablePointer<Int32>
的可变指针。当__oldValue
的值和指针所指向的内存地址的变量的值相等时,返回true
否则为false
,于此同时,如果__newValue
和当前的值不相等,那么就赋值,使得__theValue
的值为新值。伪代码如下:
f (*pointer == oldvalue) {
*pointer = newvalue;
return 1;
} else {
return 0;
}
番外 - Memery barrier
为了达到最佳性能,编译器通常会对汇编基本的指令进行重新排序来尽可能保持处理器的指令流水线。作为优化的一部分,编译器有可能对访问主内存的指令,如果它认为这有可能产生不正确的数据时,将会对指令进行重新排序。不幸的是,靠编译器检测到所有可能内存依赖的操作几乎总是不太可能的。如果看似独立的变量实际上是相互影响,那么编译器优化有可能把这些变量更新位错误的顺序,导致潜在不不正确结果。
内存屏障(memory barrier)是一个使用来确保内存操作按照正确的顺序工作的非阻塞的同步工具。内存屏障的作用就像一个栅栏,迫使处理器来完成位于障碍前面的任何加载和存储操作,才允许它执行位于屏障之后的加载和存储操作。内存屏障同样使用来确保一个线程(但对另外一个线程可见)的内存操作总是按照预定的顺序完成。如果在这些地方缺少内存屏障有可能让其他线程看到看似不可能的结果。为了使用一个内存屏障,你只要在你代码里面需要的地方简单的调用OSMemoryBarrier函数。
匿名观察者
看完了ObserverBase
现在我们来看一下AnonymousObserver
:
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
let _ = Resources.incrementTotal()
#endif
_eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return _eventHandler(event)
}
#if TRACE_RESOURCES
deinit {
let _ = Resources.decrementTotal()
}
#endif
}
我们可以看到,在这个匿名观察者中,它主要做的事情就是将基类ObserverBase
所没有实现的onCore
方法实现了,将观察者构造方法时传入的EventHandler
在onCore
方法中执行。这也就是观察者受到序列事件的动作。
订阅过程
在我们对Observable
、Observer
和Disposeable
有了一定的认知之后,我们可以来认识一下最为关键的一步,subscribe
也就是订阅。
在ObservableType+Extensions.swift
中我们可以看到相关的实现:
/**
Subscribes an event handler to an observable sequence.
- parameter on: Action to invoke for each event in the observable sequence.
- returns: Subscription object used to unsubscribe from the observable sequence.
*/
public func subscribe(_ on: @escaping (Event<E>) -> Void)
-> Disposable {
let observer = AnonymousObserver { e in
on(e)
}
return self.subscribeSafe(observer)
}
所谓的subscribe
其实只是做了两个事情。首先是构造了一个匿名观察者,将on
也就是(Event<E>) -> Void
类型的闭包作为参数,每次在匿名观察者有新的事件的时候调用,这里也用到了尾随闭包的语法糖,提高阅读性。其次,将刚刚构造的匿名观察者,通过subscribeSafe
函数来完成订阅。那么subscribeSafe
究竟做了一些什么事情呢?
extension ObservableType {
func subscribeSafe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
return self.asObservable().subscribe(observer)
}
}
subscribeSafe
是一个内部的方法,所有内部的订阅操作全部通过该方法来完成,一般最后都是通过subscribe
方法的多态性来完成最终的订阅,那么回想一下之前�Just
的subscribe
方法我们就可以知道,一旦调用subscribe
方法,Just
立刻给匿名观察者发送一个包裹了初始值的.next
事件和一个.completed
事件,最后返回一个NopDisposable
类型的“存根”,NopDisposable
是一个在执行dispose
操作时不进行任何操作的存根。然后整个订阅过程就结束了。
DisposeBag
对于开头的代码,我们现在唯一还没有讲到的就是addDisposableTo
这个方法,我们都知道,当一个序列执行subscribe
之后我们会得到一个遵守Disposable
的存根。那么根据方法名,我们也可以猜到这个一个将存根添加到一个地方的方法,那么它是要将存根添加到哪里呢?
没错!就是我们天天在写的DisposeBag
。在DisposeBag.swift
中我们可以找到该方法的定义:
extension Disposable {
/// Adds `self` to `bag`.
///
/// - parameter bag: `DisposeBag` to add `self` to.
public func addDisposableTo(_ bag: DisposeBag) {
bag.insert(self)
}
}
那么DisposeBag
到底是个什么东西呢?talk is cheap
,我们直接来看源码:
public final class DisposeBag: DisposeBase {
private var _lock = SpinLock()
// state
private var _disposables = [Disposable]()
private var _isDisposed = false
/// Constructs new empty dispose bag.
public override init() {
super.init()
}
/// Adds `disposable` to be disposed when dispose bag is being deinited.
///
/// - parameter disposable: Disposable to add.
public func insert(_ disposable: Disposable) {
_insert(disposable)?.dispose()
}
private func _insert(_ disposable: Disposable) -> Disposable? {
_lock.lock(); defer { _lock.unlock() }
if _isDisposed {
return disposable
}
_disposables.append(disposable)
return nil
}
/// This is internal on purpose, take a look at `CompositeDisposable` instead.
private func dispose() {
let oldDisposables = _dispose()
for disposable in oldDisposables {
disposable.dispose()
}
}
private func _dispose() -> [Disposable] {
_lock.lock(); defer { _lock.unlock() }
let disposables = _disposables
_disposables.removeAll(keepingCapacity: false)
_isDisposed = true
return disposables
}
deinit {
dispose()
}
}
其实DisposeBag
这个类设计的还是非常的简单明了的,暴露给外部的只有一个insert
方法,将需要被管理的Dispose
交给这个Bag
,当该Bag
执行deinit
方法的时候执行dispose
,将所持有的所有Disposable
遍历一遍,同时挨个dispose
,值得注意的是,该类内部使用了一个锁:
private var _lock = SpinLock()
这个SpinLock
其实就是一个NSRecursiveLock
的递归锁,该🔐的作用就是为了保证_disposables
的数组线程安全,之所以用递归锁是因为有可能会出现在相同的线程多次调用insert
的而引发死锁。
正常情况下,执行insert
方法,首先会执行加锁操作,然后Bag
会将该Disposable
加入到_disposables
这个数组中,最后解锁。但是还有一种情况,那就是当执行insert
操作的时候,该Bag
已经被析构了,那么我们就不需要再将其加入数组,直接将该Disposable
释放掉就可以了。
QA
分析了上述的源码之后,我想开篇的两个问题我们也可以大致回答了:
1.序列是如何创建的?它的结构是怎么样的?
其实无论是通过just
方法构建的序列还是create
、empty
或者of
方法构建的序列,最终得到的都是一个Producer
的子类,只是不同的方法所构建的序列行为不一样,所以要通过子类的方法重写来实现多态,这是典型的OOP
思想这里也就不多解释了。至于它的结构也是取决该序列的特性,比如本文中的Just
,由于它是一个单一序列,只会有第一次的初始值,所以在Just
类中直接定义了一个私有的存储类型的变量来存储初始值,当完成订阅操作的时候,直接将该值通过.next
事件发送出去,然后再将.completed
事件发送,完成整个序列的生命周期。再比如通过never
构建的EmptyProducer
类,由于该序列需要做的只是永远不发送.next
事件,所以EmptyProducer
没有任何私有变量必要,他要做的只是在完成订阅的时候发送一个.completed
事件。由于Producer
的子类太多了,篇幅有限就不在这里一一列举。
2.序列是如何被观察者订阅的?
当一个序列构造完毕的之后,调用subscribe
方法会进行SubscribeHandler
,也就是进行订阅的相关操作。具体来说,对于Just
这个序列,SubscribeHandler
指的是就是发送一个.next(element)
事件和一个.completed
事件;对于NeverProducer
这个序列,SubscribeHandler
指的是单单只发送一个.completed
事件;所以对于不同的SubscribeHandler
会有不同的订阅操作,总的来说是根据序列的特性来发送给观察者不同的事件流。
值得一提的是在RxSwift中还有一个很重要的概念Sink
,关于它的解释可以参考一下这个issue, Sink
相当与一个加工者,可以将源事件流转换成一个新的事件流,如果讲事件流比作水流,事件的传递过程比作水管,那么Sink
就相当于水管中的一个转换头。关于Sink
我们会在之后的文章中详细的讲述。