Android单向数据流——MvRx核心源码解析
前言
背景知识,Android真响应式架构——MvRx。
MvRx是什么?最简单的解释是,Mv(ModelView)和Rx(ReactiveX),MvRx是AAC ModelView和RxJava的结合,但这只是表象,MvRx最核心的价值在于,它是React理念在原生Android开发领域的一种实现,并且这种实现充分利用了AAC和Kotlin的能力,以一种相对简单的方式实现,降低了它的理解和使用门槛,对客户端是比较友好的。那什么是React的理念呢?我认为关键有两点:
- View由状态(State)表征
- 状态由事件(Event,或者称为动作、意图,不同地方有不同的叫法)单向驱动
如上图所示,使用State数据渲染View,View产生Event事件,发送给StateStore,StateStore根据旧的State构造出新的State,再传递给View。这种单向数据流的架构在其它地方也被称作MVI(Model-View-Intent)架构。
那么实现这么一套架构复杂吗?其实还是很复杂的,其中涉及到State的管理,对State的观察,View如何由State表征等等一系列问题。但是,如果借助现有的一些库,整个实现也并不复杂。State用Kotlin data class来表示,从View到StateStore的连接借助ViewModel来实现,State的管理StateStore使用RxJava中的Subject来完成,对State的观察使用LifecycleObserver+RxJava Observer来实现,从State到View的渲染借助Epoxy来完成。这就是MvRx的设计思路,所以MvRx本身的源码并不多,也不复杂。本文主要对MvRx的核心源码StateStore和State Observer进行解析。源码版本com.airbnb.android:mvrx:1.5.1
。
1. 太长不看
每个MvRxViewModel都包含一个MvRxStateStore,通过MvRxViewModel对MvRxStateStore进行的setState
,getState
的操作会被分别放进两个队列中,并且会给MvRxStateStore中的一个BehaviorSubject
发送一个信号,BehaviorSubject
收到信号后,会在一个单独的线程从setState
,getState
的队列中取元素,先从setState
队列中取,之后是getState
队列(也就是说异步getState获取的State是之前还没执行的setState执行之后的最新的State),直至getState
队列都为空;每次从setState
队列取一个元素(元素类型State.()->State
,一般称之为reducer
)后,都会执行这个reducer
,完成从旧State到新State的更新,然后通知StateObservable
,这样外部就能观察到State的变化了。
上图出自Unidirectional data flow on Android using Kotlin,他把Event称为Action,这篇文章从概念上解释了什么是Android单向数据流,如果你不关心具体的源码实现,只是想从概念上对单向数据流建立一个图像,那么推荐你看看这篇文章。
2. MvRxStateStore
如上所述,StateStore的职责就是对State的管理,主要包括接收外部输入,由旧生新;让外部可以观察State的变化,基本上就这些。MvRx定义了一个接口MvRxStateStore明确了这些职责:
interface MvRxStateStore<S : Any> : Disposable {
val state: S //同步方式获取 state
fun get(block: (S) -> Unit) //异步方式获取 state
fun set(stateReducer: S.() -> S) // 更新 state,由旧生新
val observable: Observable<S> //外部观察 state
}
获取State的方式有两种:同步和异步,获取State并不等同于观察State,一般是为了根据当前State决定下一步的动作,是一种一次性的行为。
每个MvRxViewModel都必须明确它的State,并且每个MvRxViewModel也都包含一个MvRxStateStore:
interface MvRxState
abstract class BaseMvRxViewModel<S : MvRxState>(
initialState: S,
debugMode: Boolean,
private val stateStore: MvRxStateStore<S> = RealMvRxStateStore(initialState)
) : ViewModel() {
//通过stateStore同步获取state
internal val state: S
get() = stateStore.state
protected fun setState(reducer: S.() -> S) {
if (debugMode) {
//debug模式下会进行一些验证
//核心逻辑就是,reducer运行两遍,两遍运行得到的 state要求相等,这基本上保证了reducer是个“纯函数”
} else {
stateStore.set(reducer)
}
}
protected fun withState(block: (state: S) -> Unit) {
stateStore.get(block)
}
}
MvRxState只是一个标记接口,所以State可以是任意类,只要它实现了MvRxState接口。但是,BaseMvRxViewModel包含了对State的验证逻辑,要求State必须是Kotlin data class,并且其所有属性必须是不可变的(immutable),不能在data class中定义var
属性,并且不能使用诸如MutableList
,MutableMap
之类的可变集合,这是为了方便MvRxStateStore对State的管理,强制要求改变State必须通过MvRxStateStore由旧生新。
每个MvRxViewModel都包含一个MvRxStateStore,默认值是RealMvRxStateStore
,并且构造时就传入了初始State,初始State对于RealMvRxStateStore
而言是很重要的。
BaseMvRxViewModel
对MvRxStateStore
的使用是很直接的,其实对MvRxStateStore.observable
也是很直接的使用,但是observable
的观察者比较复杂,我们之后再看。
3. RealMvRxStateStore
class RealMvRxStateStore<S : Any>(initialState: S) : MvRxStateStore<S> {
//State Subject
private val subject: BehaviorSubject<S> = BehaviorSubject.createDefault(initialState)
//刷新“队列”的信号
private val flushQueueSubject = BehaviorSubject.create<Unit>()
//“队列”实体,Jobs类的定义之后会看到
private val jobs = Jobs<S>()
//State Observable
override val observable: Observable<S> = subject
//同步获取 state
override val state: S
get() = subject.value!! //必然不为null,因为subject创建的时候提供了初始值 initialState
init {
//在一个单独的线程刷新队列,避免setState的竞争
flushQueueSubject.observeOn(Schedulers.newThread())
//在flushQueueSubject收到信号的时候,刷新队列,state由旧生新的核心逻辑就在flushQueues方法中
.subscribe({ flushQueues() }, ::handleError)
.registerDisposable()
}
//异步获取 state
override fun get(block: (S) -> Unit) {
//入getState队列,然后发送信号
jobs.enqueueGetStateBlock(block)
flushQueueSubject.onNext(Unit)
}
override fun set(stateReducer: S.() -> S) {
//入setState队列,然后发送信号
jobs.enqueueSetStateBlock(stateReducer)
flushQueueSubject.onNext(Unit)
}
}
一个小的背景知识,Subject
是RxJava中一种既是Observable
又是Observer
的类,而BehaviorSubject
是Subject
几个子类中最“正常”的那一个,把它自己接收到的数据,再发送出去。RealMvRxStateStore
中包含两个BehaviorSubject
,一个就是我们的State Subject,另一个是信号Subject。当set,get State的时候,会把要执行的内容入队列,然后向信号Subject发送信号,在RealMvRxStateStore
的构造函数中就已经注册了信号Subject的观察者——flushQueues()
,信号会在一个新的线程中被接收到,接收到信号就会在这个新线程中执行flushQueues()
,虽然我们还没有看flushQueues()
的具体内容,但想想也知道肯定就是从队列中取内容然后执行,如果是setState会设置“新State”给State Subject,这样新的state就传递了出去,Done。
来看看“队列”的实体类Jobs:
class Jobs<S> {
private val getStateQueue = LinkedList<(state: S) -> Unit>()
private var setStateQueue = LinkedList<S.() -> S>()
@Synchronized
fun enqueueGetStateBlock(block: (state: S) -> Unit) {
getStateQueue.add(block)
}
@Synchronized
fun enqueueSetStateBlock(block: S.() -> S) {
setStateQueue.add(block)
}
@Synchronized
fun dequeueGetStateBlock(): ((state: S) -> Unit)? {
//getStateQueue为空,就会返回 null
return getStateQueue.poll()
}
//出队所有setStateQueue
@Synchronized
fun dequeueAllSetStateBlocks(): List<(S.() -> S)>? {
if (setStateQueue.isEmpty()) return null
val queue = setStateQueue
setStateQueue = LinkedList()
return queue
}
}
相当之无聊,就两个LinkedList
,进进队出出队。RealMvRxStateStore
的set
和get
方法一般都会是在后台线程中执行,对setStateQueue
,getStateQueue
进出队是会有多线程同步的问题,所以这些方法都加了锁。
好了,现在一切都准备好了,就剩下flushQueues()
的具体实现了:
class RealMvRxStateStore<S : Any>(initialState: S) : MvRxStateStore<S> {
//不常用的 tailrec功能,即尾递归优化,会把递归调用改为循环的模式
private tailrec fun flushQueues() {
//先出队 setStateQueue并且执行
flushSetStateQueue()
//然后再出队 getStateQueue
val block = jobs.dequeueGetStateBlock() ?: return
block(state)
//递归调用,直至getStateQueue为空
flushQueues()
}
private fun flushSetStateQueue() {
//所有setStateQueue全部出队,挨个执行
val blocks = jobs.dequeueAllSetStateBlocks() ?: return
for (block in blocks) {
val newState = state.block()
// state不同才发送
if (newState != state) {
subject.onNext(newState)
}
}
}
}
正如我们预期的那样,flushQueues()
核心逻辑就是出队然后执行,只不过执行顺序是有讲究的。在MvRxModelView中,我们经常这么写:
getState { state ->
if (state.isLoading) return
//或者是发送一个网络请求,总之最后会调用 setState
setState { state ->
state.copy(...)
}
}
也就是说,getStateQueue
队列中的内容时常包含着入队setStateQueue
,那么考虑下面一个问题:
getStateA {
setStateA {}
}
getStateB {
setStateB {}
}
如果是像上面这样的调用顺序,那么我们期望的执行顺序是getStateA->setStateA->getStateB->setStateB
,你仔细思考一下flushQueues()
的出队顺序,得到的执行顺序正是我们期望的那样。
最后,调用subject.onNext(newState)
,通过RealMvRxStateStore
的override val observable: Observable<S> = subject
,新state状态就被传递了出去。注意,flushQueues()
是在一个单独的新线程中执行,每个RealMvRxStateStore
都会新建一个线程,多个RealMvRxStateStore
之间不存在竞争。
这里补充一个小细节,
flushSetStateQueue
方法中会依次遍历setStateQueue
出队的各个元素,并且对每个元素执行完reducer操作之后,都会把得到的新的State传递出去subject.onNext(newState)
,其实还有另外一种选择,那就是“状态折叠”,假设setStateQueue
出队元素有两个A,B,我们可以A,B依次执行,但是只对外传递最终的newStateB
,A作为中间状态就不对外传递了,这样可以提高效率,但是这样会引发一个问题,oldState->newStateA->newStateB
,如果只传递最终的状态newStateB
,newStateA
就会消失了,至于这是否是我们想要的结果,这取决于实际情况,所以“状态折叠”会提高效率,但是可能会有问题,不进行“状态折叠”会些许降低效率,但是总是一个不会出错方案,MvRx也是从最初的“状态折叠”调整为现在的不再折叠。
3. MvRxLifecycleAwareObserver
经过RealMvRxStateStore
的一番操作,新的State通过Observable被传递了出去,要想观察State的变化就需要一个Observer,同时,这个Observe还需要感知生命周期,所以这个这个观察者应该是RxJava Observer + LifecycleObserver,在MvRx中的实现就是MvRxLifecycleAwareObserver
。
有一点需要明确,无论我们以何种方式观察State的变化,观察整个State,还是观察State中的某几个属性,在View中观察,还是在ViewModel中观察,最终都会调用BaseMvRxViewModel
的subscribeLifecycle
方法:
abstract class BaseMvRxViewModel<S : MvRxState> {
private fun <T : Any> Observable<T>.subscribeLifecycle(
lifecycleOwner: LifecycleOwner? = null,
deliveryMode: DeliveryMode,
subscriber: (T) -> Unit
): Disposable {
//观察者逻辑在主线程执行
return observeOn(AndroidSchedulers.mainThread())
.resolveSubscription(lifecycleOwner, deliveryMode, subscriber)
.disposeOnClear()
}
private fun <T : Any> Observable<T>.resolveSubscription(
lifecycleOwner: LifecycleOwner? = null,
deliveryMode: DeliveryMode, //忽略这个参数
subscriber: (T) -> Unit
): Disposable = if (lifecycleOwner == null || FORCE_DISABLE_LIFECYCLE_AWARE_OBSERVER) {
//没有提供生命周期,或者调试状态,直接观察
this.subscribe(subscriber)
} else {
//绑定生命周期,那么观察者会被包装成 MvRxLifecycleAwareObserver
this.subscribeWith(
MvRxLifecycleAwareObserver(
lifecycleOwner,
onNext = Consumer { value ->
subscriber(value)
}
)
)
}
}
如果提供的观察者subscriber
绑定了生命周期,那么它会被包装成MvRxLifecycleAwareObserver
。既然我们提供了生命周期,那么就是想实现类似LiveData
那样的数据观察模式,生命周期≥STARTED时,才通知观察者。MvRxLifecycleAwareObserver
就是实现这样的逻辑。
//实现了三个接口 LifecycleObserver, Observer<T>, Disposable,我们忽略Disposable的相关逻辑,主要看LifecycleObserver, Observer
internal class MvRxLifecycleAwareObserver<T : Any>(
private var owner: LifecycleOwner?,
private val activeState: Lifecycle.State = State.STARTED,
private val deliveryMode: DeliveryMode = RedeliverOnStart,
private var lastDeliveredValueFromPriorObserver: T?,
private var sourceObserver: Observer<T>?,
private val onDispose: () -> Unit
) : AtomicReference<Disposable>(), LifecycleObserver, Observer<T>, Disposable {
//次构造函数,就是把sourceObserver一个参数拆分成onNext, onError, onComplete, onSubscribe四个参数
constructor(
owner: LifecycleOwner,
activeState: Lifecycle.State = DEFAULT_ACTIVE_STATE,
deliveryMode: DeliveryMode = RedeliverOnStart,
lastDeliveredValue: T? = null,
onComplete: Action = Functions.EMPTY_ACTION,
onSubscribe: Consumer<in Disposable> = Functions.emptyConsumer(),
onError: Consumer<in Throwable> = Functions.ON_ERROR_MISSING,
onNext: Consumer<T> = Functions.emptyConsumer(),
onDispose: () -> Unit
) : this(owner, activeState, deliveryMode, lastDeliveredValue, LambdaObserver<T>(onNext, onError, onComplete, onSubscribe), onDispose)
//上次未传输的值,未传输是因为 Lifecycle未活跃
private var lastUndeliveredValue: T? = null
//上次的值,可能已经传输过了
private var lastValue: T? = null
//Lifecycle处于未活跃状态时则上锁
private val locked = AtomicBoolean(true)
private val isUnlocked
get() = !locked.get()
override fun onSubscribe(d: Disposable) {
if (DisposableHelper.setOnce(this, d)) {
//开始观察时,也会对生命周期进行观察
owner!!.lifecycle.addObserver(this)
sourceObserver!!.onSubscribe(this)
}
}
//观察 Lifecycle ON_DESTROY事件
@OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
fun onDestroy() {
owner!!.lifecycle.removeObserver(this)
if (!isDisposed) {
dispose()
}
owner = null
sourceObserver = null
}
//观察 Lifecycle任意事件
@OnLifecycleEvent(Event.ON_ANY)
fun onLifecycleEvent() {
updateLock()
}
private fun updateLock() {
//每个Lifecycle事件都更新锁,处于活跃状态时解锁,否则加锁
if (owner?.lifecycle?.currentState?.isAtLeast(activeState) == true) {
unlock()
} else {
lock()
}
}
override fun onNext(nextValue: T) {
if (isUnlocked) {
sourceObserver!!.onNext(nextValue)
} else {
//记录下未传输的值
lastUndeliveredValue = nextValue
}
lastValue = nextValue
}
override fun onError(e: Throwable) {
if (!isDisposed) {
lazySet(DisposableHelper.DISPOSED)
sourceObserver!!.onError(e)
}
}
override fun onComplete() {
sourceObserver!!.onComplete()
}
private fun unlock() {
if (!locked.getAndSet(false)) {
return
}
if (!isDisposed) {
val valueToDeliverOnUnlock = //根据 deliveryMode确定要传输的值
lastUndeliveredValue = null
//只有Lifecycle处于活跃状态,并且没被dispose,才会传输值
if (valueToDeliverOnUnlock != null) {
onNext(valueToDeliverOnUnlock)
}
}
}
private fun lock() {
locked.set(true)
}
}
MvRxLifecycleAwareObserver
既是RxJava Observer又是LifecycleObserver,每个Lifecycle的Event事件,都会导致MvRxLifecycleAwareObserver
锁标记的更新,Lifecycle处于活跃时locked = false
,否则locked = true
,只有在Lifecycle处于活跃时才会调用我们定义的观察逻辑sourceObserver
,否则只是把值记录下来,待到活跃时再传输。最终sourceObserver
接收到的值还和deliveryMode
的取值有关,这是个sealed class,只有两种类型:RedeliverOnStart
和UniqueOnly
,顾名思义,在RedeliverOnStart
模式下,值会被重新传输给我们的观察者sourceObserver
,哪怕这个值之前已经传输过;而UniqueOnly
模式下则只会传输还未传输的值。比如手机不断的息屏亮屏,假设这期间State的值没有变化,那么在RedeliverOnStart
模式下,每次亮屏时,sourceObserver
都会接收到之前的值;UniqueOnly
模式下则不会接收到任何值(所以UniqueOnly
模式一般适用于一次性的行为,例如Toast,Snackbar,Log等)。简明起见,关于deliveryMode
的逻辑,在上述代码中都被删除了。
4. 总结
MvRx最核心的逻辑就是MvRxStateStore
了,这是实现单向数据流的关键,MvRx利用了RxJava Subject的特性,非常简明地完成了StateStore的功能。State管理的核心逻辑其实就是一个多向输入,单向输出的过程,而Subject的功能正契合这一需求,这使得RxJava Subject成了Android平台实现单向数据流的不二之选——直到Kotlin coroutines的出现。
关于Android单向数据流/MvRx更多的思考请看Android单向数据流——Side Effect。