kotlin<第十篇>:Flow-异步流
Flow: 是一种类似于序列的冷流,flow构建器中的代码直到流被手机的时候才运行。
流的连续性:流的每次单独收集都是按顺序执行的,除非使用特殊操作符。
从上游到下游每个过渡操作符都会处理每个发射出的值,然后再交给末端操作符。
flow构建器创建一个函数
返回多个值,而且是异步的,不是一次性返回
(1)构建流的三种方式
// flow构建器创建一个函数
// 返回多个值,而且是异步的,不是一次性返回
suspend fun simpleFlow() = flow<Int> {
for (i in 1..3) {
delay(1000)
emit(i) // 发射,产生一个元素
}
}
runBlocking {
// Flow构建方式1
simpleFlow().collect { value -> println(value) } // 收集元素
// Flow构建方式2
(1..5).asFlow().filter {
it % 2 == 0
}.map {
println("Map $it")
}.onEach {
delay(1000)
}.collect {
println("Collect $it")
}
// Flow构建方式3
flowOf("one", "two", "three").onEach { delay(1000) }.collect { values ->
println(values)
}
}
(2)流的上下文
// Flow上下文验证
(1..5).asFlow().filter {
println("当前线程-filter:" + Thread.currentThread().name)
it % 2 == 0
}.map {
println("当前线程-map:" + Thread.currentThread().name)
}.onEach {
delay(1000)
}.collect {
println("当前线程-collect:" + Thread.currentThread().name)
println("Collect $it")
}
从打印结果上看,上游和下游都是在主线程。
但是,一般情况下,Flow构建之后的代码块中是耗时操作,所以不能放在主线程,解决方案是:在Flow构建器后面添加 flowOn(Dispatchers.Default)
,改造后的代码如下:
suspend fun simpleFlow() = flow<Int> {
for (i in 1..3) {
delay(1000)
emit(i) // 发射,产生一个元素
}
}.flowOn(Dispatchers.Default)
fun main() {
runBlocking {
// Flow构建方式1
simpleFlow().collect { value -> println(value) } // 收集元素
// Flow构建方式2
(1..5).asFlow().filter {
println("当前线程-filter:" + Thread.currentThread().name)
it % 2 == 0
}.map {
println("当前线程-map:" + Thread.currentThread().name)
}.onEach {
delay(1000)
}.flowOn(Dispatchers.Default).collect {
println("当前线程-collect:" + Thread.currentThread().name)
println("Collect $it")
}
// Flow构建方式3
flowOf("one", "two", "three").flowOn(Dispatchers.Default).onEach { delay(1000) }.collect { values ->
println(values)
}
}
}
(3)启动流
启动流:launchIn传入协程作用域形参,使用launchIn替换collect我们可以在指定协程中启动流的收集
(1..5).asFlow().onEach {
delay(1000)
}.flowOn(Dispatchers.Default).launchIn(CoroutineScope(Dispatchers.IO)).join()
(1..5).asFlow().onEach {
delay(1000)
}.flowOn(Dispatchers.Default).launchIn(this).join()
(4)流的取消
使用 withTimeoutOrNull
方式取消:
suspend fun simpleFlow() = flow<Int> {
for (i in 1..3) {
delay(1000)
emit(i) // 发射,产生一个元素
}
}.flowOn(Dispatchers.Default)
fun main() {
runBlocking {
withTimeoutOrNull(2000) {
// Flow构建方式1
simpleFlow().collect { value -> println(value) } // 收集元素
}
withTimeoutOrNull(2000) {
(1..5).asFlow().onEach {
delay(1000)
}.flowOn(Dispatchers.Default).collect {
println("Collect $it")
}
}
withTimeoutOrNull(2000) {
flowOf("one", "two", "three").flowOn(Dispatchers.Default).onEach { delay(1000) }.collect { values ->
println(values)
}
}
withTimeoutOrNull(2000) {
(1..5).asFlow().onEach {
delay(1000)
}.flowOn(Dispatchers.Default).launchIn(CoroutineScope(Dispatchers.IO)).join()
}
println("Done...")
}
}
另外,启动流还可以调用 cancelAndJoin
取消。
val job = (1..5).asFlow().onEach {
delay(1000)
}.flowOn(Dispatchers.Default).launchIn(CoroutineScope(Dispatchers.IO))
delay(1000)
job.cancelAndJoin()
(5)流的取消检测
为方便起见,流构建器对每个发射值执行附加的ensureActive 检测以进行取消,这意味着从 flow{...} 发出的繁忙循环是可以取消的。
出于性能原因,大多数其他流操作不会自行执行其他取消检测,在协程处于繁忙循环的情况下,必须明确检测是否取消。
通过cancellable操作符来执行此操作。
suspend fun simpleFlow() = flow<Int> {
for (i in 1..5) {
delay(1000)
emit(i) // emit自带检测是否取消的能力
}
}.flowOn(Dispatchers.Default)
fun main() {
runBlocking {
// emit 自带检测是否取消的能力
simpleFlow().collect { value ->
if (value == 3) cancel()
}
// 如果没有emit,需要使用 cancellable
(1..5).asFlow().cancellable().onEach {
delay(1000)
}.flowOn(Dispatchers.Default).collect { value ->
if (value == 3) cancel()
}
}
}
(6)背压
背压:水流受到与流动方向一致的压力。
生产者、消费者模式,只要生产效率 > 消费效率,那么就会产生背压。
处理背压的方式有:
- buffer(),并发运行流中发射元素的代码
- conflate(),合并发射项,不对每个值进行处理
- collectLatest(),取消并重新发射最后一个值
- 当必须更改CoroutineDispatcher时,flowOn操作符使用了相同的缓冲机制,但是buffer函数显示地请求缓冲而
不改变执行上下文
。
suspend fun simpleFlow() = flow<Int> {
for (i in 1..50) {
println("发送数据:$i")
delay(100)
emit(i)
}
}
fun main() {
runBlocking {
val time = measureTimeMillis {
simpleFlow()
.collect { value ->
delay(300)
println("接收数据:$value")
}
}
println("耗时:$time")
}
}
以上代码,发送数据和接收数据都是在同一个线程中并行执行,如果存在耗时程序,将特别影响效率。
为了增加执行效率,可以使用 buffer
设置缓存大小,从而起到加快执行速率的效果。
val time = measureTimeMillis {
// 背压
simpleFlow()
.buffer(10)
.collect { value ->
delay(300)
println("接收数据:$value")
}
}
但是,从生产者/消费者的设计思想的角度上考虑,发送数据最好放在子线程。
val time = measureTimeMillis {
// 背压
simpleFlow()
.flowOn(Dispatchers.Default)
.collect { value ->
delay(300)
println("接收数据:$value")
}
}
使用 flowOn
可以指定 Flow 的协程作用域,这样可以将 并行
转成 并发
,从而加快执行效率。
runBlocking {
val time = measureTimeMillis {
// 背压
simpleFlow()
.conflate()
.collect { value ->
delay(300)
println("接收数据==:$value")
}
}
println("耗时:$time")
}
以上代码使用 conflate
,中间一些元素不会处理,从而加快执行效率。
val time = measureTimeMillis {
// 背压
simpleFlow()
.collectLatest { value ->
delay(300)
println("接收数据==:$value")
}
以上代码将 collect
改成 collectLatest
之后,只会处理最后一个值,从而加速执行速度。
(7)转换操作符
使用map转换:
suspend fun simpleFlow() = flow<Int> {
for (i in 1..3) {
println(i)
emit(i)
}
}
fun main() {
runBlocking {
simpleFlow()
.map { value ->
"response $value"
}
.collect { value ->
println(value)
}
}
}
使用transform转换,可以转换成任意次、任意值的Flow:
suspend fun simpleFlow() = flow<Int> {
for (i in 1..3) {
println(i)
emit(i)
}
}
fun main() {
runBlocking {
simpleFlow()
.transform { request ->
emit("request $request")
emit("request $request")
}
.collect { value ->
println(value)
}
}
}
(8)限长操作符
take
是限长操作符,可以限制处理的数量:
suspend fun simpleFlow() = flow<Int> {
for (i in 1..3) {
println(i)
emit(i)
}
}
fun main() {
runBlocking {
simpleFlow()
.take(2)
.collect { value ->
println(value)
}
}
}
(9)末端操作符
末端操作符是在流上用于 启动流收集的挂起函数
。collect是最基础的末端操作符,但是还有另外一些更加方便使用的末端操作符:
- 转化为各种集合,例如:toList与toSet。
- 获取第一个(first)值与确保流发射单个(single)值的操作符。
- 使用reduce与fold将流规约到单个值。
fun main() {
runBlocking {
val sum = simpleFlow()
.reduce { a, b ->
a + b
}
println(sum)
}
}
reduce 操作符可以将元素累加。
reduce的返回值类型必须和集合的元素类型相符。
suspend fun simpleFlow() = flow<Int> {
for (i in 1..3) {
emit(i)
}
}
fun main() {
runBlocking {
val newStr = simpleFlow()
.fold(StringBuilder()) { str: StringBuilder, a: Int ->
str.append(a).append(" ")
}
println(newStr)
}
}
而fold的返回值类型则不受约束。
(10)组合操作符
zip
操作符将两个流合并。
runBlocking {
val nums1 = (1..3).asFlow()
val nums2 = flowOf("one", "two", "three")
nums1.zip(nums2) {a, b ->
"$a $b"
}.collect {value->
println(value)
}
}
(11)展平操作符
流表示异步接收的值序列,所以很容易遇到这种情况:每个值都会触发对另一个值序列的请求,然而,由于流具有异步的性质,因此需要不同的展平模式,为此,存在一系列的流展平操作符:
- flatMapConcat:连接模式
- flatMapMerge:合并模式
- flatMapLatest: 最新展平模式
suspend fun requestFlow(i: Int) = flow<String> {
emit("request $i first")
delay(500)
emit("request $i second")
}
fun main() {
runBlocking {
val startTime = System.currentTimeMillis()
(1..3).asFlow()
.onEach { delay(100) }
.flatMapConcat {
requestFlow(it) // Flow的元素是Flow
}
.collect { value->
println("$value -- ${System.currentTimeMillis() - startTime}")
}
}
}
代码中 flatMapConcat
可以换成 flatMapMerge
或者 flatMapLatest
。
三者的执行结果是:
flatMapConcat :(requestFlow全部执行完)
request 1 first -- 198
request 1 second -- 701
request 2 first -- 815
request 2 second -- 1319
request 3 first -- 1428
request 3 second -- 1932
flatMapMerge:(不需要等待requestFlow全部执行完)
request 1 first -- 281
request 2 first -- 361
request 3 first -- 470
request 1 second -- 798
request 2 second -- 876
request 3 second -- 985
flatMapLatest:
request 1 first -- 250
request 2 first -- 376
request 3 first -- 485
request 3 second -- 1001
(12)流的异常处理
suspend fun requestFlow() = flow<Int> {
for (i in 1..3) {
emit(i)
throw RuntimeException("exception")
}
}.catch {e: Throwable ->
println("上游异常捕获:" + e.message)
}
fun main() {
runBlocking {
try {
requestFlow()
.collect { value->
check(value < 2) // 检查异常
println(value)
}
} catch (e: Throwable) {
println("下游异常捕获:" + e.message)
}
}
}
check:检查异常,一旦检查到异常,程序crash。
下游通过 try...catch
捕获异常,上游Flow自带 catch
函数。
(13)流的完成
收集完成时,使用 finally
,表示收集完成。
suspend fun requestFlow() = flow<Int> {
for (i in 1..3) {
emit(i)
}
}
fun main() {
runBlocking {
try {
requestFlow().collect { value-> println(value) }
} finally {
println("...完成...")
}
}
}
使用 onCompletion
也可以表示完成:
suspend fun requestFlow() = flow<Int> {
for (i in 1..3) {
emit(i)
throw RuntimeException("exception")
}
}.catch {exception->
println("catch -> exception:" + exception.message)
}
fun main() {
runBlocking {
requestFlow()
.onCompletion {exception ->
if (exception != null) { // 异常导致完成
println("finish -> exception:" + exception.message)
} else { // 正常结束
println("正常结束")
}
}
.collect { value-> println(value) }
}
}
onCompletion
可以拿到异常信息,但是不能捕获异常。
[完...]