Kotlin(二十)异步流-操作符<5> 流完成
-
流完成
当流收集完成时(普通情况或异常情况),它可能需要执行一个动作。 你可能已经注意到,它可以通过两种方式完成:命令式或声明式。
命令式 finally 块
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
try {
simple().collect { value -> println(value) }
} finally {
println("Done")
}
}
1
2
3
Done
除了 try/catch 之外,收集器还能使用 finally 块在 collect 完成时执行一个动作。
声明式处理
[onCompletion]的主要优点是其 lambda 表达式的可空参数 Throwable
可以用于确定流收集是正常完成还是有异常发生。在下面的示例中 simple
流在发射数字 1 之后抛出了一个异常
fun simple(): Flow<Int> = flow {
emit(1)
throw RuntimeException()
}
fun main() = runBlocking<Unit> {
simple()
.onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
.catch { cause -> println("Caught exception") }
.collect { value -> println(value) }
}
1
Flow completed exceptionally
Caught exception
completed 操作符与 catch 不同,它不处理异常。我们可以看到前面的示例代码,异常仍然流向下游。它将被提供给后面的 onCompletion
操作符,并可以由 catch
操作符处理。
成功完成
与 [catch] 操作符的另一个不同点是 [onCompletion]能观察到所有异常并且仅在上游流成功完成(没有取消或失败)的情况下接收一个 null
异常。
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
simple()
.onCompletion { cause -> println("Flow completed with $cause") }
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
我们可以看到完成时 cause 不为空,因为流由于下游异常而中止:
1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
at FileKt$main$1$invokeSuspend$$inlined$collect$1.emit (Collect.kt:135)
at kotlinx.coroutines.flow.FlowKt__BuildersKt$asFlow$$inlined$unsafeFlow$9.collect (SafeCollector.common.kt:115)
at kotlinx.coroutines.flow.FlowKt__EmittersKt$onCompletion$$inlined$unsafeFlow$1.collect (SafeCollector.common.kt:114)
命令式还是声明式
现在我们知道如何收集流,并以命令式与声明式的方式处理其完成及异常情况。 这里有一个很自然的问题是,哪种方式应该是首选的?为什么? 作为一个库,我们不主张采用任何特定的方式,并且相信这两种选择都是有效的, 应该根据自己的喜好与代码风格进行选择。
-
启动流
使用流表示来自一些源的异步事件是很简单的。 在这个案例中,我们需要一个类似 addEventListener
的函数,该函数注册一段响应的代码处理即将到来的事件,并继续进行进一步的处理。onEach 操作符可以担任该角色。 然而,onEach
是一个过渡操作符。我们也需要一个末端操作符来收集流。 否则仅调用 onEach
是无效的。
如果我们在 onEach
之后使用 collect 末端操作符,那么后面的代码会一直等待直至流被收集:
// 模仿事件流
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.collect() // <--- 等待流收集
println("Done")
}
Event: 1
Event: 2
Event: 3
Done
末端操作符可以在这里派上用场。使用 launchIn 替换 collect 我们可以在单独的协程中启动流的收集,这样就可以立即继续进一步执行代码:
un main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.launchIn(this) // <--- 在单独的协程中执行流
println("Done")
}
Done
Event: 1
Event: 2
Event: 3
launchIn
必要的参数 CoroutineScope 指定了用哪一个协程来启动流的收集。在先前的示例中这个作用域来自 runBlocking 协程构建器,在这个流运行的时候,runBlocking 作用域等待它的子协程执行完毕并防止 main 函数返回并终止此示例。
在实际的应用中,作用域来自于一个寿命有限的实体。在该实体的寿命终止后,相应的作用域就会被取消,即取消相应流的收集。这种成对的 onEach { ... }.launchIn(scope)
工作方式就像 addEventListener
一样。而且,这不需要相应的 removeEventListener
函数, 因为取消与结构化并发可以达成这个目的。
注意,launchIn 也会返回一个 Job,可以在不取消整个作用域的情况下仅取消相应的流收集或对其进行 join。
-
流取消检测
为方便起见,[流]构建器对每个发射值执行附加的 [ensureActive]检测以进行取消。 这意味着从 flow { ... }
发出的繁忙循环是可以取消的:
fun foo(): Flow<Int> = flow {
for (i in 1..5) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo().collect { value ->
if (value == 3) cancel()
println(value)
}
}
仅得到不超过 3 的数字,在尝试发出 4 之后抛出
Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled
at kotlinx.coroutines.JobSupport.cancel (JobSupport.kt:1578)
at kotlinx.coroutines.CoroutineScopeKt.cancel (CoroutineScope.kt:287)
at kotlinx.coroutines.CoroutineScopeKt.cancel$default (CoroutineScope.kt:285)
参考: http://www.kotlincn.net/docs/reference/coroutines/flow.html#%E5%BC%82%E6%AD%A5%E6%B5%81