RxSwift源码解析

Rx 构造操作符分类

2018-09-28  本文已影响13人  狼性刀锋

前言

这里的分类指的是按照实现原理分类,而不是按照功能进行分类,针对每一个分类选择一个具体类型,进行分析

Override subscribe operator

// class Just
    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
    // 无需经过Sink,直接发出信号
        observer.on(.next(_element))
        observer.on(.completed)
        return Disposables.create()
    }



Recursive scheduling operator

// ObservableSequenceSink Class
      return _parent._scheduler.scheduleRecursive((_parent._elements.makeIterator(), _parent._elements)) { (iterator, recurse) in

            var mutableIterator = iterator

            if let next = mutableIterator.0.next() {
                print("scheduleRecursive: \(next)")
                self.forwardOn(.next(next))
                recurse(mutableIterator)
            }
            else {
                self.forwardOn(.completed)
                self.dispose()
            }
        }

scheduleRecursive: 线程递归调度方法
let next = mutableIterator.0.next(): 迭代器迭代元素

create

这个之前详细讲过,创建一个AnonymousObservable 作为承载闭包的实体

    public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
        return AnonymousObservable(subscribe)
    }

do

这个很重要,重点讲一下

// simple Example
 Observable.of("🍎", "🍐", "🍊", "🍋")
        .do(onNext: { print("Intercepted:", $0) }, onError: { print("Intercepted error:", $0) }, onCompleted: { print("Completed")  })
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)

do这个操作符是专门用来处理副作用的,什么是副作用呢,打个简单的比方实现: 1 + 2 + 3 + 4 + 5, 我要在+3的时候,做一个额外的操作,我要改变一下ui的背景色,但是这一步对最终的结果没有任何影响,就可以使用do操作符,这么做的好处能够提高代码可读性同时也良好体现了函数的单一性职责。好了现在看看实现原理。

// Do Class
    override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        _onSubscribe?()
        let sink = DoSink(eventHandler: _eventHandler, observer: observer, cancel: cancel)
        let subscription = _source.subscribe(sink)
        _onSubscribed?()
        let onDispose = _onDispose
        let allSubscriptions = Disposables.create {
            subscription.dispose()
            onDispose?()
        }
        return (sink: sink, subscription: allSubscriptions)
    }

这里的_source即 original Observable,而这里的sink 指的就是DoSink

// DoSink Class
    func on(_ event: Event<Element>) {
        do {
            try _eventHandler(event)
            forwardOn(event)
            if event.isStopEvent {
                dispose()
            }
        }
        catch let error {
            forwardOn(.error(error))
            dispose()
        }
    }

DoSink 会在执行on事件的时候执行_eventHandler,也就是最开始用户传进来的那个闭包。

Rx最核心的是什么就是响应式的编程, A -> B -> C -> D -> E,A事件的发生最终导致E事件的发生。那么本来相互孤立的事件如何建立联系呢? 答案就在 let subscription = _source.subscribe(sink) 这一句, 通过subscribe将本来孤立的事情紧密的联系在一起,并且Rx隐藏了所以的细节,用户无需为此做大量的额外操作就能获得该功能。 每一个Observer 都不需要了解具体有多少个Observable,它只需要上个Observable是谁就可以了。整个事件看起来是这样:E subscribe D subscribe C subscribe B subscribe A,但是细分一点其实是这样:

E subscribe D 
              D subscribe C
                             C subscribe B 
                                            B subscribe A

不管整个事件流到底有多长,其核心构建就是 B和A,用算法归纳就是如下:

var e
while(e.hasObservable) {
  e.observer(e.Observable)
  e = e.Observable
}

这个很像单车的链条,不管链条多长,其最小的组成单元都是一个小链,有头尾两端,可以与其他的链在一起。

Deferred

推迟执行,它只在被订阅的时候才去创建Observable,而create是在一开始就创建ObservableDeferred 每次被订阅都会创建一个新的Observable,而create被多次订阅都是同一个Observable

  let disposeBag = DisposeBag()
    var count = 1
    
    let deferredSequence = Observable<String>.deferred {
        print("Creating \(count)")
        count += 1
        
        return Observable.create { observer in
            print("Emitting...")
            observer.onNext("🐶")
            observer.onNext("🐱")
            observer.onNext("🐵")
            return Disposables.create()
        }
    }
    
    deferredSequence
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)
    
    deferredSequence
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)

// output log 
Creating 1
Emitting...
🐶
🐱
🐵
Creating 2
Emitting...
🐶
🐱
🐵


由于每次创建都是通过闭包创建新的Observable,而闭包捕获的值count在更新,所以两次运行结果不一样

实现原理

// DeferredSink Class
    func run() -> Disposable {
        do {
            let result = try _observableFactory()
            return result.subscribe(self)
        }
        catch let e {
            forwardOn(.error(e))
            dispose()
            return Disposables.create()
        }
    }

这里可以看到每次订阅的时候都会通过_observableFactory产生新的Observable

上一篇 下一篇

猜你喜欢

热点阅读