Android Kotlin

Kotlin中 Flow、SharedFlow与StateFlo

2022-09-15  本文已影响0人  我爱田Hebe

一、简介

了解过协程Flow 的同学知道是典型的冷数据流,而SharedFlowStateFlow则是热数据流。

解释:LiveData新的订阅者不会接收到之前发送的事件,只会收到之前发送的最后一条数据,这个特性和SharedFlow的参数replay设置为1相似

二、使用分析

最好的分析是从使用时入手冷流flow热流SharedFlow和StateFlow热流的具体的实现类分别是MutableSharedFlow和MutableStateFlow

用一个简单的例子来说明什么是冷流,什么是热流。

private fun testFlow() {
    val flow = flow<Int> {
        (1..5).forEach {
            delay(1000)
            emit(it)
        }
    }
    mBind.btCollect.setOnClickListener {
        lifecycleScope.launch {
            flow.collect {
                Log.d(TAG, "testFlow 第一个收集器: 我是冷流:$it")
            }
        }
        lifecycleScope.launch {
            delay(5000)
            flow.collect {
                Log.d(TAG, "testFlow:第二个收集器 我是冷流:$it")
            }
        }
    }

}

我点击收集按钮响应事件后,打印结果如下图:

这就是冷流,需要去触发收集,才能接收到结果。

从上图时间可知flow每次重新订阅收集都会将所有事件重新发送一次

private fun testSharedFlow() {

    val sharedFlow = MutableSharedFlow<Int>(
        replay = 0,//相当于粘性数据
        extraBufferCapacity = 0,//接受的慢时候,发送的入栈
        onBufferOverflow = BufferOverflow.SUSPEND
    )
    lifecycleScope.launch {
        launch {

            sharedFlow.collect {
                println("collect1 received ago shared flow $it")

            }
        }
        launch {
            (1..5).forEach {
                println("emit1  send ago  flow $it")
                sharedFlow.emit(it)
                println("emit1 send after flow $it")
            }
        }
        // wait a 100
        delay(100)
        launch {
            sharedFlow.collect {
                println("collect2 received shared flow $it")
            }
        }
    }

}

第二个流收集被延迟,晚了100毫秒后就收不到了,想当于不管是否订阅,流都会发送,只管发,而collect1能够收集到是因为他在发送之前进行了订阅收集。

三、分析MutableSharedFlow中参数的具体含义

以上面testSharedFlow()方法中对象为例,上面的配置就是,当前对象的默认配置 源码如下图:

val sharedFlow = MutableSharedFlow<Int>(
    replay = 0,//相当于粘性数据
    extraBufferCapacity = 0,//接受的慢时候,发送的入栈
    onBufferOverflow = BufferOverflow.SUSPEND //产生背压现象后的,执行策略
)

3.1、 reply:事件粘滞数

reply:事件粘滞数以testSharedFlow方法为例如果设置了数目的话,那么其他订阅者不管什么时候订阅都能够收到replay数目的最新的事件,reply=1的话有点类似Android中使用的livedata。

eg:和testSharedFlow方法区别在于 replay = 2

private fun testSharedFlowReplay() {

    val sharedFlow = MutableSharedFlow<Int>(
        replay = 2,//相当于粘性数据
        extraBufferCapacity = 0,//接受的慢时候,发送的入栈
        onBufferOverflow = BufferOverflow.SUSPEND
    )
    lifecycleScope.launch {
        launch {

            sharedFlow.collect {
                println("collect1 received ago shared flow $it")

            }
        }
        launch {
            (1..5).forEach {
                println("emit1  send ago  flow $it")
                sharedFlow.emit(it)
                println("emit1 send after flow $it")
            }
        }
        // wait a minute
        delay(100)
        launch {
            sharedFlow.collect {
                println("collect2 received shared flow $it")
            }
        }
    }

}

按照上面的解释collect2会收集到最新的4,5两个事件如下图:

3.2 extraBufferCapacity:缓存容量

extraBufferCapacity:缓存容量,就是先发送几个事件,不管已经订阅的消费者是否接收,这种只管发不管消费者消费能力的情况就会出现背压,参数onBufferOverflow就是用于处理背压问题

eg:和testSharedFlow方法区别在于 extraBufferCapacity = 2

private fun testSharedFlowCapacity() {

    val sharedFlow = MutableSharedFlow<Int>(
        replay = 0,//相当于粘性数据
        extraBufferCapacity = 2,//接受的慢时候,发送的入栈
        onBufferOverflow = BufferOverflow.SUSPEND
    )
    lifecycleScope.launch {
        launch {

            sharedFlow.collect {
                println("collect1 received ago shared flow $it")

            }
        }
        launch {
            (1..5).forEach {
                println("emit1  send ago  flow $it")
                sharedFlow.emit(it)
                println("emit1 send after flow $it")
            }
        }
        // wait a minute
        delay(100)
        launch {
            sharedFlow.collect {
                println("collect2 received shared flow $it")
            }
        }
    }

}

