Android单向数据流——MvRx核心源码解析

2020-05-07  本文已影响0人  珞泽珈群

前言

背景知识,Android真响应式架构——MvRx

MvRx是什么?最简单的解释是,Mv(ModelView)和Rx(ReactiveX),MvRx是AAC ModelView和RxJava的结合,但这只是表象,MvRx最核心的价值在于,它是React理念在原生Android开发领域的一种实现,并且这种实现充分利用了AAC和Kotlin的能力,以一种相对简单的方式实现,降低了它的理解和使用门槛,对客户端是比较友好的。那什么是React的理念呢?我认为关键有两点:

  1. View由状态(State)表征
  2. 状态由事件(Event,或者称为动作、意图,不同地方有不同的叫法)单向驱动

如上图所示,使用State数据渲染View,View产生Event事件,发送给StateStore,StateStore根据旧的State构造出新的State,再传递给View。这种单向数据流的架构在其它地方也被称作MVI(Model-View-Intent)架构。

那么实现这么一套架构复杂吗?其实还是很复杂的,其中涉及到State的管理,对State的观察,View如何由State表征等等一系列问题。但是,如果借助现有的一些库,整个实现也并不复杂。State用Kotlin data class来表示,从View到StateStore的连接借助ViewModel来实现,State的管理StateStore使用RxJava中的Subject来完成,对State的观察使用LifecycleObserver+RxJava Observer来实现,从State到View的渲染借助Epoxy来完成。这就是MvRx的设计思路,所以MvRx本身的源码并不多,也不复杂。本文主要对MvRx的核心源码StateStore和State Observer进行解析。源码版本com.airbnb.android:mvrx:1.5.1

1. 太长不看

每个MvRxViewModel都包含一个MvRxStateStore,通过MvRxViewModel对MvRxStateStore进行的setState,getState的操作会被分别放进两个队列中,并且会给MvRxStateStore中的一个BehaviorSubject发送一个信号,BehaviorSubject收到信号后,会在一个单独的线程从setState,getState的队列中取元素,先从setState队列中取,之后是getState队列(也就是说异步getState获取的State是之前还没执行的setState执行之后的最新的State),直至getState队列都为空;每次从setState队列取一个元素(元素类型State.()->State,一般称之为reducer)后,都会执行这个reducer,完成从旧State到新State的更新,然后通知StateObservable,这样外部就能观察到State的变化了。

上图出自Unidirectional data flow on Android using Kotlin,他把Event称为Action,这篇文章从概念上解释了什么是Android单向数据流,如果你不关心具体的源码实现,只是想从概念上对单向数据流建立一个图像,那么推荐你看看这篇文章。

2. MvRxStateStore

如上所述,StateStore的职责就是对State的管理,主要包括接收外部输入,由旧生新;让外部可以观察State的变化,基本上就这些。MvRx定义了一个接口MvRxStateStore明确了这些职责:

interface MvRxStateStore<S : Any> : Disposable {
    val state: S //同步方式获取 state
    fun get(block: (S) -> Unit) //异步方式获取 state
    fun set(stateReducer: S.() -> S) // 更新 state,由旧生新
    val observable: Observable<S> //外部观察 state
}

获取State的方式有两种:同步和异步,获取State并不等同于观察State,一般是为了根据当前State决定下一步的动作,是一种一次性的行为。

每个MvRxViewModel都必须明确它的State,并且每个MvRxViewModel也都包含一个MvRxStateStore:

interface MvRxState

abstract class BaseMvRxViewModel<S : MvRxState>(
    initialState: S,
    debugMode: Boolean,
    private val stateStore: MvRxStateStore<S> = RealMvRxStateStore(initialState)
) : ViewModel() {
    //通过stateStore同步获取state
    internal val state: S
        get() = stateStore.state
    
    
    protected fun setState(reducer: S.() -> S) {
        if (debugMode) {
            //debug模式下会进行一些验证
            //核心逻辑就是,reducer运行两遍,两遍运行得到的 state要求相等,这基本上保证了reducer是个“纯函数”
        } else {
            stateStore.set(reducer)
        }
    }

    
    protected fun withState(block: (state: S) -> Unit) {
        stateStore.get(block)
    }
}

MvRxState只是一个标记接口,所以State可以是任意类,只要它实现了MvRxState接口。但是,BaseMvRxViewModel包含了对State的验证逻辑,要求State必须是Kotlin data class,并且其所有属性必须是不可变的(immutable),不能在data class中定义var属性,并且不能使用诸如MutableListMutableMap之类的可变集合,这是为了方便MvRxStateStore对State的管理,强制要求改变State必须通过MvRxStateStore由旧生新。
每个MvRxViewModel都包含一个MvRxStateStore,默认值是RealMvRxStateStore,并且构造时就传入了初始State,初始State对于RealMvRxStateStore而言是很重要的。
BaseMvRxViewModelMvRxStateStore的使用是很直接的,其实对MvRxStateStore.observable也是很直接的使用,但是observable的观察者比较复杂,我们之后再看。

