Android进阶宝典 -- Kotlin中的冷流和热流
在实际的开发中,伙伴们对于流的使用很少,或者不知道什么时候会使用到流,因此在这篇文章中,我将会详细介绍Kotlin中常见的流的使用及原理。
1 Kotlin中的Flow
Flow是所有流中的基础,是常见的冷流之一,这里我们先不介绍冷流和热流的关系,我们可以先通过一段代码来看Flow到底能做什么事。
val flow: Flow<Int> = flow {
while (count < 20) {
Log.e(TAG, "发射流数据 $count")
//发送数据
emit(count)
count++
}
}
我们通过flow{ }数据流构建器创建了一个Int型的数据流,它是属于生产者,通过emit将数据源源不断的发送到接收者方,如果接收者想要接收这些数据,那么需要通过collect{ }函数来收集。
lifecycleScope.launch {
Log.e(TAG, "开始收集数据")
viewModel.flow.collect {
Log.e(TAG, "collect:$it")
}
}
public interface Flow<out T> {
public suspend fun collect(collector: FlowCollector<T>)
}
这其实是一段比较简单的代码实现,为什么要在协程中收集,是因为collect函数是一个挂起函数,这也意味着我们可以通过异步的方式来获取数据,同时更新屏幕画面。
2023-10-14 13:03:19.893 29157-29157 layz4android com.example.myapplication E 开始收集数据
2023-10-14 13:03:19.893 29157-29157 layz4android com.example.myapplication E 发射流数据 0
2023-10-14 13:03:19.895 29157-29157 layz4android com.example.myapplication E collect:0
2023-10-14 13:03:19.895 29157-29157 layz4android com.example.myapplication E 发射流数据 1
2023-10-14 13:03:19.895 29157-29157 layz4android com.example.myapplication E collect:1
2023-10-14 13:03:19.895 29157-29157 layz4android com.example.myapplication E 发射流数据 2
2023-10-14 13:03:19.895 29157-29157 layz4android com.example.myapplication E collect:2
2023-10-14 13:03:19.896 29157-29157 layz4android com.example.myapplication E 发射流数据 3
2023-10-14 13:03:19.896 29157-29157 layz4android com.example.myapplication E collect:3
2023-10-14 13:03:19.896 29157-29157 layz4android com.example.myapplication E 发射流数据 4
我们可以看到,当接收方确定开始接收数据的时候,flow{ }才开始生产数据发送数据,也就是说通过Flow构建器创建的流不能独立于collect之外单独存在,只有调用collect才能算是完整的流式链路,这种就是冷流。
前面我们提到了,在调用collect的时候,同样因为emit是挂起函数所以必须要在协程中收集,那么发送方在调用emit的时候,能够随意切换协程上下文吗?
答案是不可以,这也是Flow虽然得益于挂起函数的异步回调,但是是有限制的,在emit发送数据时,不能提供不同协程上下文的数据,看下面的例子。
val flow: Flow<Int> = flow {
while (count < 20) {
Log.e(TAG, "发射流数据 $count")
if (count > 10) {
viewModelScope.launch {
withContext(Dispatchers.IO) {
emit(count)
}
}
} else {
//发送数据
emit(count)
}
count++
}
}
当count累加到10后,通过协程切换了上下文再次调用emit函数,此时接收方便收不到任何数据,执行结束之后还导致了崩溃。
2023-10-14 13:22:30.146 462-462 layz4android com.example.myapplication E 发射流数据 10
2023-10-14 13:22:30.146 462-462 layz4android com.example.myapplication E collect:10
2023-10-14 13:22:30.146 462-462 layz4android com.example.myapplication E 发射流数据 11
2023-10-14 13:22:30.146 462-462 layz4android com.example.myapplication E 发射流数据 12
2023-10-14 13:22:30.146 462-462 layz4android com.example.myapplication E 发射流数据 13
2023-10-14 13:22:30.146 462-462 layz4android com.example.myapplication E 发射流数据 14
2023-10-14 13:22:30.146 462-462 layz4android com.example.myapplication E 发射流数据 15
2023-10-14 13:22:30.147 462-462 layz4android com.example.myapplication E 发射流数据 16
2023-10-14 13:22:30.147 462-462 layz4android com.example.myapplication E 发射流数据 17
2023-10-14 13:22:30.147 462-462 layz4android com.example.myapplication E 发射流数据 18
2023-10-14 13:22:30.147 462-462 layz4android com.example.myapplication E 发射流数据 19
1.png
所以对于冷流来说,不能随意切换协程上下文,这也是flow{ }最终返回的是一个SafeFlow的原因吧,当然这里也给到了一个建议,使用channelFlow{ }代替flow{ },这也意味着热流是能够绕开这个限制的。
1.1 流的取消
假如现在有一个场景,当从数据流中拿到了想要的数据之后,不再需要提供方发送数据,也就是说需要流停止发送数据,有什么方案吗?
(1)取消协程
在之前我们介绍协程时,协程是可以取消的,如果协程收集被取消,那么此操作也会让底层提供方停止活动。
lifecycleScope.launch {
Log.e(TAG, "开始收集数据")
viewModel.flow.collect {
Log.e(TAG, "collect:$it")
if (it > 10) {
cancel()
}
}
}
在我们收集数据时,如果数据大于10,那么就取消协程,此时就可以取消流的发送,以便节省不必要的数据请求。
2023-10-14 13:51:25.927 6789-6789 layz4android com.example.myapplication E 发射流数据 9
2023-10-14 13:51:25.927 6789-6789 layz4android com.example.myapplication E collect:9
2023-10-14 13:51:25.927 6789-6789 layz4android com.example.myapplication E 发射流数据 10
2023-10-14 13:51:25.927 6789-6789 layz4android com.example.myapplication E collect:10
2023-10-14 13:51:25.927 6789-6789 layz4android com.example.myapplication E 发射流数据 11
2023-10-14 13:51:25.927 6789-6789 layz4android com.example.myapplication E collect:11
2023-10-14 13:51:25.928 6789-6789 layz4android com.example.myapplication E 发射流数据 12
从日志中看,当收集到数据11之后,后续的数据便不会再发送过来。
(2)数据全部发送完成
其实这就是一个正常的数据发送自动关闭流的一个过程,此时发送方会停止流的发送。如果你想要在收集完成之后,再次调用collect,并执行发送方的代码,通过上述这种方式是无法实现的。
1.2 流数据的共享
接着上面的例子,如果想要有多个接收方去同时收集数据,那么就需要将这个流打造成一个共享流,此时可以使用shareIn/stateIn操作符,将冷流转换为一个热流。
什么是共享流,官方文档中的解释就是,通过shareIn操作符生成的Flow,当有多个数据收集器采集数据时,不会重复创建多个Flow,而是复用同一个实例。感觉这好像一句废话,我们在同一个ViewModel实例下,取任何一个成员变量当然都是一个实例了,这个不是核心,核心则是在于数据的共享。
lifecycleScope.launch {
Log.e(TAG, "开始收集数据 ${viewModel.flow.hashCode()}")
launch {
viewModel.flow.collect {
Log.e(TAG, "collect:$it")
if (it > 10) {
cancel()
}
}
}
launch {
viewModel.flow.collect {
Log.e(TAG, "另一个观察者开始收集数据:$it")
}
}
}
有这样一个例子,在第一个收集器取到11之后,就停止收集,此时第二个收集器开始收集数据,因为传统方式,当emit发射完成之后,发射出去的数据不会重新放回采集器,因此第二个收集器便会接着剩下的数据继续采集。
2023-10-14 16:09:17.397 1839-1839 layz4android com.example.myapplication E collect:11
2023-10-14 16:09:17.397 1839-1839 layz4android com.example.myapplication E 发射流数据 12
2023-10-14 16:09:17.398 1839-1839 layz4android com.example.myapplication E 发射流数据 12
2023-10-14 16:09:17.398 1839-1839 layz4android com.example.myapplication E 另一个观察者开始收集数据:12
2023-10-14 16:09:17.398 1839-1839 layz4android com.example.myapplication E 发射流数据 13
2023-10-14 16:09:17.398 1839-1839 layz4android com.example.myapplication E 另一个观察者开始收集数据:13
2023-10-14 16:09:17.398 1839-1839 layz4android com.example.myapplication E 发射流数据 14
2023-10-14 16:09:17.398 1839-1839 layz4android com.example.myapplication E 另一个观察者开始收集数据:14
2023-10-14 16:09:17.398 1839-1839 layz4android com.example.myapplication E 发射流数据 15
2023-10-14 16:09:17.398 1839-1839 layz4android com.example.myapplication E 另一个观察者开始收集数据:15
2023-10-14 16:09:17.398 1839-1839 layz4android com.example.myapplication E 发射流数据 16
2023-10-14 16:09:17.398 1839-1839 layz4android com.example.myapplication E 另一个观察者开始收集数据:16
如果我们想要第二个收集器也能取到之前发送的数据,那么我们就需要使用到shareIn操作符。
val flow: Flow<Int> = flow<Int> {
while (count < 20) {
Log.e(TAG, "发射流数据 $count")
//发送数据
emit(count)
count++
}
}.shareIn(viewModelScope, SharingStarted.WhileSubscribed(), 20)
通过上面这种方式,就可以将flow转换为一个热流SharedFlow,shareIn操作符有3个参数,我们一一介绍一下。
public fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T> {
val config = configureSharing(replay)
val shared = MutableSharedFlow<T>(
replay = replay,
extraBufferCapacity = config.extraBufferCapacity,
onBufferOverflow = config.onBufferOverflow
)
@Suppress("UNCHECKED_CAST")
val job = scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)
return ReadonlySharedFlow(shared, job)
}
scope:协程作用域,一般代指流的生命周期,一般使用viewModelScope或者lifecycleScope,跟随ViewModel或者Activity的生命周期,在此作用域下会一直处于活跃状态。
started:这里可以根据业务场景定制一些策略,目前主要有3种,
/**
* Sharing is started immediately and never stops.
*/
public val Eagerly: SharingStarted = StartedEagerly()
/**
* Sharing is started when the first subscriber appears and never stops.
*/
public val Lazily: SharingStarted = StartedLazily()
/**
* Sharing is started when the first subscriber appears, immediately stops when the last
* subscriber disappears (by default), keeping the replay cache forever (by default).
*/
public fun WhileSubscribed(
//这个参数,可以在超过 stopTimeoutMillis 时间没有订阅,才消失。
stopTimeoutMillis: Long = 0,
replayExpirationMillis: Long = Long.MAX_VALUE
): SharingStarted =
StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)
【Eagerly】:这个策略在流创建之后就会立刻启动,而且永远不会停止,典型的热流;
【Lazily】:这个策略会在第一个订阅者collect的时候启动,而且永远不会停止;
【WhileSubscribed】:这个策略会在第一个订阅者collect的时候启动,最后一个订阅者订阅结束之后停止,而且会将缓存一直存在内存当中。
目前看WhileSubscribed这个策略是比较合适的,而且性能最好。
replay:保存最后发射的replay个数据在内存中,当其他订阅者订阅时,则会给他们推送这些数据。
所以回到刚才那个场景,当第二个订阅者订阅时,想要获取之前发送的全部数据,那么就可以将replay设置为数据集合总数。
2023-10-14 16:48:52.989 9838-9838 layz4android com.example.myapplication E 发射流数据 0
2023-10-14 16:48:52.990 9838-9838 layz4android com.example.myapplication E 发射流数据 1
2023-10-14 16:48:52.990 9838-9838 layz4android com.example.myapplication E 发射流数据 2
2023-10-14 16:48:52.990 9838-9838 layz4android com.example.myapplication E 发射流数据 3
......
2023-10-14 16:48:52.991 9838-9838 layz4android com.example.myapplication E 发射流数据 17
2023-10-14 16:48:52.991 9838-9838 layz4android com.example.myapplication E 发射流数据 18
2023-10-14 16:48:52.991 9838-9838 layz4android com.example.myapplication E 发射流数据 19
2023-10-14 16:48:52.991 9838-9838 layz4android com.example.myapplication E collect:0
2023-10-14 16:48:52.991 9838-9838 layz4android com.example.myapplication E collect:1
2023-10-14 16:48:52.991 9838-9838 layz4android com.example.myapplication E collect:2
2023-10-14 16:48:52.991 9838-9838 layz4android com.example.myapplication E collect:3
......
2023-10-14 16:48:52.991 9838-9838 layz4android com.example.myapplication E collect:10
2023-10-14 16:48:52.991 9838-9838 layz4android com.example.myapplication E collect:11
2023-10-14 16:48:52.992 9838-9838 layz4android com.example.myapplication E 另一个观察者开始收集数据:0
2023-10-14 16:48:52.992 9838-9838 layz4android com.example.myapplication E 另一个观察者开始收集数据:1
2023-10-14 16:48:52.992 9838-9838 layz4android com.example.myapplication E 另一个观察者开始收集数据:2
2023-10-14 16:48:52.992 9838-9838 layz4android com.example.myapplication E 另一个观察者开始收集数据:3
2023-10-14 16:48:52.992 9838-9838 layz4android com.example.myapplication E 另一个观察者开始收集数据:4
......
2023-10-14 16:48:52.992 9838-9838 layz4android com.example.myapplication E 另一个观察者开始收集数据:18
2023-10-14 16:48:52.992 9838-9838 layz4android com.example.myapplication E 另一个观察者开始收集数据:19
从日志中我们可以看到,当第一个订阅者开始collect的时候,开始启动数据的发送。当第一个订阅者完成之后,第二个订阅者再次collect的时候,其实拿到的是缓存中的数据,这时就不再需要再执行数据提供方的代码,这也是性能的一个提升。
1.3 流的异常捕获
Flow执行中如果发生异常,那么可以采用catch操作符捕获异常,此时发生异常collect将不会继续工作,但是在catch代码块中,可以通过emit发送数据,通知上游发生了异常。
val flow: Flow<Int> = flow<Int> {
while (count < 20) {
Log.e(TAG, "发射流数据 $count")
if (count > 5) {
throw NullPointerException()
}
//发送数据
emit(count)
count++
}
}.catch {
emit(-1)
}.shareIn(viewModelScope, SharingStarted.WhileSubscribed(), 20)
通过日志可以看到,在发生异常之后,最后一次接收到的数据,就是-1.
2023-10-14 17:01:51.433 12504-12504 layz4android com.example.myapplication E 发射流数据 0
2023-10-14 17:01:51.437 12504-12504 layz4android com.example.myapplication E 发射流数据 1
2023-10-14 17:01:51.437 12504-12504 layz4android com.example.myapplication E 发射流数据 2
2023-10-14 17:01:51.438 12504-12504 layz4android com.example.myapplication E 发射流数据 3
2023-10-14 17:01:51.438 12504-12504 layz4android com.example.myapplication E 发射流数据 4
2023-10-14 17:01:51.438 12504-12504 layz4android com.example.myapplication E 发射流数据 5
2023-10-14 17:01:51.439 12504-12504 layz4android com.example.myapplication E 发射流数据 6
2023-10-14 17:01:51.440 12504-12504 layz4android com.example.myapplication E collect:0
2023-10-14 17:01:51.440 12504-12504 layz4android com.example.myapplication E collect:1
2023-10-14 17:01:51.441 12504-12504 layz4android com.example.myapplication E collect:2
2023-10-14 17:01:51.441 12504-12504 layz4android com.example.myapplication E collect:3
2023-10-14 17:01:51.441 12504-12504 layz4android com.example.myapplication E collect:4
2023-10-14 17:01:51.441 12504-12504 layz4android com.example.myapplication E collect:5
2023-10-14 17:01:51.441 12504-12504 layz4android com.example.myapplication E collect:-1
2023-10-14 17:01:51.442 12504-12504 layz4android com.example.myapplication E 另一个观察者开始收集数据:0
2023-10-14 17:01:51.442 12504-12504 layz4android com.example.myapplication E 另一个观察者开始收集数据:1
2023-10-14 17:01:51.442 12504-12504 layz4android com.example.myapplication E 另一个观察者开始收集数据:2
2023-10-14 17:01:51.442 12504-12504 layz4android com.example.myapplication E 另一个观察者开始收集数据:3
2023-10-14 17:01:51.443 12504-12504 layz4android com.example.myapplication E 另一个观察者开始收集数据:4
2023-10-14 17:01:51.443 12504-12504 layz4android com.example.myapplication E 另一个观察者开始收集数据:5
2023-10-14 17:01:51.443 12504-12504 layz4android com.example.myapplication E 另一个观察者开始收集数据:-1
2 StateFlow和SharedFlow
前面我在介绍shareIn运算符的时候简单提过,通过shareIn和stateIn分别生成的是StateFlow和SharedFlow,那么两者有什么区别呢,接下来重点介绍一下。
2.1 Stateflow
通过名字其实就能看出来与状态相关的Flow,与冷流不同的是,Stateflow是热流:从数据流收集数据不会触发任何提供方代码,也就是说流创建完成之后,即便多次调用末端操作符collect,也不会再次创建,其实从1.2小节中就可以看出来,如果你想要使用Stateflow,那么最常用的就是MutableStateFlow.
private val _stateFlow: MutableStateFlow<CountUiState> =
MutableStateFlow(CountUiState.IdleState)
val stateFlow: StateFlow<CountUiState> = _stateFlow
fun startCount() {
while (count < 20) {
if (count == 0) {
_stateFlow.value = CountUiState.FirstCountState
}
if (count == 19) {
_stateFlow.value = CountUiState.EndCountState
}
}
}
上游调用方式如下:
lifecycleScope.launch {
viewModel.stateFlow.collect { state ->
when (state) {
is CountUiState.IdleState -> {
Log.d(TAG, "startCount: idle")
}
is CountUiState.FirstCountState -> {
Log.d(TAG, "startCount: first")
}
is CountUiState.EndCountState -> {
Log.d(TAG, "startCount: end")
}
}
}
}
viewModel.startCount()
2023-10-14 18:06:25.419 24931-24931 layz4android com.example.myapplication D startCount: idle
2023-10-14 18:06:25.419 24931-24931 layz4android com.example.myapplication D startCount: first
2023-10-14 18:06:25.419 24931-24931 layz4android com.example.myapplication D startCount: end
从日志当中看,三种状态都有序地打印了出来,这只是我们模拟场景,实际的场景中可能会存在网络请求,需要更新UI,那么需要切记一点,不能只通过launch开启协程收集数据,需要配合repeatOnLifecycle API使用,否则可能会导致崩溃。
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.stateFlow.collect { state ->
when (state) {
is CountUiState.IdleState -> {
Log.d(TAG, "startCount: idle")
}
is CountUiState.FirstCountState -> {
Log.d(TAG, "startCount: first")
}
is CountUiState.EndCountState -> {
Log.d(TAG, "startCount: end")
}
}
}
}
}
当我们设置生命周期为Lifecycle.State.STARTED时,意味着将会从onStart开始收集数据,当生命周期进入到onStop后停止收集数据,如果不设置repeatOnLifecycle,当页面退到后台之后会继续收集数据,当采集到新的状态更新UI时,可能存在View不可见或者不存在的场景,导致CRASH。
fun startCount() {
viewModelScope.launch {
while (count < 20) {
if (count == 0) {
_stateFlow.value = CountUiState.FirstCountState
}
delay(1_000)
if (count == 19) {
_stateFlow.value = CountUiState.EndCountState
}
count++
}
}
}
我们模拟一下网络请求的场景,中间加了一些延时,正常前台场景下,20s后收到了end的状态回调。
2023-10-14 19:12:57.606 6816-6816 layz4android com.example.myapplication D startCount: first
2023-10-14 19:13:17.625 6816-6816 layz4android com.example.myapplication D startCount: end
当开始计时,然后退到后台时,我们发现在20s后没有回调end状态,当页面回到前台之后,收到了end状态的回调,这个就是repeatOnLifecycle函数的作用。
2023-10-14 19:13:25.410 7076-7076 layz4android com.example.myapplication D startCount: first
2023-10-14 19:15:00.078 7076-7076 layz4android com.example.myapplication D startCount: end
所以从这里看,Stateflow和LiveData还是有些相似的地方的,都可以实时监听数据的变化,通过数据驱动UI,典型的MVVM思想,但是两者还是有些不一样的点。
(1)LiveData在初始化时,不需要初始值;而Stateflow则是需要给一个初始状态;
(2)当View退到后台时,LiveData会取消observe不再监听数据变化,而Stateflow不做处理,退到后台还是会监听数据变化的,需要配合repeatOnLifecycle达到LiveData的效果;
(3)LiveData适合UI状态不是频繁变化的场景,因为LiveData设计的特性(回调value的最新值),频繁的状态变化会导致LiveData丢失部分状态;而Stateflow流式状态不会丢失状态,适合UI变化频繁的场景。
2.2 SharedFlow
在之前1.2节我们介绍流数据共享的时候,通过shareIn转换拿到的就是SharedFlow,其主要作用就是多个接收方可以共享一个流数据源,而我们不需要通过shareIn就可以创建一个SharedFlow,常用的就是MutableSharedFlow。
private val _sharedFlow: MutableSharedFlow<CountUiState> = MutableSharedFlow(3)
val sharedFlow: SharedFlow<CountUiState> = _sharedFlow
fun startCount() {
viewModelScope.launch {
_sharedFlow.tryEmit(CountUiState.IdleState)
while (count < 20) {
if (count == 0) {
// _stateFlow.value = CountUiState.FirstCountState
_sharedFlow.tryEmit(CountUiState.FirstCountState)
}
// delay(1_000)
if (count == 19) {
// _stateFlow.value = CountUiState.EndCountState
_sharedFlow.tryEmit(CountUiState.EndCountState)
}
count++
}
}
}
具体使用方式就不看了,跟StateFlow不一样的是,MutableSharedFlow不需要一个初始状态,我们看下构造方法,其中第一个参数replay,我们之前介绍过,对于新的订阅者来说,可以从缓存区拿到之前发送的数据。
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
require(replay >= 0) { "replay cannot be negative, but was $replay" }
require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" }
require(replay > 0 || extraBufferCapacity > 0 || onBufferOverflow == BufferOverflow.SUSPEND) {
"replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy $onBufferOverflow"
}
val bufferCapacity0 = replay + extraBufferCapacity
val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0 // coerce to MAX_VALUE on overflow
return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}
那么这个值的作用体现在哪里呢?如果使用Stateflow,新订阅者可能拿不到之前发送的数据,例如我们把2.1小节中,上游的调用方式改变一下顺序,先执行startCount,那么Stateflow只会拿到一个end状态,而使用SharedFlow,可以设置replay的个数,针对新订阅者重新发送多个之前已发出的值。
当然缓存区的大小是有限制的,由replay的值决定,当缓存区满了之后,对于缓存数据的处理,由onBufferOverflow这个参数决定,默认是BufferOverflow.SUSPEND,那么调用方会被挂起;
public enum class BufferOverflow {
/**
* Suspend on buffer overflow.
*/
SUSPEND,
/**
* Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
*/
DROP_OLDEST,
/**
* Drop **the latest** value that is being added to the buffer right now on buffer overflow
* (so that buffer contents stay the same), do not suspend.
*/
DROP_LATEST
}
当然我们还是希望调用方能够拿到最新的状态,因此还有两种策略为DROP_OLDEST和DROP_LATEST,用于将新旧数据移除,添加新的数据进来,此时调用方会收到新的数据,而不会被挂起。
所以StateFlow和SharedFlow两者最大的区别在于,SharedFlow能够针对新的订阅者发送之前已经发出的值,具体多少由业务侧决定,但是这些数据都是在缓存中,存的数据量大也会影响性能,因此使用上需要注意。