结果如下图:

优先发送将其缓存起来,testSharedFlow测试中发送与接收在没有干扰(延时之类的干扰)的情况下 是一条顺序链,而设置了extraBufferCapacity优先发送两条,不管消费情况,不设置的话(extraBufferCapacity = 0)这时如果在collect1里面设置延时delay(100),send会被阻塞(因为默认是 onBufferOverflow = BufferOverflow.SUSPEND的策略)

3.3、onBufferOverflow

onBufferOverflow:由背压就有处理策略,sharedflow默认为BufferOverflow.SUSPEND ,也即是如果当事件数量超过缓存,发送就会被挂起,上面提到了一句,DROP_OLDEST销毁最旧的值,DROP_LATEST销毁最新的值

三种参数含义

public enum class BufferOverflow {
    /**
     * 在缓冲区溢出时挂起。
     */
    SUSPEND,

    /**
     * 在缓冲区溢出时删除** *旧的**值,添加新的值到缓冲区,不挂起。
     */
    DROP_OLDEST,

    /**
     * 在缓冲区溢出时,删除当前添加到缓冲区的最新的**值\
*(使缓冲区内容保持不变),不要挂起。
     */
    DROP_LATEST
}

eg:和testSharedFlowCapacity方法区别在于 多了个delay(100)

private fun testSharedFlow2() {

    val sharedFlow = MutableSharedFlow<Int>(
        replay = 0,//相当于粘性数据
        extraBufferCapacity = 2,//接受的慢时候,发送的入栈
        onBufferOverflow = BufferOverflow.SUSPEND
    )
    lifecycleScope.launch {
        launch {

            sharedFlow.collect {
                println("collect1 received ago shared flow $it")
                delay(100)
            }
        }
        launch {
            (1..5).forEach {
                println("emit1  send ago  flow $it")
                sharedFlow.emit(it)
                println("emit1 send after flow $it")
            }
        }
        // wait a minute
        delay(100)
        launch {
            sharedFlow.collect {
                println("collect2 received shared flow $it")
            }
        }
    }

}

SUSPEND情况下从第一张图知道collect1都收集了,第二张图发现collect2也打印了两次,为什么只有两次呢?

因为 extraBufferCapacity = 2,等于2,错过了两次的事件发送的接收,不信的话可以试一下extraBufferCapacity = 0,这时候肯定打印了4次,可能有人问为什么是4次呢,因为collect2的订阅者延时了100毫秒才开始订阅,

private fun testSharedFlow2() {

    val sharedFlow = MutableSharedFlow<Int>(
        replay = 0,//相当于粘性数据
        extraBufferCapacity = 2,//接受的慢时候,发送的入栈
        onBufferOverflow = BufferOverflow.DROP_LATEST

    )
    lifecycleScope.launch {
        launch {

            sharedFlow.collect {
                println("collect1 received ago shared flow $it")
                delay(100)
            }
        }
        launch {
            (1..5).forEach {
                println("emit1  send ago  flow $it")
                sharedFlow.emit(it)
                println("emit1 send after flow $it")
            }
        }
        // wait a minute
        delay(100)
        launch {
            sharedFlow.collect {
                println("collect2 received shared flow $it")
            }
        }
    }

}

发送过快的话,销毁最新的,只保留最老的两条事件,我们可以知道1,2,肯定保留其他丢失

要想不丢是怎么办呢,很简单不要产生背压现象就行,在emit中延时delay(200),比收集耗时长就行。

四、StateFlow

初始化

val stateFlow = MutableStateFlow<Int>(value = -1)

由上图的继承关系可知stateFlow其实就是一种特殊的SharedFlow,它多了个初始值value

由上图可知:每次更新数据都会和旧数据做一次比较,只有不同时候才会更新数值。

SharedFlow和StateFlow的侧重点

eg测试如下中间值丢失:

    private fun testSharedFlow2() {
        val stateFlow = MutableStateFlow<Int>(value = -1)

        lifecycleScope.launch {
            launch {

                stateFlow.collect {
                    println("collect1 received ago shared flow $it")
                }
            }
            launch {
                (1..5).forEach {
                    println("emit1  send ago  flow $it")
                    stateFlow.emit(it)
                    println("emit1 send after flow $it")
                }
            }
            // wait a minute
            delay(100)
            launch {
                stateFlow.collect {
                    println("collect2 received shared flow $it")
                }
            }
        }

    }

由下图可知,中间值丢失,collect2结果可知永远有状态

好了到这里文章就结束了,源码分析后续再写。

作者:五问
链接:https://juejin.cn/post/7142038525997744141

上一篇下一篇

猜你喜欢

热点阅读