3. RealMvRxStateStore

class RealMvRxStateStore<S : Any>(initialState: S) : MvRxStateStore<S> {
    //State Subject
    private val subject: BehaviorSubject<S> = BehaviorSubject.createDefault(initialState)

    //刷新“队列”的信号
    private val flushQueueSubject = BehaviorSubject.create<Unit>()

    //“队列”实体,Jobs类的定义之后会看到
    private val jobs = Jobs<S>()

    //State Observable
    override val observable: Observable<S> = subject
    //同步获取 state
    override val state: S
        get() = subject.value!! //必然不为null,因为subject创建的时候提供了初始值 initialState

    init {
        //在一个单独的线程刷新队列,避免setState的竞争
        flushQueueSubject.observeOn(Schedulers.newThread())
            //在flushQueueSubject收到信号的时候,刷新队列,state由旧生新的核心逻辑就在flushQueues方法中
            .subscribe({ flushQueues() }, ::handleError)
            .registerDisposable()
    }

    //异步获取 state
    override fun get(block: (S) -> Unit) {
        //入getState队列,然后发送信号
        jobs.enqueueGetStateBlock(block)
        flushQueueSubject.onNext(Unit)
    }

    override fun set(stateReducer: S.() -> S) {
        //入setState队列,然后发送信号
        jobs.enqueueSetStateBlock(stateReducer)
        flushQueueSubject.onNext(Unit)
    }
}

一个小的背景知识,Subject是RxJava中一种既是Observable又是Observer的类,而BehaviorSubjectSubject几个子类中最“正常”的那一个,把它自己接收到的数据,再发送出去。RealMvRxStateStore中包含两个BehaviorSubject,一个就是我们的State Subject,另一个是信号Subject。当set,get State的时候,会把要执行的内容入队列,然后向信号Subject发送信号,在RealMvRxStateStore的构造函数中就已经注册了信号Subject的观察者——flushQueues(),信号会在一个新的线程中被接收到,接收到信号就会在这个新线程中执行flushQueues(),虽然我们还没有看flushQueues()的具体内容,但想想也知道肯定就是从队列中取内容然后执行,如果是setState会设置“新State”给State Subject,这样新的state就传递了出去,Done。

来看看“队列”的实体类Jobs:

class Jobs<S> {

    private val getStateQueue = LinkedList<(state: S) -> Unit>()
    private var setStateQueue = LinkedList<S.() -> S>()

    @Synchronized
    fun enqueueGetStateBlock(block: (state: S) -> Unit) {
        getStateQueue.add(block)
    }

    @Synchronized
    fun enqueueSetStateBlock(block: S.() -> S) {
        setStateQueue.add(block)
    }

    @Synchronized
    fun dequeueGetStateBlock(): ((state: S) -> Unit)? {
        //getStateQueue为空,就会返回 null
        return getStateQueue.poll()
    }

    //出队所有setStateQueue
    @Synchronized
    fun dequeueAllSetStateBlocks(): List<(S.() -> S)>? {
        if (setStateQueue.isEmpty()) return null

        val queue = setStateQueue
        setStateQueue = LinkedList()
        return queue
    }
}

相当之无聊,就两个LinkedList,进进队出出队。RealMvRxStateStoresetget方法一般都会是在后台线程中执行,对setStateQueue,getStateQueue进出队是会有多线程同步的问题,所以这些方法都加了锁。

好了,现在一切都准备好了,就剩下flushQueues()的具体实现了:

class RealMvRxStateStore<S : Any>(initialState: S) : MvRxStateStore<S> {
    //不常用的 tailrec功能,即尾递归优化,会把递归调用改为循环的模式
    private tailrec fun flushQueues() {
        //先出队 setStateQueue并且执行
        flushSetStateQueue()
        //然后再出队 getStateQueue
        val block = jobs.dequeueGetStateBlock() ?: return
        block(state)
        //递归调用,直至getStateQueue为空
        flushQueues()
    }

    private fun flushSetStateQueue() {
        //所有setStateQueue全部出队,挨个执行
        val blocks = jobs.dequeueAllSetStateBlocks() ?: return
        for (block in blocks) {
            val newState = state.block()
            // state不同才发送
            if (newState != state) {
                subject.onNext(newState)
            }
        }
    }
}

正如我们预期的那样,flushQueues()核心逻辑就是出队然后执行,只不过执行顺序是有讲究的。在MvRxModelView中,我们经常这么写:

