Kotlin Coroutines Flow 系列(二) Fl
![](https://img.haomeiwen.com/i2613397/dc9f50bdd7c8e876.jpg)
三. Flow VS Sequences
每一个 Flow 其内部是按照顺序执行的,这一点跟 Sequences 很类似。
Flow 跟 Sequences 之间的区别是 Flow 不会阻塞主线程的运行,而 Sequences 会阻塞主线程的运行。
使用 flow:
fun main() = runBlocking {
launch {
for (j in 1..5) {
delay(100)
println("I'm not blocked $j")
}
}
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.collect { println(it) }
println("Done")
}
执行结果:
1
I'm not blocked 1
2
I'm not blocked 2
3
I'm not blocked 3
4
I'm not blocked 4
5
Done
I'm not blocked 5
使用 sequence:
fun main() = runBlocking {
launch {
for (k in 1..5) {
delay(100)
println("I'm blocked $k")
}
}
sequence {
for (i in 1..5) {
Thread.sleep(100)
yield(i)
}
}.forEach { println(it) }
println("Done")
}
执行结果:
1
2
3
4
5
Done
I'm blocked 1
I'm blocked 2
I'm blocked 3
I'm blocked 4
I'm blocked 5
由此,可以得出 Flow 在使用各个 suspend 函数时(本例子中使用了collect、emit函数)不会阻塞主线程的运行。
四. Flow VS RxJava
Kotlin 协程库的设计本身也参考了 RxJava ,下图展示了如何从 RxJava 迁移到 Kotlin 协程。(火和冰形象地表示了 Hot、Cold Stream)
![](https://img.haomeiwen.com/i2613397/c6471bd00cf12c5b.jpeg)
4.1 Cold Stream
flow 的代码块只有调用 collected() 才开始运行,正如 RxJava 创建的 Observables 只有调用 subscribe() 才开始运行一样。
4.2 Hot Stream
如图上所示,可以借助 Kotlin Channel 来实现 Hot Stream。
4.3. Completion
Flow 完成时(正常或出现异常时),如果需要执行一个操作,它可以通过两种方式完成:imperative、declarative。
4.3.1 imperative
通过使用 try ... finally 实现
fun main() = runBlocking {
try {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.collect { println(it) }
} finally {
println("Done")
}
}
4.3.2 declarative
通过 onCompletion() 函数实现
fun main() = runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.onCompletion { println("Done") }
.collect { println(it) }
}
4.3.3 onCompleted (借助扩展函数实现)
借助扩展函数可以实现类似 RxJava 的 onCompleted() 功能,只有在正常结束时才会被调用:
fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {
collect { value -> emit(value) }
action()
}
它的使用类似于 onCompletion()
fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {
collect { value -> emit(value) }
action()
}
fun main() = runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.onCompleted { println("Completed...") }
.collect{println(it)}
}
但是假如 Flow 异常结束时,是不会执行 onCompleted() 函数的。
4.4 Backpressure
Backpressure 是响应式编程的功能之一。
RxJava2 Flowable 支持的 Backpressure 策略,包括:
- MISSING:创建的 Flowable 没有指定背压策略,不会对通过 OnNext 发射的数据做缓存或丢弃处理。
- ERROR:如果放入 Flowable 的异步缓存池中的数据超限了,则会抛出 MissingBackpressureException 异常。
- BUFFER:Flowable 的异步缓存池同 Observable 的一样,没有固定大小,可以无限制添加数据,不会抛出 MissingBackpressureException 异常,但会导致 OOM。
- DROP:如果 Flowable 的异步缓存池满了,会丢掉将要放入缓存池中的数据。
- LATEST:如果缓存池满了,会丢掉将要放入缓存池中的数据。这一点跟 DROP 策略一样,不同的是,不管缓存池的状态如何,LATEST 策略会将最后一条数据强行放入缓存池中。
而 Flow 的 Backpressure 是通过 suspend 函数实现。
4.4.1 buffer() 对应 BUFFER 策略
fun currTime() = System.currentTimeMillis()
var start: Long = 0
fun main() = runBlocking {
val time = measureTimeMillis {
(1..5)
.asFlow()
.onStart { start = currTime() }
.onEach {
delay(100)
println("Emit $it (${currTime() - start}ms) ")
}
.buffer()
.collect {
println("Collect $it starts (${currTime() - start}ms) ")
delay(500)
println("Collect $it ends (${currTime() - start}ms) ")
}
}
println("Cost $time ms")
}
执行结果:
Emit 1 (104ms)
Collect 1 starts (108ms)
Emit 2 (207ms)
Emit 3 (309ms)
Emit 4 (411ms)
Emit 5 (513ms)
Collect 1 ends (613ms)
Collect 2 starts (613ms)
Collect 2 ends (1114ms)
Collect 3 starts (1114ms)
Collect 3 ends (1615ms)
Collect 4 starts (1615ms)
Collect 4 ends (2118ms)
Collect 5 starts (2118ms)
Collect 5 ends (2622ms)
Collected in 2689 ms
4.4.2 conflate() 对应 LATEST 策略
fun main() = runBlocking {
val time = measureTimeMillis {
(1..5)
.asFlow()
.onStart { start = currTime() }
.onEach {
delay(100)
println("Emit $it (${currTime() - start}ms) ")
}
.conflate()
.collect {
println("Collect $it starts (${currTime() - start}ms) ")
delay(500)
println("Collect $it ends (${currTime() - start}ms) ")
}
}
println("Cost $time ms")
}
执行结果:
Emit 1 (106ms)
Collect 1 starts (110ms)
Emit 2 (213ms)
Emit 3 (314ms)
Emit 4 (419ms)
Emit 5 (520ms)
Collect 1 ends (613ms)
Collect 5 starts (613ms)
Collect 5 ends (1113ms)
Cost 1162 ms
4.4.3 DROP 策略
RxJava 的 contributor:David Karnok, 他写了一个kotlin-flow-extensions库,其中包括:FlowOnBackpressureDrop.kt,这个类支持 DROP 策略。
/**
* Drops items from the upstream when the downstream is not ready to receive them.
*/
@FlowPreview
fun <T> Flow<T>.onBackpressurureDrop() : Flow<T> = FlowOnBackpressureDrop(this)
使用这个库的话,可以通过使用 Flow 的扩展函数 onBackpressurureDrop() 来支持 DROP 策略。
该系列的相关文章:
Kotlin Coroutines Flow 系列(一) Flow 基本使用
Kotlin Coroutines Flow 系列(三) 异常处理