【Koltin Flow(五)】SharedFlow及State

2022-08-05  本文已影响0人  MakerGaoGao

目录

【Koltin Flow(一)】五种创建flow的方式
【Koltin Flow(二)】Flow操作符之末端操作符
【Koltin Flow(三)】Flow操作符之中间操作符(一)
【Koltin Flow(三)】Flow操作符之中间操作符(二)
【Koltin Flow(三)】Flow操作符之中间操作符(三)
【Koltin Flow(四)】Flow背压
【Koltin Flow(五)】SharedFlow及StateFlow

SharedFlow

简介

相对于Flow而言,SharedFlow为热流,也就是说不管有无接收者,都会发送值。

一、基本使用

代码如下:
            val sharedFlow = MutableSharedFlow<Int>()
            launch {
                sharedFlow.collect {
                    Log.d(TAG.TAG,"SharedFlow $it")
                }
            }

            delay(10)
            sharedFlow.emit(1)
            sharedFlow.emit(100)
            sharedFlow.emit(100)
日志如下:
2022-08-03 10:16:53.366 4955-4981/edu.test.demo D/Test-TAG: SharedFlow 1
2022-08-03 10:16:53.366 4955-4981/edu.test.demo D/Test-TAG: SharedFlow 100
2022-08-03 10:16:53.367 4955-4981/edu.test.demo D/Test-TAG: SharedFlow 100
分析

二、设置订阅重发

我们看SharedFlow构造的第一个参数replay,此参数用来设置订阅重发的个数。
代码如下:
            val sharedFlow = MutableSharedFlow<Int>(
                replay = 2
            )

            sharedFlow.emit(1)
            sharedFlow.emit(100)
            sharedFlow.emit(100)
            //注释1  在此处加上重发缓存打印
            //Log.d(TAG.TAG,"SharedFlow ${sharedFlow.replayCache}")
            delay(100)

            launch {
                sharedFlow.collect {
                    Log.d(TAG.TAG,"SharedFlow $it")
                }
            }
             //注释2 重置缓存
            //delay(100)
            //sharedFlow.resetReplayCache()
            //Log.d(TAG.TAG,"SharedFlow ${sharedFlow.replayCache}")
            //launch {
              //  sharedFlow.collect {
                //    Log.d(TAG.TAG,"SharedFlow 2 $it")
                //}
            //}
日志如下:
//注释1 处的打印
//2022-08-03 10:23:13.621 5106-5131/edu.test.demo D/Test-TAG: SharedFlow [100, 100]
2022-08-03 10:21:14.906 5043-5070/edu.test.demo D/Test-TAG: SharedFlow 100
2022-08-03 10:21:14.906 5043-5070/edu.test.demo D/Test-TAG: SharedFlow 100
//注释2下的打印
//2022-08-03 10:26:54.917 5233-5260/edu.test.demo D/Test-TAG: SharedFlow []
分析:

三、其他两个参数,可参考背压部分的内容

四、shareIn操作符

shareIn操作符是将冷流flow转换为热流SharedFlow,主要参数有三个
1、第一个为作用域。
2、策略,分为三种Eagerly(立即发送)、Lazily(有第一个订阅者之后发送)、
WhileSubscribed()(在第一个订阅者出现之后开始、在最后一个订阅者、消失后结束),可配置二外的参数:
stopTimeoutMillis 为最后一个订阅者小时候保留的时长,单位ms,默认为0。
replayExpirationMillis 为最后一个订阅者消失后,缓存保留的时长,单位ms,默认为Long.MAX_VALUE。
3、缓存重发的个数。
第一种策略
代码如下:
     val sharedFlow = (1..5).asFlow().shareIn(this, SharingStarted.Eagerly,0)
            delay(100)
            sharedFlow.collect {
                Log.d(TAG.TAG,"shareIn $it")
            }
日志没有
分析:
第二种策略
代码如下:
    val sharedFlow = (1..5).asFlow().shareIn(this, SharingStarted.Lazily, 2)
            delay(100)
            launch {
                sharedFlow.collect {
                    Log.d(TAG.TAG, "shareIn $it")
                }
            }
            delay(100)
            launch {
                sharedFlow.collect {
                    Log.d(TAG.TAG, "shareIn 2 $it")
                }
            }
