kotlin<第十篇>:Flow-异步流

2023-02-23  本文已影响0人  NoBugException

Flow: 是一种类似于序列的冷流,flow构建器中的代码直到流被手机的时候才运行。
流的连续性:流的每次单独收集都是按顺序执行的,除非使用特殊操作符。
从上游到下游每个过渡操作符都会处理每个发射出的值,然后再交给末端操作符。

flow构建器创建一个函数
返回多个值,而且是异步的,不是一次性返回

(1)构建流的三种方式

// flow构建器创建一个函数
// 返回多个值,而且是异步的,不是一次性返回
suspend fun simpleFlow() = flow<Int> {
    for (i in 1..3) {
        delay(1000)
        emit(i) // 发射,产生一个元素
    }
}
runBlocking {
    // Flow构建方式1
    simpleFlow().collect { value -> println(value) } // 收集元素

    // Flow构建方式2
    (1..5).asFlow().filter {
        it % 2 == 0
    }.map {
        println("Map $it")
    }.onEach {
        delay(1000)
    }.collect {
        println("Collect $it")
    }

    // Flow构建方式3
    flowOf("one", "two", "three").onEach { delay(1000) }.collect { values ->
        println(values)
    }
}

(2)流的上下文

    // Flow上下文验证
    (1..5).asFlow().filter {
        println("当前线程-filter:" + Thread.currentThread().name)
        it % 2 == 0
    }.map {
        println("当前线程-map:" + Thread.currentThread().name)
    }.onEach {
        delay(1000)
    }.collect {
        println("当前线程-collect:" + Thread.currentThread().name)
        println("Collect $it")
    }

从打印结果上看,上游和下游都是在主线程。
但是,一般情况下,Flow构建之后的代码块中是耗时操作,所以不能放在主线程,解决方案是:在Flow构建器后面添加 flowOn(Dispatchers.Default),改造后的代码如下:

suspend fun simpleFlow() = flow<Int> {
    for (i in 1..3) {
        delay(1000)
        emit(i) // 发射,产生一个元素
    }
}.flowOn(Dispatchers.Default)

fun main() {
    runBlocking {
        // Flow构建方式1
        simpleFlow().collect { value -> println(value) } // 收集元素

        // Flow构建方式2
        (1..5).asFlow().filter {
            println("当前线程-filter:" + Thread.currentThread().name)
            it % 2 == 0
        }.map {
            println("当前线程-map:" + Thread.currentThread().name)
        }.onEach {
            delay(1000)
        }.flowOn(Dispatchers.Default).collect {
            println("当前线程-collect:" + Thread.currentThread().name)
            println("Collect $it")
        }

        // Flow构建方式3
        flowOf("one", "two", "three").flowOn(Dispatchers.Default).onEach { delay(1000) }.collect { values ->
            println(values)
        }

    }
}

(3)启动流

启动流:launchIn传入协程作用域形参,使用launchIn替换collect我们可以在指定协程中启动流的收集

    (1..5).asFlow().onEach {
        delay(1000)
    }.flowOn(Dispatchers.Default).launchIn(CoroutineScope(Dispatchers.IO)).join()


    (1..5).asFlow().onEach {
        delay(1000)
    }.flowOn(Dispatchers.Default).launchIn(this).join()

(4)流的取消

使用 withTimeoutOrNull 方式取消:

suspend fun simpleFlow() = flow<Int> {
    for (i in 1..3) {
        delay(1000)
        emit(i) // 发射,产生一个元素
    }
}.flowOn(Dispatchers.Default)

fun main() {
    runBlocking {

        withTimeoutOrNull(2000) {
            // Flow构建方式1
            simpleFlow().collect { value -> println(value) } // 收集元素
        }

        withTimeoutOrNull(2000) {
            (1..5).asFlow().onEach {
                delay(1000)
            }.flowOn(Dispatchers.Default).collect {
                println("Collect $it")
            }
        }

        withTimeoutOrNull(2000) {
            flowOf("one", "two", "three").flowOn(Dispatchers.Default).onEach { delay(1000) }.collect { values ->
                println(values)
            }
        }

        withTimeoutOrNull(2000) {
            (1..5).asFlow().onEach {
                delay(1000)
            }.flowOn(Dispatchers.Default).launchIn(CoroutineScope(Dispatchers.IO)).join()
        }

        println("Done...")

    }
}

