elementAt运算

2016-03-13  本文已影响40人  幸运的小强本人

说明:
Returns a sequence that emits only the element at index _n_ (zero-based) emitted by an Observable.

图片说明如下:


elementAt
extension ObservableType {
    /// Builds an observable that emits only the element found at `index`
    /// in this sequence.
    ///
    /// The receiver is converted with `asObservable()` and wrapped in an
    /// `ElementAt` producer created with `throwOnEmpty: true`.
    ///
    /// - Parameter index: Position of the wanted element; `ElementAt`
    ///   rejects negative values via `rxFatalError`.
    /// - Returns: An `Observable<E>` backed by an `ElementAt` producer.
    public func elementAt(index: Int)->Observable<E> {
        let upstream = asObservable()
        return ElementAt(source: upstream, index: index, throwOnEmpty: true)
    }
}

/// Producer backing the `elementAt` operator.
///
/// Stores the source observable, the index of the wanted element and the
/// `throwOnEmpty` flag; each call to `run` wires a fresh `ElementAtSink`
/// between the source and the given observer.
class ElementAt<SourceType>: Producer<SourceType> {
    /// Index of the element to forward; never negative (checked in `init`).
    let _index: Int
    /// Read by `ElementAtSink` when the source completes, to choose between
    /// forwarding an error and forwarding a plain completion.
    let _throwOnEmpty: Bool
    /// Upstream sequence the sink subscribes to.
    let _source: Observable<SourceType>

    /// - Parameters:
    ///   - source: Upstream sequence to pick the element from.
    ///   - index: Position of the wanted element; a negative value is
    ///     reported through `rxFatalError`.
    ///   - throwOnEmpty: Stored as-is and consulted by the sink on completion.
    init(source: Observable<SourceType>, index: Int, throwOnEmpty: Bool) {
        if 0 > index {
            rxFatalError("index can't be negative")
        }
        _throwOnEmpty = throwOnEmpty
        _index = index
        _source = source
    }

    /// Creates the sink for `observer`, subscribes it to the source and
    /// returns the sink as the subscription's disposable.
    override func run<O: ObserverType where O.E == SourceType>(observer: O)->Disposable {
        let elementSink = ElementAtSink(parent: self, observer: observer)
        let subscription = _source.subscribeSafe(elementSink)
        elementSink.disposable = subscription
        return elementSink
    }
}

/// Sink that sits between the source of an `ElementAt` producer and the
/// downstream observer, forwarding only the element at the configured index.
class ElementAtSink<SourceType, O: ObserverType where O.E == SourceType>: Sink<O>, ObserverType {
    typealias Parent = ElementAt<SourceType>

    /// Producer that created this sink; consulted for `_throwOnEmpty`.
    // Fixed: the type was spelled `parent`, which names no type in scope;
    // the typealias declared above is `Parent`.
    let _parent: Parent
    /// Countdown initialised from the parent's index; the element is
    /// forwarded when a `.Next` arrives while this is 0.
    var _i: Int

    /// - Parameters:
    ///   - parent: Producer supplying the index and the `throwOnEmpty` flag.
    ///   - observer: Downstream observer handed to the `Sink` superclass.
    init(parent: Parent, observer: O) {
        _parent = parent
        _i = parent._index

        super.init(observer: observer)
    }

    /// Handles one event from the source.
    ///
    /// - `.Next`: forwards the event plus `.Completed` and disposes when the
    ///   countdown is 0, then decrements the countdown.
    /// - `.Error`: forwards the error and disposes.
    /// - `.Completed`: forwards `RxError.ArgumentOutOfRange` when the parent
    ///   was created with `throwOnEmpty`, otherwise `.Completed`; then disposes.
    func on(event: Event<SourceType>) {
        switch event {
        case .Next(_):
            if _i == 0 {
                forwardOn(event)
                forwardOn(.Completed)
                self.dispose()
            }

            // No early return above: the countdown is still decremented after
            // the match, so it moves past 0 and a later `.Next` cannot match.
            // NOTE(review): `decrementChecked` is defined elsewhere; presumably
            // it throws on arithmetic overflow — confirm.
            do {
                try decrementChecked(&_i)
            }catch (let e) {
                forwardOn(.Error(e))
                dispose()
                return
            }
        case .Error(let e):
            forwardOn(.Error(e))
            self.dispose()
        case .Completed:
            if(_parent._throwOnEmpty) {
                forwardOn(.Error(RxError.ArgumentOutOfRange))
            }else {
                forwardOn(.Completed)
            }
            self.dispose()
        }
    }
}
上一篇 下一篇

猜你喜欢

热点阅读