日志如下:
2022-08-03 11:09:10.104 6691-6717/edu.test.demo D/Test-TAG: shareIn 1
2022-08-03 11:09:10.104 6691-6717/edu.test.demo D/Test-TAG: shareIn 2
2022-08-03 11:09:10.104 6691-6717/edu.test.demo D/Test-TAG: shareIn 3
2022-08-03 11:09:10.104 6691-6717/edu.test.demo D/Test-TAG: shareIn 4
2022-08-03 11:09:10.104 6691-6717/edu.test.demo D/Test-TAG: shareIn 5
2022-08-03 11:09:10.205 6691-6720/edu.test.demo D/Test-TAG: shareIn 2 4
2022-08-03 11:09:10.205 6691-6720/edu.test.demo D/Test-TAG: shareIn 2 5
分析:
第三种策略
1. 采用默认参数
代码如下:
  var time  = 0L
            time = System.currentTimeMillis()
            val sharedFlow = (1..100).asFlow().onStart {
                Log.d(TAG.TAG,"onStart ${System.currentTimeMillis() - time}")
            }.onCompletion {
                Log.d(TAG.TAG,"onCompletion ${System.currentTimeMillis() - time}")
            }.onEach {
                delay(1000)
            }.shareIn(this, SharingStarted.WhileSubscribed(), 0)

            delay(1000)
            launch {
                Log.d(TAG.TAG, "1 shareIn ${sharedFlow.first()}")
                Log.d(TAG.TAG,"1 接收到第一个值 ${System.currentTimeMillis() - time}")
            }

            delay(3000)
            launch {
                Log.d(TAG.TAG, "2 shareIn ${sharedFlow.first()}")
                Log.d(TAG.TAG,"2 接收到第一个值 ${System.currentTimeMillis() - time}")
            }
日志如下:
2022-08-03 14:00:32.842 8905-8930/edu.test.demo D/Test-TAG: onStart 1029
2022-08-03 14:00:33.844 8905-8930/edu.test.demo D/Test-TAG: 1 shareIn 1
2022-08-03 14:00:33.844 8905-8930/edu.test.demo D/Test-TAG: 1 接收到第一个值 2031
2022-08-03 14:00:33.845 8905-8931/edu.test.demo D/Test-TAG: onCompletion 2032
2022-08-03 14:00:35.839 8905-8931/edu.test.demo D/Test-TAG: onStart 4026
2022-08-03 14:00:36.841 8905-8931/edu.test.demo D/Test-TAG: 2 shareIn 1
2022-08-03 14:00:36.841 8905-8931/edu.test.demo D/Test-TAG: 2 接收到第一个值 5028
2022-08-03 14:00:36.841 8905-8930/edu.test.demo D/Test-TAG: onCompletion 5028
分析:
2.进行相关的参数配置
代码如下:
             var time  = 0L
            time = System.currentTimeMillis()
            val sharedFlow = (1..100).asFlow().onStart {
                Log.d(TAG.TAG,"onStart ${System.currentTimeMillis() - time}")
            }.onCompletion {
                Log.d(TAG.TAG,"onCompletion ${System.currentTimeMillis() - time}")
            }.onEach {
                delay(1000)
            }.shareIn(this, SharingStarted.WhileSubscribed(
                stopTimeoutMillis = 500,
                replayExpirationMillis = 2000
            ), replay = 5)

            delay(1*1000)
            launch {
                Log.d(TAG.TAG, "1 shareIn ${sharedFlow.take(5).toList()}")
                Log.d(TAG.TAG,"1 接收到值 ${System.currentTimeMillis() - time}")
            }

            delay(10*1000)
            launch {
                Log.d(TAG.TAG, "2 shareIn ${sharedFlow.take(10).toList()}")
                Log.d(TAG.TAG,"2 接收到值 ${System.currentTimeMillis() - time}")
            }
