Kotlin中 Flow、SharedFlow与StateFlo
一、简介
了解过协程Flow 的同学知道是典型的冷数据流,而SharedFlow
与StateFlow
则是热数据流。
- 冷流:只有当订阅者发起订阅时,事件的发送者才会开始发送事件。
- 热流:不管订阅者是否存在,只要发送了事件就会被消费,意思是不管接受方是否能够接收到,在这一点上有点像我们Android的
LiveData
。
解释:LiveData新的订阅者不会接收到之前发送的事件,只会收到之前发送的最后一条数据,
这个特性和SharedFlow的参数replay设置为1相似
二、使用分析
最好的分析是从使用时入手冷流flow
,热流SharedFlow和StateFlow
热流的具体的实现类分别是MutableSharedFlow和MutableStateFlow
用一个简单的例子来说明什么是冷流,什么是热流。
冷流flow:
private fun testFlow() {
val flow = flow<Int> {
(1..5).forEach {
delay(1000)
emit(it)
}
}
mBind.btCollect.setOnClickListener {
lifecycleScope.launch {
flow.collect {
Log.d(TAG, "testFlow 第一个收集器: 我是冷流:$it")
}
}
lifecycleScope.launch {
delay(5000)
flow.collect {
Log.d(TAG, "testFlow:第二个收集器 我是冷流:$it")
}
}
}
}
我点击收集按钮响应事件后,打印结果如下图:
这就是冷流
,需要去触发收集,才能接收到结果。
从上图时间可知flow每次重新订阅收集都会将所有事件重新发送一次
热流MutableSharedFlow和
private fun testSharedFlow() {
val sharedFlow = MutableSharedFlow<Int>(
replay = 0,//相当于粘性数据
extraBufferCapacity = 0,//接受的慢时候,发送的入栈
onBufferOverflow = BufferOverflow.SUSPEND
)
lifecycleScope.launch {
launch {
sharedFlow.collect {
println("collect1 received ago shared flow $it")
}
}
launch {
(1..5).forEach {
println("emit1 send ago flow $it")
sharedFlow.emit(it)
println("emit1 send after flow $it")
}
}
// wait a 100
delay(100)
launch {
sharedFlow.collect {
println("collect2 received shared flow $it")
}
}
}
}
第二个流收集被延迟,晚了100毫秒后就收不到了,想当于不管是否订阅,流都会发送,只管发,而collect1能够收集到是因为他在发送之前进行了订阅收集。
三、分析MutableSharedFlow中参数的具体含义
以上面testSharedFlow()方法中对象为例,上面的配置就是,当前对象的默认配置 源码如下图:
val sharedFlow = MutableSharedFlow<Int>(
replay = 0,//相当于粘性数据
extraBufferCapacity = 0,//接受的慢时候,发送的入栈
onBufferOverflow = BufferOverflow.SUSPEND //产生背压现象后的,执行策略
)
3.1、 reply:事件粘滞数
reply:事件粘滞数以testSharedFlow
方法为例如果设置了数目的话,那么其他订阅者不管什么时候订阅都能够收到replay数目的最新的事件,reply=1的话
有点类似Android中使用的livedata。
eg:和testSharedFlow
方法区别在于 replay = 2
private fun testSharedFlowReplay() {
val sharedFlow = MutableSharedFlow<Int>(
replay = 2,//相当于粘性数据
extraBufferCapacity = 0,//接受的慢时候,发送的入栈
onBufferOverflow = BufferOverflow.SUSPEND
)
lifecycleScope.launch {
launch {
sharedFlow.collect {
println("collect1 received ago shared flow $it")
}
}
launch {
(1..5).forEach {
println("emit1 send ago flow $it")
sharedFlow.emit(it)
println("emit1 send after flow $it")
}
}
// wait a minute
delay(100)
launch {
sharedFlow.collect {
println("collect2 received shared flow $it")
}
}
}
}
按照上面的解释collect2会收集到最新的4,5两个事件如下图:
3.2 extraBufferCapacity:缓存容量
extraBufferCapacity
:缓存容量,就是先发送几个事件,不管已经订阅的消费者是否接收,这种只管发不管消费者消费能力的情况就会出现背压,参数onBufferOverflow
就是用于处理背压问题
eg:和testSharedFlow
方法区别在于 extraBufferCapacity = 2
private fun testSharedFlowCapacity() {
val sharedFlow = MutableSharedFlow<Int>(
replay = 0,//相当于粘性数据
extraBufferCapacity = 2,//接受的慢时候,发送的入栈
onBufferOverflow = BufferOverflow.SUSPEND
)
lifecycleScope.launch {
launch {
sharedFlow.collect {
println("collect1 received ago shared flow $it")
}
}
launch {
(1..5).forEach {
println("emit1 send ago flow $it")
sharedFlow.emit(it)
println("emit1 send after flow $it")
}
}
// wait a minute
delay(100)
launch {
sharedFlow.collect {
println("collect2 received shared flow $it")
}
}
}
}
结果如下图:
优先发送将其缓存起来,testSharedFlow
测试中发送与接收在没有干扰(延时之类的干扰)的情况下 是一条顺序链,而设置了extraBufferCapacity
优先发送两条,不管消费情况,不设置的话(extraBufferCapacity = 0)
这时如果在collect1
里面设置延时delay(100)
,send会被阻塞(因为默认是 onBufferOverflow = BufferOverflow.SUSPEND的策略)
3.3、onBufferOverflow
onBufferOverflow:由背压就有处理策略,sharedflow默认为BufferOverflow.SUSPEND ,也即是如果当事件数量超过缓存,发送就会被挂起,上面提到了一句,DROP_OLDEST销毁最旧的值,DROP_LATEST销毁最新的值
三种参数含义
public enum class BufferOverflow {
/**
* 在缓冲区溢出时挂起。
*/
SUSPEND,
/**
* 在缓冲区溢出时删除** *旧的**值,添加新的值到缓冲区,不挂起。
*/
DROP_OLDEST,
/**
* 在缓冲区溢出时,删除当前添加到缓冲区的最新的**值\
*(使缓冲区内容保持不变),不要挂起。
*/
DROP_LATEST
}
eg:和testSharedFlowCapacity
方法区别在于 多了个delay(100)
- SUSPEND模式
private fun testSharedFlow2() {
val sharedFlow = MutableSharedFlow<Int>(
replay = 0,//相当于粘性数据
extraBufferCapacity = 2,//接受的慢时候,发送的入栈
onBufferOverflow = BufferOverflow.SUSPEND
)
lifecycleScope.launch {
launch {
sharedFlow.collect {
println("collect1 received ago shared flow $it")
delay(100)
}
}
launch {
(1..5).forEach {
println("emit1 send ago flow $it")
sharedFlow.emit(it)
println("emit1 send after flow $it")
}
}
// wait a minute
delay(100)
launch {
sharedFlow.collect {
println("collect2 received shared flow $it")
}
}
}
}
SUSPEND
情况下从第一张图知道collect1
都收集了,第二张图发现collect2
也打印了两次
,为什么只有两次呢?
因为 extraBufferCapacity = 2,
等于2,错过了两次的事件发送的接收,不信的话可以试一下extraBufferCapacity = 0
,这时候肯定打印了4次
,可能有人问为什么是4次
呢,因为collect2
的订阅者延时了100毫秒
才开始订阅,
- DROP_LATEST模式
private fun testSharedFlow2() {
val sharedFlow = MutableSharedFlow<Int>(
replay = 0,//相当于粘性数据
extraBufferCapacity = 2,//接受的慢时候,发送的入栈
onBufferOverflow = BufferOverflow.DROP_LATEST
)
lifecycleScope.launch {
launch {
sharedFlow.collect {
println("collect1 received ago shared flow $it")
delay(100)
}
}
launch {
(1..5).forEach {
println("emit1 send ago flow $it")
sharedFlow.emit(it)
println("emit1 send after flow $it")
}
}
// wait a minute
delay(100)
launch {
sharedFlow.collect {
println("collect2 received shared flow $it")
}
}
}
}
发送过快的话,销毁最新的,只保留最老的两条事件,我们可以知道1,2,肯定保留其他丢失
要想不丢
是怎么办呢,很简单不要产生背压现象
就行,在emit中延时delay(200)
,比收集耗时长就行。
-
DROP_OLDEST模式 该模式同理DROP_LATEST模式,
保留最新的extraBufferCapacity = 2(多少)的数据就行
。
四、StateFlow
初始化
val stateFlow = MutableStateFlow<Int>(value = -1)
由上图的继承关系可知stateFlow其实就是一种特殊的SharedFlow
,它多了个初始值value
由上图可知:每次更新数据都会和旧数据做一次比较,只有不同时候才会更新数值。
SharedFlow和StateFlow的侧重点
- StateFlow就是一个replaySize=1的sharedFlow,同时它必须有一个初始值,此外,每次更新数据都会和旧数据做一次比较,只有不同时候才会更新数值。
- StateFlow重点在状态,ui永远有状态,所以StateFlow必须有初始值,同时对ui而言,过期的状态毫无意义,所以stateFLow永远更新最新的数据(和liveData相似),所以必须有粘滞度=1的粘滞事件,让ui状态保持到最新。
另外在一个时间内发送多个事件,不会管中间事件有没有消费完成都会执行最新的一条.(中间值会丢失)
- SharedFlow侧重在事件,当某个事件触发,发送到队列之中,按照挂起或者非挂起、缓存策略等将事件发送到接受方,在具体使用时,SharedFlow更适合通知ui界面的一些事件,比如toast等,也适合作为viewModel和repository之间的桥梁用作数据的传输。
eg测试如下中间值丢失:
private fun testSharedFlow2() {
val stateFlow = MutableStateFlow<Int>(value = -1)
lifecycleScope.launch {
launch {
stateFlow.collect {
println("collect1 received ago shared flow $it")
}
}
launch {
(1..5).forEach {
println("emit1 send ago flow $it")
stateFlow.emit(it)
println("emit1 send after flow $it")
}
}
// wait a minute
delay(100)
launch {
stateFlow.collect {
println("collect2 received shared flow $it")
}
}
}
}
由下图可知,中间值丢失,collect2结果
可知永远有状态
好了到这里文章就结束了,源码分析后续再写。