getState { state ->
  if (state.isLoading) return
  //或者是发送一个网络请求,总之最后会调用 setState
  setState { state ->
    state.copy(...)
  }
}

也就是说,getStateQueue队列中的内容时常包含着入队setStateQueue,那么考虑下面一个问题:

getStateA {
  setStateA {}
}
getStateB {
  setStateB {}
}

如果是像上面这样的调用顺序,那么我们期望的执行顺序是getStateA->setStateA->getStateB->setStateB,你仔细思考一下flushQueues()的出队顺序,得到的执行顺序正是我们期望的那样。
最后,调用subject.onNext(newState),通过RealMvRxStateStoreoverride val observable: Observable<S> = subject,新state状态就被传递了出去。注意,flushQueues()是在一个单独的新线程中执行,每个RealMvRxStateStore都会新建一个线程,多个RealMvRxStateStore之间不存在竞争。

这里补充一个小细节,flushSetStateQueue方法中会依次遍历setStateQueue出队的各个元素,并且对每个元素执行完reducer操作之后,都会把得到的新的State传递出去subject.onNext(newState),其实还有另外一种选择,那就是“状态折叠”,假设setStateQueue出队元素有两个A,B,我们可以A,B依次执行,但是只对外传递最终的newStateB,A作为中间状态就不对外传递了,这样可以提高效率,但是这样会引发一个问题,oldState->newStateA->newStateB,如果只传递最终的状态newStateBnewStateA就会消失了,至于这是否是我们想要的结果,这取决于实际情况,所以“状态折叠”会提高效率,但是可能会有问题,不进行“状态折叠”会些许降低效率,但是总是一个不会出错方案,MvRx也是从最初的“状态折叠”调整为现在的不再折叠。

3. MvRxLifecycleAwareObserver

经过RealMvRxStateStore的一番操作,新的State通过Observable被传递了出去,要想观察State的变化就需要一个Observer,同时,这个Observe还需要感知生命周期,所以这个这个观察者应该是RxJava Observer + LifecycleObserver,在MvRx中的实现就是MvRxLifecycleAwareObserver
有一点需要明确,无论我们以何种方式观察State的变化,观察整个State,还是观察State中的某几个属性,在View中观察,还是在ViewModel中观察,最终都会调用BaseMvRxViewModelsubscribeLifecycle方法:

abstract class BaseMvRxViewModel<S : MvRxState> {
    
    private fun <T : Any> Observable<T>.subscribeLifecycle(
        lifecycleOwner: LifecycleOwner? = null,
        deliveryMode: DeliveryMode,
        subscriber: (T) -> Unit
    ): Disposable {
        //观察者逻辑在主线程执行
        return observeOn(AndroidSchedulers.mainThread())
            .resolveSubscription(lifecycleOwner, deliveryMode, subscriber)
            .disposeOnClear()
    }

    private fun <T : Any> Observable<T>.resolveSubscription(
        lifecycleOwner: LifecycleOwner? = null,
        deliveryMode: DeliveryMode, //忽略这个参数
        subscriber: (T) -> Unit
    ): Disposable = if (lifecycleOwner == null || FORCE_DISABLE_LIFECYCLE_AWARE_OBSERVER) {
        //没有提供生命周期,或者调试状态,直接观察
        this.subscribe(subscriber)
    } else {
        //绑定生命周期,那么观察者会被包装成 MvRxLifecycleAwareObserver
        this.subscribeWith(
            MvRxLifecycleAwareObserver(
                lifecycleOwner,
                onNext = Consumer { value ->
                    subscriber(value)
                }
            )
        )
    }
}

如果提供的观察者subscriber绑定了生命周期,那么它会被包装成MvRxLifecycleAwareObserver。既然我们提供了生命周期,那么就是想实现类似LiveData那样的数据观察模式,生命周期≥STARTED时,才通知观察者。MvRxLifecycleAwareObserver就是实现这样的逻辑。