日志如下( stopTimeoutMillis = 500,replayExpirationMillis = 2000,replay = 5):
2022-08-03 14:21:15.245 9591-9616/edu.test.demo D/Test-TAG: onStart 1030
2022-08-03 14:21:20.252 9591-9616/edu.test.demo D/Test-TAG: 1 shareIn [1, 2, 3, 4, 5]
2022-08-03 14:21:20.252 9591-9616/edu.test.demo D/Test-TAG: 1 接收到值 6037
2022-08-03 14:21:20.753 9591-9616/edu.test.demo D/Test-TAG: onCompletion 6538
2022-08-03 14:21:25.243 9591-9622/edu.test.demo D/Test-TAG: onStart 11028
2022-08-03 14:21:35.253 9591-9622/edu.test.demo D/Test-TAG: 2 shareIn [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
2022-08-03 14:21:35.253 9591-9622/edu.test.demo D/Test-TAG: 2 接收到值 21038
2022-08-03 14:21:35.755 9591-9617/edu.test.demo D/Test-TAG: onCompletion 21540
分析:
日志如下( stopTimeoutMillis = 500,replayExpirationMillis = Long.MAX_VALUE,replay = 5):
2022-08-03 14:47:30.144 9697-9724/edu.test.demo D/Test-TAG: onStart 1030
2022-08-03 14:47:35.154 9697-9723/edu.test.demo D/Test-TAG: 1 shareIn [1, 2, 3, 4, 5]
2022-08-03 14:47:35.154 9697-9723/edu.test.demo D/Test-TAG: 1 接收到值 6040
2022-08-03 14:47:35.655 9697-9723/edu.test.demo D/Test-TAG: onCompletion 6541
2022-08-03 14:47:40.141 9697-9726/edu.test.demo D/Test-TAG: onStart 11027
2022-08-03 14:47:45.149 9697-9727/edu.test.demo D/Test-TAG: 2 shareIn [1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
2022-08-03 14:47:45.149 9697-9727/edu.test.demo D/Test-TAG: 2 接收到值 16035
2022-08-03 14:47:45.652 9697-9726/edu.test.demo D/Test-TAG: onCompletion 16538
分析:

StateFlow

简介

和SharedFlow一样,StateFlow也是热流,但是区别在于状态的保存,保存了最新的值,也就是新的接收者会收到最新的值,
和设置了replay = 1的SharedFlow比较类似。

简单使用

代码如下:
            val stateFlow = MutableStateFlow(0)
            launch {
                stateFlow.collect{
                    Log.d(TAG.TAG,"stateFlow 1 collect $it")
                }
            }
            delay(1000)
            stateFlow.value = 10
            delay(1000)
            stateFlow.value = 10
            delay(1000)
            stateFlow.value = 11
            launch {
                stateFlow.collect{
                    Log.d(TAG.TAG,"stateFlow 2 collect $it")
                }
            }

日志如下:
2022-08-03 15:43:14.618 11669-11705/edu.test.demo D/Test-TAG: stateFlow 1 collect 0
2022-08-03 15:43:15.623 11669-11704/edu.test.demo D/Test-TAG: stateFlow 1 collect 10
2022-08-03 15:43:17.626 11669-11705/edu.test.demo D/Test-TAG: stateFlow 1 collect 11
2022-08-03 15:43:17.626 11669-11705/edu.test.demo D/Test-TAG: stateFlow 2 collect 11
分析:

stateIn操作符

stateIn操作符是将冷流flow转换为热流StateFlow,主要参数有三个
1、第一个为作用域。
2、策略,分为三种Eagerly(立即发送)、Lazily(有第一个订阅者之后发送)、
WhileSubscribed()(在第一个订阅者出现之后开始、在最后一个订阅者、消失后结束),可配置二外的参数:
stopTimeoutMillis 为最后一个订阅者小时候保留的时长,单位ms,默认为0。
replayExpirationMillis 为最后一个订阅者消失后,缓存保留的时长,单位ms,默认为Long.MAX_VALUE。
3、StateFlow的初始值。
策略基本和shareIn是一致的,只是replay固定为1,另外会有一个初始值。
分析一种,其他的和shareIn类比即可,策略为WhileSubscribed()。
代码如下:
             var time = 0L
            time = System.currentTimeMillis()
            val stateFlow = (1..10).asFlow().onStart {
                Log.d(TAG.TAG,"onStart ${System.currentTimeMillis() - time}")
            }.onCompletion {
                Log.d(TAG.TAG,"onCompletion ${System.currentTimeMillis() - time}")
            }.onEach {
                delay(1000)
            }.stateIn(this, SharingStarted.WhileSubscribed(),0)

            launch {
                Log.d(TAG.TAG, "stateFlow 1 collect ${stateFlow.take(5).toList()}")
                Log.d(TAG.TAG,"接收到值 ${System.currentTimeMillis() - time}")
            }
            delay(10*1000)
            launch {
                Log.d(TAG.TAG, "stateFlow 2 collect ${stateFlow.take(5).toList()}")
                Log.d(TAG.TAG,"接收到值 ${System.currentTimeMillis() - time}")
            }
日志如下:
2022-08-03 15:33:35.204 11353-11379/edu.test.demo D/Test-TAG: onStart 70
2022-08-03 15:33:39.219 11353-11380/edu.test.demo D/Test-TAG: stateFlow 1 collect [0, 1, 2, 3, 4]
2022-08-03 15:33:39.219 11353-11380/edu.test.demo D/Test-TAG: 接收到值 4101
2022-08-03 15:33:39.221 11353-11378/edu.test.demo D/Test-TAG: onCompletion 4102
2022-08-03 15:33:45.145 11353-11378/edu.test.demo D/Test-TAG: onStart 10027
2022-08-03 15:33:49.151 11353-11378/edu.test.demo D/Test-TAG: stateFlow 2 collect [4, 1, 2, 3, 4]
2022-08-03 15:33:49.151 11353-11378/edu.test.demo D/Test-TAG: 接收到值 14033
2022-08-03 15:33:49.151 11353-11380/edu.test.demo D/Test-TAG: onCompletion 14033
分析:

总结

上一篇下一篇

猜你喜欢

热点阅读