RxSwift源码解析

Rx ObservableSequence 递归调度

2018-09-25  本文已影响13人  狼性刀锋

        DispatchQueue.global().async {
            assert(!Thread.current.isMainThread)
            Observable.from(["🐶", "🐱", "🐭", "🐹"], scheduler: MainScheduler.instance)
                .subscribe { event in
                    assert(Thread.current.isMainThread)
                    print(event)
            }
        }


先看下简单的例子:
上面例子的订阅事件会被调度到主线程中去,那么具体是怎么调度,以及细节是什么样的呢?


  public static func from(_ array: [E], scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
        return ObservableSequence(elements: array, scheduler: scheduler)
    }


// ObservableSequenceSink run
    func run() -> Disposable {
        
        return _parent._scheduler.scheduleRecursive((_parent._elements.makeIterator(), _parent._elements)) { (iterator, recurse) in

            var mutableIterator = iterator

            if let next = mutableIterator.0.next() {
                print("scheduleRecursive: \(next)")
                self.forwardOn(.next(next))
                recurse(mutableIterator)
            }
            else {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }


轻车熟路的找到入口位置

// scheduleRecursive method
    public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> ()) -> ()) -> Disposable {
        let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
        
        recursiveScheduler.schedule(state)
        
        return Disposables.create(with: recursiveScheduler.dispose)
    }


这个scheduleRecursive的入参非常复杂,要看懂它的结合上下文看,从方法名来看可以理解为递归调度,暂且先了解到这,继续看。

    func schedule(_ state: State) {
        
        
        var scheduleState: ScheduleState = .initial

        let d = _scheduler.schedule(state) { (state) -> Disposable in
            // best effort
            if self._group.isDisposed {
                return Disposables.create()
            }
            
            let action = self._lock.calculateLocked { () -> Action? in
                switch scheduleState {
                case let .added(removeKey):
                    self._group.remove(for: removeKey)
                case .initial:
                    break
                case .done:
                    break
                }

                scheduleState = .done

                return self._action
            }
            
            if let action = action {
                action(state, self.schedule)
            }
            
            return Disposables.create()
        }
 
                
        
        _lock.performLocked {
            switch scheduleState {
            case .added:
                rxFatalError("Invalid state")
                break
            case .initial:
                if let removeKey = _group.insert(d) {
                    scheduleState = .added(removeKey)
                }
                else {
                    scheduleState = .done
                }
                break
            case .done:
                break
            }
        }
    }


暂且不管它的生命周期和线程安全 ,去掉这些细节代码就变成如下模样:


    func schedule(_ state: State) {
        
        let d = _scheduler.schedule(state) { (state) -> Disposable in
            // best effort
            if self._group.isDisposed {
                return Disposables.create()
            }
 
            action(state, self.schedule)

            return Disposables.create()
        }
 
        

    }


去掉这些边角料代码就清晰多了, schedule(state) { } 这个是一个线程调度,确保closure在指定的线程运行,详情参见:CurrentThreadScheduler

去掉这个线程调度,再来看一下:


    func schedule(_ state: State) {
         
        action(state, self.schedule)

    }


再结合着看一下:

// 代码片段1

    func run() -> Disposable {
        
        return _parent._scheduler.scheduleRecursive((_parent._elements.makeIterator(), _parent._elements)) { (iterator, recurse) in

            var mutableIterator = iterator

            if let next = mutableIterator.0.next() {
                print("scheduleRecursive: \(next)")
                self.forwardOn(.next(next))
                recurse(mutableIterator)
            }
            else {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }
    
    
// 代码片段2

 recursiveScheduler.schedule(state)
 
// 代码片段3
     func schedule(_ state: State) {
         
        action(state, self.schedule)

    }



----> 表示触发的话,那么就是:
ObservableSequenceSink.run ----> recursiveScheduler.schedule ----> action, run函数的执行最终会触发action闭包的执行。好了现在来分析一下:
action(state, self.schedule), 将对象自身schedule函数作为入参传入闭包,那么action闭包干了啥呢? 首先使用迭代器迭代下一个元素,如果不为空的话调用recurse(mutableIterator), 也就是 recursiveScheduler.schedule方法,recursiveScheduler.schedule又会调用action,如此往复直到遍历所有元素。对迭代器不太理解的可以参考下面这个简单的例子:

         let numbers = [2, 3, 5, 7]
         var numbersIterator = numbers.makeIterator()
   
         while let num = numbersIterator.next() {
             print(num)
         }
         // Prints "2"
         // Prints "3"
         // Prints "5"
         // Prints "7"


再来复习一下递归,什么时候可以使用递归:

  1. 一个问题总可以分解为若干个规模更小的相似的问题
  2. 最小的子问题可以直接求解

显然序列的遍历问题满足这个条件,对一个序列的遍历总可以分解为:

回头再分析下 scheduleRecursive 方法

    public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> ()) -> ()) -> Disposable

了解这些我们可以写个非递归的run方法,看起来如下:


        return _parent._scheduler.schedule(_parent._elements.makeIterator(), action: { iterator  -> Disposable in
            var mutableIterator = iterator
            
            while let next = mutableIterator.next() {
                self.forwardOn(.next(next))
                 print("unScheduleRecursive: \(next)")
            }
            
            self.forwardOn(.completed)
            self.dispose()
            return Disposables.create()
        })


