RxSwift源码解析

RxSwift Combination Operators 之 zip

2018-10-13  本文已影响33人  狼性刀锋

使用示例

// Demonstrates `zip`: each printed line combines one element from `stringSubject`
// with one element from `intSubject`, paired in emission order.
example("zip") {
    let disposeBag = DisposeBag()
    
    let stringSubject = PublishSubject<String>()
    let intSubject = PublishSubject<Int>()
    
    // The trailing closure is the result selector applied to each (String, Int) pair.
    Observable.zip(stringSubject, intSubject) { stringElement, intElement in
        "\(stringElement) \(intElement)"
        }
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)
    
    // Two strings arrive before any int, so nothing is printed yet.
    stringSubject.onNext("🅰️")
    stringSubject.onNext("🅱️")
    
    // Pairs with the first pending string: prints "🅰️ 1".
    intSubject.onNext(1)
    
    // Pairs with the second pending string: prints "🅱️ 2".
    intSubject.onNext(2)
    
    // No int is pending, so "🆎" waits for the next int.
    stringSubject.onNext("🆎")
    // Prints "🆎 3".
    intSubject.onNext(3)
}

// output log

--- zip example ---
🅰️ 1
🅱️ 2
🆎 3

实现原理

Zip 有着一系列类簇,从 Zip2 到 Zip8,实现原理都是一样的,区别仅在于 Observable 的数量。所以这里只重点关注 Zip2 的实现。

/// Producer for the two-source `zip` operator.
///
/// Holds both source observables and the selector that combines one `E1` and one `E2`
/// into a result `R`. The pairing work itself is delegated to `ZipSink2_` in `run`.
final class Zip2<E1, E2, R> : Producer<R> {
    /// Combines one element from each source into the emitted result; may throw.
    typealias ResultSelector = (E1, E2) throws -> R

    /// First source to be zipped.
    let source1: Observable<E1>
    /// Second source to be zipped.
    let source2: Observable<E2>

    /// Selector invoked with one element from each source.
    let _resultSelector: ResultSelector

    /// Stores the two sources and the result selector; no subscription happens here.
    init(source1: Observable<E1>, source2: Observable<E2>, resultSelector: @escaping ResultSelector) {
        self.source1 = source1
        self.source2 = source2

        _resultSelector = resultSelector
    }

    /// Creates a `ZipSink2_` for `observer`, starts it with `sink.run()`, and returns the
    /// sink together with the subscription that `run()` produced.
    override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == R {
        let sink = ZipSink2_(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

和其他操作符一样,仍然通过Sink实现核心功能

// ZipSink2_.run

    // Per-source buffers of received elements: the observers created in `run()` enqueue
    // into them and `getResult()` dequeues one element from each.
    // NOTE(review): `capacity: 2` is presumably only the initial capacity — confirm against Queue.
    var _values1: Queue<E1> = Queue(capacity: 2)
    var _values2: Queue<E2> = Queue(capacity: 2)
    
    /// Subscribes to both parent sources through `ZipObserver`s and returns a disposable
    /// that groups the two subscriptions.
    func run() -> Disposable {
        // Created up front so each observer can be handed its own subscription (`this:`)
        // before the actual subscription exists.
        let subscription1 = SingleAssignmentDisposable()
        let subscription2 = SingleAssignmentDisposable()

        // Each observer shares the sink's lock, reports back to this sink with its source
        // index, and stores incoming values in the queue that belongs to that source.
        let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1)
        let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2)

        // Attach the real subscriptions to the placeholders created above.
        subscription1.setDisposable(_parent.source1.subscribe(observer1))
        subscription2.setDisposable(_parent.source2.subscribe(observer2))

        return Disposables.create([
           subscription1,
           subscription2
        ])
    }

ZipSink2_ 创建两个 ZipObserver,分别订阅 source1 和 source2 这两个 Observable

// ZipObserver._synchronized_on
    /// Handles one event from this observer's source and relays it to the parent sink.
    ///
    /// Both steps are skipped when `_parent` is nil.
    func _synchronized_on(_ event: Event<E>) {
        // Step 1: on a terminal event, dispose this observer's own subscription (`_this`)
        // before the parent is notified. `.next` needs no cleanup.
        if let _ = _parent {
            switch event {
            case .next(_):
                break
            case .error(_):
                _this.dispose()
            case .completed:
                _this.dispose()
            }
        }
        
        // Step 2: relay the event to the parent, tagged with this source's index.
        if let parent = _parent {
            switch event {
            case .next(let value):
                // Store the value first so the parent sees it when `next` runs.
                _setNextValue(value)
                parent.next(_index)
            case .error(let error):
                parent.fail(error)
            case .completed:
                parent.done(_index)
            }
        }
    }

和 Merge 操作符原理一样,ZipObserver 在接收事件后,会将事件传递给 ZipSink2_。在 next 事件的时候会调用 _setNextValue,这会触发 self._values1.enqueue($0) 操作,即数据入列操作

// ZipSink.next
 /// Called after the source at `index` has stored a new value.
 ///
 /// Emits a combined result when every source has a buffered element; otherwise
 /// completes the sequence if every source other than `index` is already done.
 func next(_ index: Int) {
        // Part 1: check that every source has at least one buffered element.
        var hasValueAll = true
        
        for i in 0 ..< _arity {
            if !hasElements(i) {
                hasValueAll = false
                break
            }
        }
        
        if hasValueAll {
            // Part 2: combine one element per source and forward it; an error thrown
            // while computing the result is forwarded and terminates the sink.
            do {
                let result = try getResult()
                self.forwardOn(.next(result))
            }
            catch let e {
                self.forwardOn(.error(e))
                dispose()
            }
        }
        else {
            // Part 3: some buffer is empty. If every other source is already done,
            // forward `.completed` and dispose.
            var allOthersDone = true
            
            let arity = _isDone.count
            for i in 0 ..< arity {
                if i != index && !_isDone[i] {
                    allOthersDone = false
                    break
                }
            }
            
            if allOthersDone {
                forwardOn(.completed)
                self.dispose()
            }
        }
    }

函数分为3个逻辑块:

