windowTimeCount运算

2016-03-12  本文已影响34人  幸运的小强本人

windowTimeCount运算的含义如下图所示:

屏幕快照 2016-03-12 下午9.14.26.png
extension ObservableType {
    /**
    Splits the source sequence into a sequence of windows (inner observables).

    The work is delegated to `WindowTimeCount`; its sink closes the current window
    and opens a new one either when `count` elements have been forwarded into the
    window or when the timer scheduled with `timeSpan` fires, whichever happens first.

    - parameter timeSpan: Due time passed to the scheduler for the per-window timer.
    - parameter count: Number of elements after which the current window is completed.
    - parameter scheduler: Scheduler on which the window timers are scheduled.
    - returns: An observable sequence whose elements are the windows.
    */
    public func window(timeSpan timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType) -> Observable<Observable<E>> {
        return WindowTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler)
    }
}

/// Producer behind `window(timeSpan:count:scheduler:)`.
///
/// It only stores the operator's configuration; the actual windowing logic lives in
/// `WindowTimeCountSink`, which is created once per subscription in `run`.
class WindowTimeCount<Element>: Producer<Observable<Element>> {
    // Due time used for each window's timer.
    private let _timeSpan: RxTimeInterval
    // Element count at which the current window is closed.
    private let _count: Int
    // Scheduler on which the window timers are scheduled.
    private let _scheduler: SchedulerType
    // Upstream sequence whose elements are distributed into windows.
    private let _source: Observable<Element>

    init(source: Observable<Element>, timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType) {
        _source = source
        _timeSpan = timeSpan
        _count = count
        _scheduler = scheduler
    }

    /// Creates a sink for `observer`, starts it, and returns the sink as the
    /// subscription's disposable.
    ///
    /// - parameter observer: Receiver of the emitted windows.
    /// - returns: The sink, whose `disposable` is set to the result of `sink.run()`.
    override func run<O: ObserverType where O.E == Observable<Element>>(observer: O) -> Disposable {
        let sink = WindowTimeCountSink(parent: self, observer: observer)
        sink.disposable = sink.run()
        return sink
    }
}

/// Per-subscription sink for `WindowTimeCount`.
///
/// Source elements are pushed into the current window subject. A window is closed
/// (and a new one opened) either when `_count` reaches the parent's `_count` or when
/// the timer created for that window fires while the window is still the current one.
class WindowTimeCountSink<Element, O: ObserverType where O.E == Observable<Element>>: Sink<O>, ObserverType, LockOwnerType, SynchronizedOnType {
    typealias Parent = WindowTimeCount<Element>
    typealias E = Element

    private let _parent: Parent

    // Recursive lock used by `synchronizedOn` and by the timer callback.
    let _lock = NSRecursiveLock()

    // Subject backing the currently open window.
    private var _subject = PublishSubject<Element>()
    // Number of elements forwarded into the current window.
    private var _count = 0
    // Identifier of the current window; lets a timer detect that its window is stale.
    private var _windowId = 0

    // Holds the timer of the current window; assigning a new timer replaces the old one.
    private let _timerD = SerialDisposable()
    private let _refCountDisposable: RefCountDisposable
    private let _groupDisposable = CompositeDisposable()

    init(parent: Parent, observer: O) {
        _parent = parent
        _groupDisposable.addDisposable(_timerD)
        _refCountDisposable = RefCountDisposable(disposable: _groupDisposable)
        super.init(observer: observer)
    }

    /// Emits the first window, starts its timer, and subscribes to the source.
    ///
    /// - returns: The ref-counted disposable wrapping the timer and source subscription.
    func run() -> Disposable {
        forwardOn(.Next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable()))
        createTimer(_windowId)

        _groupDisposable.addDisposable(_parent._source.subscribeSafe(self))
        return _refCountDisposable
    }

    /// Completes the current window subject, replaces it with a fresh one, and
    /// forwards the new window downstream.
    func startNewWindowAndCompleteCurrentOne() {
        _subject.on(.Completed)
        _subject = PublishSubject<Element>()

        forwardOn(.Next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable()))
    }

    func on(event: Event<E>) {
        synchronizedOn(event)
    }

    /// Handles one source event; invoked through `synchronizedOn`.
    ///
    /// - parameter event: Event received from the source sequence.
    func _synchronized_on(event: Event<E>) {
        var newWindow = false
        var newId = 0

        switch event {
        case .Next(let element):
            _subject.on(.Next(element))

            do {
                try incrementChecked(&_count)
            } catch (let e) {
                _subject.on(.Error(e as ErrorType))
                dispose()
                // NOTE(review): execution continues to the count check below after
                // disposing; confirm this is intended.
            }

            if (_count == _parent._count) {
                // The window is full: rotate to a new window and remember its id so
                // a timer is started for it after the switch.
                newWindow = true
                _count = 0
                _windowId += 1
                newId = _windowId
                self.startNewWindowAndCompleteCurrentOne()
            }

        case .Error(let error):
            _subject.on(.Error(error))
            forwardOn(.Error(error))
            dispose()
        case .Completed:
            _subject.on(.Completed)
            forwardOn(.Completed)
            dispose()
        }

        if newWindow {
            createTimer(newId)
        }
    }

    /// Schedules the timer for the window identified by `windowId`.
    ///
    /// Does nothing when the timer disposable is already disposed or when `windowId`
    /// no longer matches the current window.
    ///
    /// - parameter windowId: Identifier of the window the timer belongs to.
    func createTimer(windowId: Int) {
        if _timerD.disposed {
            return
        }

        if _windowId != windowId {
            return
        }

        let nextTimer = SingleAssignmentDisposable()

        _timerD.disposable = nextTimer

        nextTimer.disposable = _parent._scheduler.scheduleRelative(windowId, dueTime: _parent._timeSpan) { previousWindowId in

            var newId = 0

            self._lock.performLocked {
                // The window this timer was created for has already been replaced
                // (its id changed), so this timer must not close anything.
                if previousWindowId != self._windowId {
                    return
                }

                self._count = 0
                self._windowId = self._windowId &+ 1
                newId = self._windowId
                self.startNewWindowAndCompleteCurrentOne()
            }

            // Start the timer for the newly opened window; `createTimer` itself
            // returns early if `newId` is not the current window id.
            self.createTimer(newId)

            return NopDisposable.instance
        }
    }
}
上一篇 下一篇

猜你喜欢

热点阅读