Kotlin(二十)异步流 开始
异步流
挂起函数可以异步的返回单个值,但是该如何异步返回多个计算好的值呢?这正是 Kotlin 流(Flow)的用武之地
-
表示多个值
fun simple(): List<Int> = listOf(1, 2, 3)
fun main() {
simple().forEach { value -> println(value) }
}
-
序列
如果使用一些消耗 CPU 资源的阻塞代码计算数字(每次计算需要 100 毫秒)那么我们可以使用securence 表示
fun simple(): Sequence<Int> = sequence { // 序列构建器
for (i in 1..3) {
Thread.sleep(100) // 假装我们正在计算
yield(i) // 产生下一个值
}
}
fun main() {
simple().forEach { value -> println(value) }
}
-
挂起函数
计算过程阻塞运行该代码的主线程。 当这些值由异步代码计算时,我们可以使用 suspend 修饰符标记函数 simple, 这样它就可以在不阻塞的情况下执行其工作并将结果作为列表返回:
suspend fun simple(): List<Int> {
delay(1000) // 假装我们在这里做了一些异步的事情
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
simple().forEach { value -> println(value) }
}
-
流
使用 List 结果类型,意味着我们只能一次返回所有值。 为了表示异步计算的值流(stream),我们可以使用 Flow 类型(正如同步计算值会使用 Sequence 类型)
fun simple(): Flow<Int> = flow { // 流构建器
for (i in 1..3) {
delay(100) // 假装我们在这里做了一些有用的事情
emit(i) // 发送下一个值
}
}
fun main() = runBlocking<Unit> {
// 启动并发的协程以验证主线程并未阻塞
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// 收集这个流
simple().collect { value -> println(value) }
}
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
-
Flow 使用细节
- 名为 flow 的 Flow 类型构建器函数。
-
flow { ... }
构建块中的代码可以挂起。 - 函数
simple
不再标有suspend
修饰符。 - 流使用 emit 函数 发射 值。
- 流使用 collect 函数 收集 值。
我们可以在 simple
的 flow { ... }
函数体内使用 Thread.sleep
代替 delay 以观察主线程在本案例中被阻塞了。
-
流是冷的
Flow 是一种类似于序列的冷流 — 这段 flow 构建器中的代码直到流被收集的时候才运行。这在以下的示例中非常明显:
fun simple(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Calling simple function...")
val flow = simple()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
这是返回一个流的 simple 函数没有标记 suspend 修饰符的主要原因。 通过它自己,simple() 调用会尽快返回且不会进行任何等待。该流在每次收集的时候启动, 这就是为什么当我们再次调用 collect 时我们会看到“Flow started”。
-
流取消基础
流采用与协程同样的协作取消。像往常一样,流的收集可以在当流在一个可取消的挂起函数(例如 delay)中挂起的时候取消。 以下示例展示了当 withTimeoutOrNull 块中代码在运行的时候流是如何在超时的情况下取消并停止执行其代码的:
withTimeoutOrNull
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) { // 在 250 毫秒后超时
simple().collect { value -> println(value) }
}
println("Done")
}
Emitting 1
1
Emitting 2
2
Done
在 simple 函数中流仅发射两个数字 就结束
-
流构建器
先前示例中的 flow { ... } 构建器是最基础的一个。还有其他构建器使流的声明更简单:
- flowOf 构建器定义了一个发射固定值集的流。
- 使用
.asFlow()
扩展函数,可以将各种集合与序列转换为流。
/ 将一个整数区间转化为流
(1..3).asFlow().collect { value -> println(value) }/