RxSwift高阶函数skip解读

2019-07-30  本文已影响0人  silasjs

RxSwift高阶函数skip解读

skip

skip的作用:跳过 Observable 中头 n 个元素,只关注后面的元素。

skip

skip的简单使用:

Observable.of(1, 2, 3, 4, 5, 6)
        .skip(2)
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)

跳过前n个,输出剩余的元素:3 4 5 6

虽然这里主要是研究skip函数,但是调用者是of函数的返回值,所以of函数也不能省掉。

先看of函数的实现:

public static func of(_ elements: Element ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<Element> {
    return ObservableSequence(elements: elements, scheduler: scheduler)
}

这里只是传入了几个元素和默认的调度者,创建了一个ObservableSequence序列的实例,它就是skip的调用者。

skip函数的实现:

public func skip(_ count: Int) -> Observable<Element> {
    return SkipCount(source: self.asObservable(), count: count)
}

SkipCount保存了调用者ObservableSequence序列和需要跳过的count

final private class SkipCount<Element>: Producer<Element> {
    let source: Observable<Element>
    let count: Int
    
    init(source: Observable<Element>, count: Int) {
        self.source = source
        self.count = count
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = SkipCountSink(parent: self, observer: observer, cancel: cancel)
        let subscription = self.source.subscribe(sink)

        return (sink: sink, subscription: subscription)
    }
}

然后就开始了subscribe()订阅信号。

此时,SkipCount就会调用父类Producer中的subscribe函数,这个函数已经是老朋友了,它会根据所在的线程分别调用子类的run,并创建一个销毁者SinkDisposer,把订阅信号时创建的匿名观察者和这个销毁者SinkDisposer一起通过run函数来传参过去。然后在子类SkipCountrun中让源序列(self.source == ObservableSequence)去订阅信号,并把
携带了匿名观察者SkipCountSink传了过去。

下一步动动脚指头也知道,轮到ObservableSequence去父类的subscribe请安了,都是亲兄弟,也必然会像SkipCount一样回到run函数中:

final private class ObservableSequence<Sequence: Swift.Sequence>: Producer<Sequence.Element> {
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

ObservableSequence中的run函数的实现就和RxSwift核心逻辑中的比较吻合了。都是创建了一个Sink的子类ObservableSequenceSink,然后调用run需要注意的是:ObservableSequenceSink初始化用的observerSkipCountSink

ObservableSequenceSinkrun

final private class ObservableSequenceSink<Sequence: Swift.Sequence, Observer: ObserverType>: Sink<Observer> where Sequence.Element == Observer.Element {
    typealias Parent = ObservableSequence<Sequence>

    func run() -> Disposable {
        return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
            var mutableIterator = iterator
            if let next = mutableIterator.next() {
                self.forwardOn(.next(next))
                recurse(mutableIterator)
            }
            else {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }
}

这里_parent的调度者会带你去它家一顿唠嗑,最后还是会递归回调action闭包。这闭包里的代码不难看出,会去父类SinkforwardOn,然后就是Sink._observer.on(event),这个_observer不就是我们刚刚重点加粗的那个SkipCountSink么!

skip的套路近在眼前:

final private class SkipCountSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    typealias Element = Observer.Element 
    typealias Parent = SkipCount<Element>
    
    let parent: Parent
    var remaining: Int
    
    init(parent: Parent, observer: Observer, cancel: Cancelable) {
        self.parent = parent
        self.remaining = parent.count
        super.init(observer: observer, cancel: cancel)
    }
    
    func on(_ event: Event<Element>) {
        switch event {
        case .next(let value):
            
            if self.remaining <= 0 {
                self.forwardOn(.next(value))
            }
            else {
                self.remaining -= 1
            }
        case .error:
            self.forwardOn(event)
            self.dispose()
        case .completed:
            self.forwardOn(event)
            self.dispose()
        }
    }
}

soga,原来我们skip(n)几次,这里就在else中就放空几次。剩余的会正常self._observer.on(event),去回调subscribe中的闭包。

skipWhile

skipWhile的作用:跳过 Observable 中头几个元素,直到元素的判定为否

skipWhile

示例:

Observable.of(1, 2, 3, 4, 5, 6)
        .skipWhile { $0 < 4 }
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)
        
跳过满足{$0 < 4}条件的元素,输出剩余的元素:4 5 6

同样的套路:

final private class SkipWhileSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    typealias Element = Observer.Element 
    typealias Parent = SkipWhile<Element>

    fileprivate let _parent: Parent
    fileprivate var _running = false

    func on(_ event: Event<Element>) {
        switch event {
        case .next(let value):
            if !self._running {
                do {
                    self._running = try !self._parent._predicate(value)
                } catch let e {
                    self.forwardOn(.error(e))
                    self.dispose()
                    return
                }
            }

            if self._running {
                self.forwardOn(.next(value))
            }
        case .error, .completed:
            self.forwardOn(event)
            self.dispose()
        }
    }
}

关键部分还是on函数中的实现,.next下,用SkipWhile保存的闭包_predicate来判断当前元素是否满足条件。直到self._running标记为true后才会执行forwardOn去响应订阅。

skipUntil

这部分加上就有些长,另外写了个RxSwift高阶函数skipUntil解读

上一篇下一篇

猜你喜欢

热点阅读