另外,启动流还可以调用 cancelAndJoin 取消。

    val job = (1..5).asFlow().onEach {
        delay(1000)
    }.flowOn(Dispatchers.Default).launchIn(CoroutineScope(Dispatchers.IO))
    delay(1000)
    job.cancelAndJoin()

(5)流的取消检测

为方便起见,流构建器对每个发射值执行附加的ensureActive 检测以进行取消,这意味着从 flow{...} 发出的繁忙循环是可以取消的。
出于性能原因,大多数其他流操作不会自行执行其他取消检测,在协程处于繁忙循环的情况下,必须明确检测是否取消。
通过cancellable操作符来执行此操作。

suspend fun simpleFlow() = flow<Int> {
    for (i in 1..5) {
        delay(1000)
        emit(i) // emit自带检测是否取消的能力
    }
}.flowOn(Dispatchers.Default)

fun main() {
    runBlocking {

        // emit 自带检测是否取消的能力
        simpleFlow().collect { value ->
            if (value == 3) cancel()
        }

        // 如果没有emit,需要使用 cancellable
        (1..5).asFlow().cancellable().onEach {
            delay(1000)
        }.flowOn(Dispatchers.Default).collect { value ->
            if (value == 3) cancel()
        }

    }
}

(6)背压

背压:水流受到与流动方向一致的压力。
生产者、消费者模式,只要生产效率 > 消费效率,那么就会产生背压。

处理背压的方式有:

suspend fun simpleFlow() = flow<Int> {
    for (i in 1..50) {
        println("发送数据:$i")
        delay(100)
        emit(i)
    }
}

fun main() {
    runBlocking {
        val time = measureTimeMillis {
            simpleFlow()
                .collect { value ->
                delay(300)
                println("接收数据:$value")
            }
        }
        println("耗时:$time")
    }
}

以上代码,发送数据和接收数据都是在同一个线程中并行执行,如果存在耗时程序,将特别影响效率。

为了增加执行效率,可以使用 buffer 设置缓存大小,从而起到加快执行速率的效果。

    val time = measureTimeMillis {
        // 背压
        simpleFlow()
            .buffer(10)
            .collect { value ->
            delay(300)
            println("接收数据:$value")
        }
    }

但是,从生产者/消费者的设计思想的角度上考虑,发送数据最好放在子线程。

    val time = measureTimeMillis {
        // 背压
        simpleFlow()
            .flowOn(Dispatchers.Default)
            .collect { value ->
            delay(300)
            println("接收数据:$value")
        }
    }

使用 flowOn 可以指定 Flow 的协程作用域,这样可以将 并行 转成 并发,从而加快执行效率。

runBlocking {
    val time = measureTimeMillis {
        // 背压
        simpleFlow()
            .conflate()
            .collect { value ->
            delay(300)
            println("接收数据==:$value")
        }
    }
    println("耗时:$time")
}