你看多好啊,线程调度也实现了,而且简洁明了为什么非要用递归调度呢? 我一次调度就完事了。我想了很久终于想到了, 想象一个应用场景假设这个序列元素非常多,就比如有100万个,当迭代器迭代到第100个的时候突然客户说后面的我都不要了,那你怎么让这个迭代器停止避免不必要的运算呢?为了验证这个想法了写了如下代码:

         let range = 0..<100000
        let source = range.map {"\($0)" }
         fromArrayDisposables =  Observable.from(source)
            .elementAt(1000)
            .subscribe(onNext: {[weak self] element in
                print(element)       
            })


然后我在run函数加了一日志语句, 果然迭代器迭代到1000的时候,就停止了不再迭代,换上我自己的写的run函数, 迭代器一口气迭代完了100000个元素,这里不讨论elementAt具体是怎么实现的,只是想说明这个递归调度不是为了炫耀技术是,而是出于可扩展性需要而实现的。

线程安全和生命周期管理


  func schedule(_ state: State) {
        
        //FIX ME
   // ConcurrentMainScheduler
        var scheduleState: ScheduleState = .initial

        let d = _scheduler.schedule(state) { (state) -> Disposable in
            // best effort
            if self._group.isDisposed {
                return Disposables.create()
            }
            
            let action = self._lock.calculateLocked { () -> Action? in
                switch scheduleState {
                case let .added(removeKey):
                    self._group.remove(for: removeKey)
                case .initial:
                    break
                case .done:
                    break
                }

                scheduleState = .done

                return self._action
            }
            
            
            
            if let action = action {
                action(state, self.schedule)
            }
            
            return Disposables.create()
        }
 

        
        
        _lock.performLocked {
            switch scheduleState {
            case .added:
                rxFatalError("Invalid state")
                break
            case .initial:
                if let removeKey = _group.insert(d) {
                    scheduleState = .added(removeKey)
                }
                else {
                    scheduleState = .done
                }
                break
            case .done:
                break
            }
        }
    }


这里主要是要保证scheduleState的线程安全, schedule {}的代码块不一定立马执行,如果调度的线程正忙的时候,只会简单的将closure存储起来。


//代码块1
            let action = self._lock.calculateLocked { () -> Action? in
                switch scheduleState {
                case let .added(removeKey):
                    self._group.remove(for: removeKey)
                case .initial:
                    break
                case .done:
                    break
                }

                scheduleState = .done

                return self._action
            }



// 代码块2
    _lock.performLocked {
            switch scheduleState {
            case .added:
                rxFatalError("Invalid state")
                break
            case .initial:
                if let removeKey = _group.insert(d) {
                    scheduleState = .added(removeKey)
                }
                else {
                    scheduleState = .done
                }
                break
            case .done:
                break
            }
        }

也就是说这两个代码块不一定按照顺序执行,而这两块代码都涉及到scheduleState的状态,所以必须加锁。 先假设按12执行,那么皆大欢喜。

这两个case什么都没干

那么按21执行呢?

那么先将Disposable 加入到 _group中进行管理,然后在闭包执行完毕之后,执行dispose,这个主要是为了管理其生命周期.如果没有这个dispose操作,有可能造成内存泄漏

Simple Demo


/// 实现阶加, 比如 输入5的话 ,就是 1 + 2 + 3 + 4 + 5

typealias Action =  (_ state: Int, _ recurse: (Int) -> Void) -> Void


struct RecursiveAdder {
    
    // 最简单的递归,递归的缺陷在于容易造成爆栈,当然这种递归也是尾递归,很多编译器能够对其进行优化
    static func recursiveAdd(_ number: Int, result: Int = 0) -> Int {
        
        if number <= 0 {
            return result
        } else {
            let updateResult = result + number
            return recursiveAdd(number - 1, result: updateResult)
        }
    }
    
    // 非递归
    static func unRecursiveAdd(_ number: Int) -> Int {
        var m = number
        var result = 0
        while m > 0 {
            result = addNumber(m, number2: result)
            m -= 1
        }
        return result
    }
    
    static func addNumber(_ number1: Int, number2: Int) -> Int {
        return number1 + number2
    }
    
    // 重现Rx 递归调度原理
    static func functinalRecursiveAdd(_ number: Int, result: Int = 0) -> Int {
        var _result = result
        // number 当前状态
        // recursive 递归调度函数
        // number - 1 模拟迭代器迭代
        add(number) { (number, recursive) in
            _result += number
            if number <= 0 {
                
                print("recursive end")
            } else {
                recursive(number - 1)
            }
        }
        
        return _result
        
    }
    
    static func add(_ state: Int, action: @escaping Action)  {
        
        let item = RecursiveAdderItem(action: action)
        item.add(state)
        
    }
}

struct RecursiveAdderItem {
    let action: Action
    
    // 这里什么也没有做,在rx中这一步是做了线程调度的
    func add(_ state: Int)  {
        action(state, self.add)
    }
}



上一篇下一篇

猜你喜欢

热点阅读