//实现了三个接口 LifecycleObserver, Observer<T>, Disposable,我们忽略Disposable的相关逻辑,主要看LifecycleObserver, Observer
internal class MvRxLifecycleAwareObserver<T : Any>(
    private var owner: LifecycleOwner?,
    private val activeState: Lifecycle.State = State.STARTED,
    private val deliveryMode: DeliveryMode = RedeliverOnStart,
    private var lastDeliveredValueFromPriorObserver: T?,
    private var sourceObserver: Observer<T>?,
    private val onDispose: () -> Unit
) : AtomicReference<Disposable>(), LifecycleObserver, Observer<T>, Disposable {
    
    //次构造函数,就是把sourceObserver一个参数拆分成onNext, onError, onComplete, onSubscribe四个参数
    constructor(
        owner: LifecycleOwner,
        activeState: Lifecycle.State = DEFAULT_ACTIVE_STATE,
        deliveryMode: DeliveryMode = RedeliverOnStart,
        lastDeliveredValue: T? = null,
        onComplete: Action = Functions.EMPTY_ACTION,
        onSubscribe: Consumer<in Disposable> = Functions.emptyConsumer(),
        onError: Consumer<in Throwable> = Functions.ON_ERROR_MISSING,
        onNext: Consumer<T> = Functions.emptyConsumer(),
        onDispose: () -> Unit
    ) : this(owner, activeState, deliveryMode, lastDeliveredValue, LambdaObserver<T>(onNext, onError, onComplete, onSubscribe), onDispose)

    //上次未传输的值,未传输是因为 Lifecycle未活跃
    private var lastUndeliveredValue: T? = null
    //上次的值,可能已经传输过了
    private var lastValue: T? = null
    //Lifecycle处于未活跃状态时则上锁
    private val locked = AtomicBoolean(true)
    private val isUnlocked
        get() = !locked.get()

    override fun onSubscribe(d: Disposable) {
        if (DisposableHelper.setOnce(this, d)) {
            //开始观察时,也会对生命周期进行观察
            owner!!.lifecycle.addObserver(this)
            sourceObserver!!.onSubscribe(this)
        }
    }

    //观察 Lifecycle ON_DESTROY事件
    @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun onDestroy() {
        owner!!.lifecycle.removeObserver(this)
        if (!isDisposed) {
            dispose()
        }
        owner = null
        sourceObserver = null
    }

    //观察 Lifecycle任意事件
    @OnLifecycleEvent(Event.ON_ANY)
    fun onLifecycleEvent() {
        updateLock()
    }

    private fun updateLock() {
        //每个Lifecycle事件都更新锁,处于活跃状态时解锁,否则加锁
        if (owner?.lifecycle?.currentState?.isAtLeast(activeState) == true) {
            unlock()
        } else {
            lock()
        }
    }

    override fun onNext(nextValue: T) {
        if (isUnlocked) {
            sourceObserver!!.onNext(nextValue)
        } else {
            //记录下未传输的值
            lastUndeliveredValue = nextValue
        }
        lastValue = nextValue
    }

    override fun onError(e: Throwable) {
        if (!isDisposed) {
            lazySet(DisposableHelper.DISPOSED)
            sourceObserver!!.onError(e)
        }
    }

    override fun onComplete() {
        sourceObserver!!.onComplete()
    }

    private fun unlock() {
        if (!locked.getAndSet(false)) {
            return
        }
        if (!isDisposed) {
            val valueToDeliverOnUnlock = //根据 deliveryMode确定要传输的值
            lastUndeliveredValue = null
            //只有Lifecycle处于活跃状态,并且没被dispose,才会传输值
            if (valueToDeliverOnUnlock != null) {
                onNext(valueToDeliverOnUnlock)
            }
        }
    }

    private fun lock() {
        locked.set(true)
    }
}

MvRxLifecycleAwareObserver既是RxJava Observer又是LifecycleObserver,每个Lifecycle的Event事件,都会导致MvRxLifecycleAwareObserver锁标记的更新,Lifecycle处于活跃时locked = false,否则locked = true,只有在Lifecycle处于活跃时才会调用我们定义的观察逻辑sourceObserver,否则只是把值记录下来,待到活跃时再传输。最终sourceObserver接收到的值还和deliveryMode的取值有关,这是个sealed class,只有两种类型:RedeliverOnStartUniqueOnly,顾名思义,在RedeliverOnStart模式下,值会被重新传输给我们的观察者sourceObserver,哪怕这个值之前已经传输过;而UniqueOnly模式下则只会传输还未传输的值。比如手机不断的息屏亮屏,假设这期间State的值没有变化,那么在RedeliverOnStart模式下,每次亮屏时,sourceObserver都会接收到之前的值;UniqueOnly模式下则不会接收到任何值(所以UniqueOnly模式一般适用于一次性的行为,例如Toast,Snackbar,Log等)。简明起见,关于deliveryMode的逻辑,在上述代码中都被删除了。

4. 总结

MvRx最核心的逻辑就是MvRxStateStore了,这是实现单向数据流的关键,MvRx利用了RxJava Subject的特性,非常简明地完成了StateStore的功能。State管理的核心逻辑其实就是一个多向输入,单向输出的过程,而Subject的功能正契合这一需求,这使得RxJava Subject成了Android平台实现单向数据流的不二之选——直到Kotlin coroutines的出现。

关于Android单向数据流/MvRx更多的思考请看Android单向数据流——Side Effect

上一篇下一篇

猜你喜欢

热点阅读