以上代码使用 conflate,中间一些元素不会处理,从而加快执行效率。

    val time = measureTimeMillis {
        // 背压
        simpleFlow()
            .collectLatest { value ->
            delay(300)
            println("接收数据==:$value")
        }

以上代码将 collect 改成 collectLatest 之后,只会处理最后一个值,从而加速执行速度。

(7)转换操作符

使用map转换:

suspend fun simpleFlow() = flow<Int> {
    for (i in 1..3) {
        println(i)
        emit(i)
    }
}

fun main() {
    runBlocking {
        simpleFlow()
            .map { value ->
                "response $value"
            }
            .collect { value ->
                println(value)
            }
    }
}

使用transform转换,可以转换成任意次、任意值的Flow:

suspend fun simpleFlow() = flow<Int> {
    for (i in 1..3) {
        println(i)
        emit(i)
    }
}

fun main() {
    runBlocking {
        simpleFlow()
            .transform { request ->
                emit("request $request")
                emit("request $request")
            }
            .collect { value ->
                println(value)
            }
    }
}

(8)限长操作符

take 是限长操作符,可以限制处理的数量:

suspend fun simpleFlow() = flow<Int> {
    for (i in 1..3) {
        println(i)
        emit(i)
    }
}

fun main() {
    runBlocking {
        simpleFlow()
            .take(2)
            .collect { value ->
                println(value)
            }
    }
}

(9)末端操作符

末端操作符是在流上用于 启动流收集的挂起函数。collect是最基础的末端操作符,但是还有另外一些更加方便使用的末端操作符:

fun main() {
    runBlocking {
        val sum = simpleFlow()
            .reduce { a, b ->
                a + b
            }
        println(sum)
    }
}

reduce 操作符可以将元素累加。
reduce的返回值类型必须和集合的元素类型相符。

suspend fun simpleFlow() = flow<Int> {
    for (i in 1..3) {
        emit(i)
    }
}

fun main() {
    runBlocking {
        val newStr = simpleFlow()
            .fold(StringBuilder()) { str: StringBuilder, a: Int ->
                str.append(a).append(" ")
            }
        println(newStr)
    }
}

而fold的返回值类型则不受约束。

(10)组合操作符

zip 操作符将两个流合并。

runBlocking {
    val nums1 = (1..3).asFlow()
    val nums2 = flowOf("one", "two", "three")
    nums1.zip(nums2) {a, b ->
        "$a $b"
    }.collect {value->
        println(value)
    }
}

(11)展平操作符

流表示异步接收的值序列,所以很容易遇到这种情况:每个值都会触发对另一个值序列的请求,然而,由于流具有异步的性质,因此需要不同的展平模式,为此,存在一系列的流展平操作符:

suspend fun requestFlow(i: Int) = flow<String> {
    emit("request $i first")
    delay(500)
    emit("request $i second")
}

fun main() {
    runBlocking {
        val startTime = System.currentTimeMillis()
        (1..3).asFlow()
            .onEach { delay(100) }
            .flatMapConcat {
                requestFlow(it) // Flow的元素是Flow
            }
            .collect { value->
            println("$value -- ${System.currentTimeMillis() - startTime}")
        }
    }
}

代码中 flatMapConcat 可以换成 flatMapMerge 或者 flatMapLatest

三者的执行结果是:

flatMapConcat :(requestFlow全部执行完)

request 1 first -- 198
request 1 second -- 701
request 2 first -- 815
request 2 second -- 1319
request 3 first -- 1428
request 3 second -- 1932

flatMapMerge:(不需要等待requestFlow全部执行完)

request 1 first -- 281
request 2 first -- 361
request 3 first -- 470
request 1 second -- 798
request 2 second -- 876
request 3 second -- 985

flatMapLatest:

request 1 first -- 250
request 2 first -- 376
request 3 first -- 485
request 3 second -- 1001

(12)流的异常处理

suspend fun requestFlow() = flow<Int> {
    for (i in 1..3) {
        emit(i)
        throw RuntimeException("exception")
    }
}.catch {e: Throwable ->
    println("上游异常捕获:" + e.message)
}

fun main() {
    runBlocking {
        try {
            requestFlow()
                .collect { value->
                    check(value < 2) // 检查异常
                    println(value)
                }
        } catch (e: Throwable) {
            println("下游异常捕获:" + e.message)
        }
    }
}

check:检查异常,一旦检查到异常,程序crash。
下游通过 try...catch 捕获异常,上游Flow自带 catch 函数。

(13)流的完成

收集完成时,使用 finally,表示收集完成。

suspend fun requestFlow() = flow<Int> {
    for (i in 1..3) {
        emit(i)
    }
}

fun main() {
    runBlocking {
        try {
            requestFlow().collect { value-> println(value) }
        } finally {
            println("...完成...")
        }
    }
}

使用 onCompletion 也可以表示完成:

suspend fun requestFlow() = flow<Int> {
    for (i in 1..3) {
        emit(i)
        throw RuntimeException("exception")
    }
}.catch {exception->
    println("catch -> exception:" + exception.message)
}

fun main() {
    runBlocking {

        requestFlow()
            .onCompletion {exception ->
                if (exception != null) { // 异常导致完成
                    println("finish -> exception:" + exception.message)
                } else { // 正常结束
                    println("正常结束")
                }
            }
            .collect { value-> println(value) }

    }
}

onCompletion 可以拿到异常信息,但是不能捕获异常。

[完...]

上一篇下一篇

猜你喜欢

热点阅读