Rx ObservableSequence 递归调度
DispatchQueue.global().async {
assert(!Thread.current.isMainThread)
Observable.from(["🐶", "🐱", "🐭", "🐹"], scheduler: MainScheduler.instance)
.subscribe { event in
assert(Thread.current.isMainThread)
print(event)
}
}
先看下简单的例子:
上面例子的订阅事件会被调度到主线程中去,那么具体是怎么调度,以及细节是什么样的呢?
public static func from(_ array: [E], scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
return ObservableSequence(elements: array, scheduler: scheduler)
}
// ObservableSequenceSink run
func run() -> Disposable {
return _parent._scheduler.scheduleRecursive((_parent._elements.makeIterator(), _parent._elements)) { (iterator, recurse) in
var mutableIterator = iterator
if let next = mutableIterator.0.next() {
print("scheduleRecursive: \(next)")
self.forwardOn(.next(next))
recurse(mutableIterator)
}
else {
self.forwardOn(.completed)
self.dispose()
}
}
}
轻车熟路的找到入口位置
// scheduleRecursive method
public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> ()) -> ()) -> Disposable {
let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
recursiveScheduler.schedule(state)
return Disposables.create(with: recursiveScheduler.dispose)
}
这个scheduleRecursive
的入参非常复杂,要看懂它的结合上下文看,从方法名来看可以理解为递归调度,暂且先了解到这,继续看。
func schedule(_ state: State) {
var scheduleState: ScheduleState = .initial
let d = _scheduler.schedule(state) { (state) -> Disposable in
// best effort
if self._group.isDisposed {
return Disposables.create()
}
let action = self._lock.calculateLocked { () -> Action? in
switch scheduleState {
case let .added(removeKey):
self._group.remove(for: removeKey)
case .initial:
break
case .done:
break
}
scheduleState = .done
return self._action
}
if let action = action {
action(state, self.schedule)
}
return Disposables.create()
}
_lock.performLocked {
switch scheduleState {
case .added:
rxFatalError("Invalid state")
break
case .initial:
if let removeKey = _group.insert(d) {
scheduleState = .added(removeKey)
}
else {
scheduleState = .done
}
break
case .done:
break
}
}
}
暂且不管它的生命周期和线程安全 ,去掉这些细节代码就变成如下模样:
func schedule(_ state: State) {
let d = _scheduler.schedule(state) { (state) -> Disposable in
// best effort
if self._group.isDisposed {
return Disposables.create()
}
action(state, self.schedule)
return Disposables.create()
}
}
去掉这些边角料代码就清晰多了, schedule(state) { }
这个是一个线程调度,确保closure在指定的线程运行,详情参见:CurrentThreadScheduler
去掉这个线程调度,再来看一下:
func schedule(_ state: State) {
action(state, self.schedule)
}
再结合着看一下:
// 代码片段1
func run() -> Disposable {
return _parent._scheduler.scheduleRecursive((_parent._elements.makeIterator(), _parent._elements)) { (iterator, recurse) in
var mutableIterator = iterator
if let next = mutableIterator.0.next() {
print("scheduleRecursive: \(next)")
self.forwardOn(.next(next))
recurse(mutableIterator)
}
else {
self.forwardOn(.completed)
self.dispose()
}
}
}
// 代码片段2
recursiveScheduler.schedule(state)
// 代码片段3
func schedule(_ state: State) {
action(state, self.schedule)
}
用 ---->
表示触发的话,那么就是:
ObservableSequenceSink.run ----> recursiveScheduler.schedule ----> action, run函数的执行最终会触发action闭包的执行。好了现在来分析一下:
action(state, self.schedule)
, 将对象自身schedule
函数作为入参传入闭包,那么action闭包干了啥呢? 首先使用迭代器迭代下一个元素,如果不为空的话调用recurse(mutableIterator)
, 也就是 recursiveScheduler.schedule方法,recursiveScheduler.schedule又会调用action,如此往复直到遍历所有元素。对迭代器不太理解的可以参考下面这个简单的例子:
let numbers = [2, 3, 5, 7]
var numbersIterator = numbers.makeIterator()
while let num = numbersIterator.next() {
print(num)
}
// Prints "2"
// Prints "3"
// Prints "5"
// Prints "7"
再来复习一下递归,什么时候可以使用递归:
- 一个问题总可以分解为若干个规模更小的相似的问题
- 最小的子问题可以直接求解
显然序列的遍历问题满足这个条件,对一个序列的遍历总可以分解为:
- 求解序列的首部元素
- 遍历除开首部元素的子序列
回头再分析下 scheduleRecursive
方法
public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> ()) -> ()) -> Disposable
- state: 初始值,
action
在执行的时候必须要有个实参,这个值是由state决定 - action:重点介绍这个, 第一个入参: state 即当前的值
第二个入参: 即 (State) -> (), 这个闭包用来做一些额外的处理, 比如在上面的例子当中,就成功的将每一次迭代都放在指定的线程执行。
了解这些我们可以写个非递归的run方法,看起来如下:
return _parent._scheduler.schedule(_parent._elements.makeIterator(), action: { iterator -> Disposable in
var mutableIterator = iterator
while let next = mutableIterator.next() {
self.forwardOn(.next(next))
print("unScheduleRecursive: \(next)")
}
self.forwardOn(.completed)
self.dispose()
return Disposables.create()
})
你看多好啊,线程调度也实现了,而且简洁明了为什么非要用递归调度呢? 我一次调度就完事了。我想了很久终于想到了, 想象一个应用场景假设这个序列元素非常多,就比如有100万个,当迭代器迭代到第100个的时候突然客户说后面的我都不要了,那你怎么让这个迭代器停止避免不必要的运算呢?为了验证这个想法了写了如下代码:
let range = 0..<100000
let source = range.map {"\($0)" }
fromArrayDisposables = Observable.from(source)
.elementAt(1000)
.subscribe(onNext: {[weak self] element in
print(element)
})
然后我在run函数加了一日志语句, 果然迭代器迭代到1000的时候,就停止了不再迭代,换上我自己的写的run函数, 迭代器一口气迭代完了100000个元素,这里不讨论elementAt
具体是怎么实现的,只是想说明这个递归调度不是为了炫耀技术是,而是出于可扩展性需要而实现的。
线程安全和生命周期管理
func schedule(_ state: State) {
//FIX ME
// ConcurrentMainScheduler
var scheduleState: ScheduleState = .initial
let d = _scheduler.schedule(state) { (state) -> Disposable in
// best effort
if self._group.isDisposed {
return Disposables.create()
}
let action = self._lock.calculateLocked { () -> Action? in
switch scheduleState {
case let .added(removeKey):
self._group.remove(for: removeKey)
case .initial:
break
case .done:
break
}
scheduleState = .done
return self._action
}
if let action = action {
action(state, self.schedule)
}
return Disposables.create()
}
_lock.performLocked {
switch scheduleState {
case .added:
rxFatalError("Invalid state")
break
case .initial:
if let removeKey = _group.insert(d) {
scheduleState = .added(removeKey)
}
else {
scheduleState = .done
}
break
case .done:
break
}
}
}
这里主要是要保证scheduleState
的线程安全, schedule {}
的代码块不一定立马执行,如果调度的线程正忙的时候,只会简单的将closure存储起来。
//代码块1
let action = self._lock.calculateLocked { () -> Action? in
switch scheduleState {
case let .added(removeKey):
self._group.remove(for: removeKey)
case .initial:
break
case .done:
break
}
scheduleState = .done
return self._action
}
// 代码块2
_lock.performLocked {
switch scheduleState {
case .added:
rxFatalError("Invalid state")
break
case .initial:
if let removeKey = _group.insert(d) {
scheduleState = .added(removeKey)
}
else {
scheduleState = .done
}
break
case .done:
break
}
}
也就是说这两个代码块不一定按照顺序执行,而这两块代码都涉及到scheduleState
的状态,所以必须加锁。 先假设按12执行,那么皆大欢喜。
- 代码块1 会执行 initial case
- 代码块2 会执行 done case
这两个case什么都没干
那么按21执行呢?
- 代码块2 会执行 initial case
- 代码块1 会执行 added case
那么先将Disposable 加入到 _group
中进行管理,然后在闭包执行完毕之后,执行dispose
,这个主要是为了管理其生命周期.如果没有这个dispose
操作,有可能造成内存泄漏
Simple Demo
/// 实现阶加, 比如 输入5的话 ,就是 1 + 2 + 3 + 4 + 5
typealias Action = (_ state: Int, _ recurse: (Int) -> Void) -> Void
struct RecursiveAdder {
// 最简单的递归,递归的缺陷在于容易造成爆栈,当然这种递归也是尾递归,很多编译器能够对其进行优化
static func recursiveAdd(_ number: Int, result: Int = 0) -> Int {
if number <= 0 {
return result
} else {
let updateResult = result + number
return recursiveAdd(number - 1, result: updateResult)
}
}
// 非递归
static func unRecursiveAdd(_ number: Int) -> Int {
var m = number
var result = 0
while m > 0 {
result = addNumber(m, number2: result)
m -= 1
}
return result
}
static func addNumber(_ number1: Int, number2: Int) -> Int {
return number1 + number2
}
// 重现Rx 递归调度原理
static func functinalRecursiveAdd(_ number: Int, result: Int = 0) -> Int {
var _result = result
// number 当前状态
// recursive 递归调度函数
// number - 1 模拟迭代器迭代
add(number) { (number, recursive) in
_result += number
if number <= 0 {
print("recursive end")
} else {
recursive(number - 1)
}
}
return _result
}
static func add(_ state: Int, action: @escaping Action) {
let item = RecursiveAdderItem(action: action)
item.add(state)
}
}
struct RecursiveAdderItem {
let action: Action
// 这里什么也没有做,在rx中这一步是做了线程调度的
func add(_ state: Int) {
action(state, self.add)
}
}