  1. 检测是否每个 Observable 都有值。_arity 是被 zip 的 Observable 数量,在本例子中等于 2
// ZipSink2_.hasElements
    /// Returns whether the buffer for the source at `index` holds at least one element.
    ///
    /// Only indexes 0 and 1 are handled; any other index goes to `rxFatalError`.
    override func hasElements(_ index: Int) -> Bool {
        switch (index) {
        case 0: return _values1.count > 0
        case 1: return _values2.count > 0

        default:
            rxFatalError("Unhandled case (Function)")
        }

        // NOTE(review): reachable only if `rxFatalError` returns — confirm its signature.
        return false
    }

  2. 如果都有值,便通过 forwardOn 发送 next 事件
// ZipSink2_.getResult
    /// Dequeues one element from each buffer and passes the pair to the parent's result
    /// selector, rethrowing any error the selector throws.
    ///
    /// The force-unwraps rely on both queues being non-empty; `next` only calls this
    /// after `hasElements` succeeded for every index.
    override func getResult() throws -> R {
        return try _parent._resultSelector(_values1.dequeue()!, _values2.dequeue()!)
    }

这里要注意的是,getResult 会触发数据的出列操作。也就是说,调用 getResult 之前
_values1.count = 1, _values2.count = 1

调用之后
_values1.count = 0, _values2.count = 0

  3. 如果不是每个 Observable 都有值,则检测其他 Observable 是否都已经 completed 了,如果是的话,发送 forwardOn completed
    简单验证一下
// Demonstrates when `zip` completes: one source finishes early, and the zipped
// sequence completes once the other source emits a value that can no longer be paired.
example("zip") {
    let disposeBag = DisposeBag()
    
    let stringSubject = PublishSubject<String>()
    let intSubject = PublishSubject<Int>()
    
    Observable.zip(stringSubject, intSubject) { stringElement, intElement in
        "\(stringElement) \(intElement)"
        }
        // `.debug()` produces the timestamped event lines shown in the log.
        .debug()
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)
    
    // Buffered: no int has arrived yet.
    stringSubject.onNext("🅰️")
    
    
    stringSubject.onNext("🅱️")
    
  //  stringSubject.onNext("🆎")
    
    // The string source is done; its two buffered values can still be paired.
    stringSubject.onCompleted()
    
    // Emits "🅰️ 1".
    intSubject.onNext(1)
    
    // Emits "🅱️ 2".
    intSubject.onNext(2)
    
    // No string is left and the string source is done, so this triggers `completed`.
    intSubject.onNext(3)
    
    
    
}

// output log
--- zip example ---
2018-10-13 11:27:37.532: Rx.playground:73 (__lldb_expr_7) -> subscribed
2018-10-13 11:27:37.536: Rx.playground:73 (__lldb_expr_7) -> Event next(🅰️ 1)
🅰️ 1
2018-10-13 11:27:37.537: Rx.playground:73 (__lldb_expr_7) -> Event next(🅱️ 2)
🅱️ 2
2018-10-13 11:27:37.539: Rx.playground:73 (__lldb_expr_7) -> Event completed
2018-10-13 11:27:37.539: Rx.playground:73 (__lldb_expr_7) -> isDisposed


这里 如果去掉 stringSubject.onCompleted() 或者 intSubject.onNext(3) 的话,都不会收到completed 事件

ZipSink 是 ZipSink2_ 的父类

上一篇下一篇

猜你喜欢

热点阅读