Flow简介

2022-12-29  本文已影响0人  Jack921

Kotlin 协程中使用挂起函数可以实现非阻塞地执行任务并将结果返回回来,但是只能返回一个计算结果。但是如果希望有多个计算结果返回回来,则可以使用 flow,flow有像Rxjava的各种操作符,实现各种功能,同时和协程一起使用,可以替代Rxjava和liveData,并且也没有像Rxjava上手这么难,所以学kotlin,flow是必须的。

flow简单使用:
flow{
    //发送者发送数值
    emit(1)   
}.collect{
    //接受者接受发送的数值
    println(it.toString())
}

看起来和Rxjava很像,但是又简单很多吧

flow的冷流与热流

我们准备3个按钮,分别对应代码如下:

//发送者代码:
var test: Flow<Int>?=null
test= flow {
    for (i in 0..4) {
    Log.e(TAG+"2", i.toString())
    emit(i)
}

//订阅者1代码:
test?.collect{
    delay(1000)
    Log.e(TAG, it.toString())
}

//订阅者2代码:
test?.collect{
    delay(1000)
    Log.e(TAG, it.toString())
}

上面三个按钮的代码都贴上去了,其中订阅者1和订阅者2代码一样,当我们只是点发送者按钮时,flow {...} flow里面的代码块是没有执行的,然后我们再点击订阅者1按钮,这时候发送者代码才开始执行,从而发送给订阅者,连续执行


image.png

当我们再点击订阅者2按钮的时候,会发现和上面的订阅者1按钮的效果一样,所以印证了一对一的关系,每个订阅者都会收到发送者完整的流程。

val mutableSharedFlow=MutableSharedFlow<Int>()
    lifecycleScope.launch {
        mutableSharedFlow.collect{
            Log.e("mutableSharedFlow1",it.toString())
        }
    }
    lifecycleScope.launch {
        (1..5).forEach {
            Log.e("mutableSharedFlow1_before",it.toString())
            mutableSharedFlow.emit(it)
            Log.e("mutableSharedFlow1_after",it.toString())
        }
    }
    lifecycleScope.launch {
        delay(1000)
        mutableSharedFlow.collect {
        Log.e("mutableSharedFlow2",it.toString())
    }
}

运行结果如下:


image.png

可以看到,第二个订阅者是没有收到发送者的数据,因为在订阅之前已经被消费了,所以收不到数据

热流的具体实现SharedFlow和StateFlow,分别对应的实现类MutableSharedFlow和是MutableStateFlow,所以我们要讲的也就是这两个类。

1. MutableSharedFlow

有缓冲区区,并可以定义缓冲区的溢出规则,可以定义给一个新的接收器发送多少数据的缓存值。
MutableSharedFlow 的参数如下:

replay:事件粘滞数

当我们把上面的MutableSharedFlow的replay设置为1是,即如下代码:

val mutableSharedFlow=MutableSharedFlow<Int>(replay = 1)
lifecycleScope.launch {
    mutableSharedFlow.collect{
        Log.e("mutableSharedFlow1",it.toString())
    }
}
lifecycleScope.launch {
    (1..5).forEach {
        mutableSharedFlow.emit(it)
    }
}
lifecycleScope.launch {
    delay(1000)
    mutableSharedFlow.collect {
        Log.e("mutableSharedFlow2",it.toString())
    }
}

运行结果如下:


image.png

可以看到,第二个订阅者收到了最后一次运行的结果5,所以replay会保留上次运行的结果,replay设置多少,他就保留最新的前多少数据。

extraBufferCapacity

缓存容量,就是先发送几个事件,不管已经订阅的消费者是否接收,都先发送先。

val mutableSharedFlow=MutableSharedFlow<Int>(extraBufferCapacity = 2)
lifecycleScope.launch {
    mutableSharedFlow.collect{
        Log.e("mutableSharedFlow1",it.toString())
    }
}
lifecycleScope.launch {
    (1..5).forEach {
        Log.e("mutableSharedFlow1_before",it.toString())
        mutableSharedFlow.emit(it)
        Log.e("mutableSharedFlow1_after",it.toString())
    }
}
lifecycleScope.launch {
    delay(1000)
    mutableSharedFlow.collect {
        Log.e("mutableSharedFlow2",it.toString())
    }
}

运行结果如下:


image.png

相对于上面,可以看到extraBufferCapacity设置2之后,头两个会先发送而不管有没有被消费完,超过第3个之后,才开始执行,执行完之后又先发送先发送两个而不管有没有被消费。

onBufferOverflow

因为有第二个参数,所以当没有被消费完的时候,这可能导致缓存容量过多,只管发不管消费者消费能力的情况就会出现背压,所以第3个参数就是出现背压的时候要怎么处理的。

分别是 SUSPEND: 挂起,DROP_OLDEST: 移除旧的值,DROP_LATEST: 移除新的值。

SUSPEND

因为默认就是SUSPEND,所以上面的MutableSharedFlow<Int>(extraBufferCapacity = 2)就是MutableSharedFlow<Int>(extraBufferCapacity = 2,onBufferOverflow = BufferOverflow.SUSPEND),所以和讲extraBufferCapacity的demo是一样的。

DROP_OLDEST
移除旧的值,保留最新的,extraBufferCapacity就保留多少,代码如下:

val mutableSharedFlow=MutableSharedFlow<Int>(extraBufferCapacity = 2,onBufferOverflow = BufferOverflow.DROP_OLDEST)
lifecycleScope.launch {
    mutableSharedFlow.collect{
         Log.e("mutableSharedFlow1",it.toString())
    }
}
lifecycleScope.launch {
    (1..5).forEach {
        Log.e("mutableSharedFlow1_before",it.toString())
        mutableSharedFlow.emit(it)
        Log.e("mutableSharedFlow1_after",it.toString())
    }
}
lifecycleScope.launch {
    delay(1000)
    mutableSharedFlow.collect {
        Log.e("mutableSharedFlow2",it.toString())
    }
}

运行效果如下:

image.png

可以看到运行5个,超出缓存容量,只保留最新的两个,这就实现了消费者消费速度小于生产者的时候的背压问题。

DROP_LATEST

移除新的值,保留最旧的,extraBufferCapacity就保留多少,代码如下:

val mutableSharedFlow=MutableSharedFlow<Int>(extraBufferCapacity = 2,onBufferOverflow = BufferOverflow.DROP_LATEST)
    lifecycleScope.launch {
        mutableSharedFlow.collect{
            Log.e("mutableSharedFlow1",it.toString())
        }
    }
    lifecycleScope.launch {
        (1..5).forEach {
            Log.e("mutableSharedFlow1_before",it.toString())
            mutableSharedFlow.emit(it)
            Log.e("mutableSharedFlow1_after",it.toString())
        }
    }
    lifecycleScope.launch {
        delay(1000)
        mutableSharedFlow.collect {
            Log.e("mutableSharedFlow2",it.toString())
        }
    }

运行结果如下:

image.png

可以看到运行5个,超出缓存容量,只保留最旧的两个。

2.MutableStateFlow

MutableStateFlow 就是reply为1的MutableSharedFlow,同时它必须要有一个初始值,此外每次更新数据都会和旧数据做一次比较,只有不同时候才会更新数值。具体demo如下:

val stateFlow =MutableStateFlow(value = -1)
    lifecycleScope.launch {
        stateFlow.collect{
            Log.e("mutableStateFlow1",it.toString())
        }
    }
    lifecycleScope.launch {
        listOf(1,2,3,4,4).forEach {
            Log.e("mutableStateFlow_before",it.toString())
            stateFlow.emit(it)
            Log.e("mutableStateFlow_after",it.toString())
        }
    }
    lifecycleScope.launch {
        delay(1000)
        stateFlow.collect {
        Log.e("mutableStateFlow2",it.toString())
    }
}

运行结果如下:


image.png

可以看到,只要初始值和最新值,其他的值都不会,StateFlow重点在状态,只有初始值和最新值,而不会有中间值,这对于UI的状态更合适,防止重复刷新,而SharedFlow更适合事件的处理。

背压三剑客

从上面的讲解里,我们了解了MutableSharedFlow和MutableStateFlow的背压。
那冷流要怎么实现呢,其实操作符也有背压处理的。

背压说白了就是消费者的消费速度达不到生产者的创建速度时,就会产生数据的淤积。

flow {
    (1..5).forEach{
        emit(it)
    }
}.collectLatest {
    Log.e("collectLatest_start",it.toString())
    delay(1000)
    Log.e("collectLatest_end",it.toString())
 }

运行结果如下:

image.png

可以看到,会结束旧的数据执行即使在执行中,而执行最新的数据

flow {
    (1..5).forEach{
         emit(it)
    }
}.conflate()
 .collect {
    Log.e("conflate_start",it.toString())
    delay(1000)
     Log.e("conflate_end",it.toString())
 }

运行结果:

image.png

可以看到,对1数据也执行到结束才执行5,中间的数据直接过滤掉,有始有终

  1. capacity: 缓存数量
  2. onBufferOverflow: 处理缓存策略

下面一个个验证,首先看发送,demo如下:

flow {
    (1..5).forEach {
        emit(it)
    }
}.onEach {
    Log.e("buffer1","$it is ready")
}.collect {
    delay(1000)
    Log.e("buffer2","$it is handled")
}

结果如下:


image.png

可以看到,执行流程是先发一个,执行完再发下一个,事件发送和处理是连续的,假如加上buffer()呢,

flow {
    (1..5).forEach {
        emit(it)
    }
}.onEach {
    Log.e("buffer1","$it is ready")
}.buffer()
.collect {
    delay(1000)
    Log.e("buffer2","$it is handled")
}

结果如下:


image.png

可以看到加了之后不管有没有执行,都先发送,然后再一个个执行,不再受collect{}影响。

其二说白了就是设置缓存数量和处理策略,即设置capacity和onBufferOverflow,和上面的MutableSharedFlow有点像,举一个例子基本可以。

flow {
    (1..5).forEach {
        emit(it)
    }
}.onEach {
    Log.e("buffer1","$it is ready")
}
.buffer(capacity = 1, onBufferOverflow = BufferOverflow.DROP_LATEST)
.collect {
    delay(1000)
    Log.e("buffer2","$it is handled")
}

结果如下:


image.png

设置缓存数量为1,保持处理最旧的事件,DROP_OLDEST(处理缓存最旧),其他的SUSPEND(挂起),DROP_OLDEST(处理缓存最新)

上一篇下一篇

猜你喜欢

热点阅读