filter运算

2016-03-13  本文已影响24人  幸运的小强本人

说明:
Filters the elements of an observable sequence based on a predicate: only the elements for which the predicate returns `true` are forwarded to the observer.

filter

代码如下:

extension ObservableType {
    /// Filters the elements of an observable sequence based on a predicate.
    ///
    /// - parameter predicate: Throwing closure that receives an element and returns a `Bool`.
    /// - returns: A `Filter` built from `asObservable()` and `predicate`, typed as `Observable<E>`.
    public func filter(predicate: (E) throws -> Bool) -> Observable<E> {
        let upstream = asObservable()
        return Filter(source: upstream, predicate: predicate)
    }
}

/// Producer that stores a source observable together with a throwing predicate
/// and, when run, subscribes a `FilterSink` to that source.
class Filter<Element>: Producer<Element> {
    /// Signature of the test applied to each element.
    typealias Predicate = (Element) throws -> Bool

    private let _source: Observable<Element>
    private let _predicate: Predicate

    /// Stores the source sequence and the predicate for later use by `run`.
    ///
    /// - parameter source: Observable that will be subscribed to in `run`.
    /// - parameter predicate: Closure handed to every `FilterSink` created by `run`.
    init(source: Observable<Element>, predicate: Predicate) {
        self._source = source
        self._predicate = predicate
    }

    /// Creates a `FilterSink` around `observer`, subscribes it to the stored source,
    /// records that subscription on the sink, and returns the sink as the `Disposable`.
    ///
    /// - parameter observer: Observer whose element type equals `Element`.
    /// - returns: The newly created sink.
    override func run<O: ObserverType where O.E == Element>(observer: O) -> Disposable {
        let filterSink = FilterSink(predicate: _predicate, observer: observer)
        // The sink keeps the upstream subscription in its `disposable` property.
        filterSink.disposable = _source.subscribe(filterSink)
        return filterSink
    }
}

/// Observer that sits between a source sequence and the downstream observer `O`,
/// forwarding `.Next` events only when the stored predicate returns `true`.
class FilterSink<O: ObserverType>: Sink<O>, ObserverType {
    typealias Element = O.E
    typealias Predicate = (Element) throws -> Bool

    typealias Parent = Filter<Element>

    private let _predicate: Predicate

    /// - parameter predicate: Test evaluated for every `.Next` element.
    /// - parameter observer: Downstream observer, passed on to the `Sink` initializer.
    init(predicate: Predicate, observer: O) {
        _predicate = predicate
        super.init(observer: observer)
    }

    /// Handles one event from the source.
    ///
    /// - `.Completed` / `.Error`: forwarded unchanged, then the sink disposes itself.
    /// - `.Next`: forwarded only if the predicate returns `true`; if the predicate
    ///   throws, the thrown error is forwarded as `.Error` and the sink disposes itself.
    func on(event: Event<Element>) {
        switch event {
        case .Error, .Completed:
            // Terminal events always pass through.
            forwardOn(event)
            dispose()
        case .Next(let element):
            do {
                if try _predicate(element) {
                    forwardOn(.Next(element))
                }
            } catch {
                // A throwing predicate ends the sequence with that error.
                forwardOn(.Error(error))
                dispose()
            }
        }
    }
}
上一篇 下一篇

猜你喜欢

热点阅读