Kotlin

Kotlin异步流

2022-02-24  本文已影响0人  漆先生

一、表示多个值

Kotlin 中可以使用集合来表示多个值

1.序列

fun  simple(): Sequence<Int> = sequence {//序列构建器
    for (i in 1..3) {
        Thread.sleep(100)
        yield(i)
    }
}

fun main() {
    simple().forEach { value -> println(value) }
}

2.挂起函数

计算过程阻塞运行该代码的主线程。当这些值由异步代码计算时,我们可以使用 suspend 修饰符标记函数 simple ,这样它就可以在不阻塞的情况下执行其工作并将结果作为列表返回:

suspend fun simple(): List<Int> {
    delay(1000)
    return listOf(1, 2, 3)
}

fun main() = runBlocking {
    simple().forEach { value -> println(value) }
}

3.流

为了表式异步计算的值流(stream),我们可以使用 Flow 类型(正如同步计算值会使用 Sequence 类型):

fun simple(): Flow<Int> = flow { // 流构建器
    for (i in 1..3) {
        delay(100)
        emit(i) // 发送下一个值
    }
}

fun main() = runBlocking {
    // 启动并发的协程以验证主线程并未阻塞
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    simple().collect { value -> println(value) }
}

输出:
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

二、流是冷的

Flow 是⼀种类似于序列的冷流 。 flow 构建器中的代码直到流被收集的时候才运行。

三、流取消基础

流采用与协程同样的协作取消。

四、流构建器

flow { ... } 构建器是最基础的⼀个。还有构建器使流的声明更简单:

五、过度流操作符

过渡操作符应用于上游流,并返回下游流。这些操作符也是冷操作符,就像流⼀样。这类操作符本身不是挂起函数。它运行的速度很快,返回新的转换流的定义。
流与序列的主要区别在于这些操作符中的代码可以调用挂起函数。

suspend fun performRequest(request: Int): String {
    delay(1000)
    return "response $request"
}

fun main() = runBlocking {
    (1..3).asFlow()
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}

输出:

response 1
response 2
response 3

1.转换操作符

在流转换操作符中,最通用的⼀种称为 transform。它可以用来模仿简单的转换,例如 map 与 filter,以及实施更复杂的转换。使用 transform 操作符,我们可以发射任意值任意次。
使用 transform 我们可以在执行长时间运行的异步请求之前发射⼀个字符串并跟踪这个响应。

suspend fun performRequest(request: Int): String {
    delay(1000)
    return "response $request"
}

fun main() = runBlocking {
    (1..3).asFlow()
        .transform { request ->
            emit("Making request $request")
            emit(performRequest(request))
        }
        .collect { response -> println(response) }
}

输出:

Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

2.限长操作符

限长过渡操作符(例如 take)在流触及相应限制的时候会将它的执行取消。协程中的取消操作总是通过抛出异常来执行(不会在控制台输出对账信息,被取消的协程中 CancellationException 被认为是协程执行结束的正常原因),不会奔溃这样所有的资源管理函数(如 try {...} finally {...} 块)会在取消的情况下正常运行:

fun numbers(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking {
    numbers()
        .take(2)
        .collect { value -> println(value) }
}

输出:

1
2
Finally in numbers

六、末端流操作符

末端操作符是在流上用于启动流收集的挂起函数。collect 是最基础的末端操作符,还有⼀些其它末端操作符:

fun main() = runBlocking {
    val sum = (1..5).asFlow()
        .map { it * it }
//        .first()//输出1
//        .reduce { a, b -> a + b }  //输出55
          .fold(1) { a, b -> a + b } //带初始值的累加输出56
    println(sum)
}

七、流是连续的

流的每次单独收集都是按顺序执行的,除非进行特殊操作的操作符使用多个流。默认情况下不启动新协程。从上游到下游每个过渡操作符都会处理每个发射出的值,然后再交给末端操作符。

八、流上下文

流的收集总是在调用协程的上下文中发生。该属性称为上下文保存 。所以默认的,flow { ... } 构建器中的代码运行在相应流的收集器提供的上下文中。

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}

fun main() = runBlocking {
    simple().collect { value -> log("Collected $value") }
}

输出:

[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3

由于 simple().collect 是在主线程调用的,那么 simple 的流主体也是在主线程调用的。这是快速运行或异步代码的理想默认形式,它不关心执行的上下文并且不会阻塞调用者。

1.withContext发出错误

长时间运行的消耗 CPU 的代码也许需要在 Dispatchers.Default 上下文中执行,并且更新 UI 的代码也许需要在 Dispatchers.Main 中执行。
通常,withContext 用于在 Kotlin 协程中改变代码的上下文,但是 flow {...} 构建器中的代码必须遵循上下文保存属性,并且不允许从其他上下文中发射(emit)。

2.flowOn操作符

flowOn 函数用于更改流发射的上下文。

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100)
        log("Emitting $i")
        emit(i)
    }
}.flowOn(Dispatchers.Default)

fun main() = runBlocking {
    simple().collect { value ->
        log("Collected $value")
    }
}

输出:

[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3

flowOn 操作符已改变流的默认顺序性。现在收集发生在⼀个协程中 (“coroutine#1”)而发射发生在运行于另⼀个线程中与收集协程并发运行的另⼀个协程(“coroutine#2”)中。当上游流必须改变其上下文中的 CoroutineDispatcher 的时候,flowOn 操作符创建了另⼀个协程。

九、缓冲

buffer 操作符来并发运行流中发射元素的代码以及收集的代码,而不是顺序运行它们:

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking {
    val time = measureTimeMillis {
        simple()
            .buffer()
            .collect { value ->
                delay(300)
                println(value)
            }
    }
    println("Collected in $time ms")
}

输出:
1
2
3
Collected in 1098 ms

仅需要等待第⼀个数字产生的 100 毫秒以及处理每个数字各需花费的 300 毫秒

1.合并

当流代表部分操作结果或操作状态更新时,可能没有必要处理每个值,而是只处理最新的那个。
当收集器处理它们太慢的时候,conflate 操作符可以用于跳过中间值。

fun main() = runBlocking {
    val time = measureTimeMillis {
        simple()
            .conflate()
            .collect { value ->
                delay(300)
                println(value)
            }
    }
    println("Collected in $time ms")
}
输出:
1
3
Collected in 779 ms

2.处理最新值

当发射器和收集器都很慢的时候,合并是加快处理速度的⼀种方式。它通过删除发射值来实现。另⼀种方式是取消缓慢的收集器,并在每次发射新值的时候重新启动它。有⼀组与 xxx 操作符执行相同基本逻辑的 xxxLatest 操作符,但是在新值产生的时候取消执行其块中的代码。
如collecLatest

fun main() = runBlocking {
    val time = measureTimeMillis {
        simple()
            .collectLatest { value ->
                println("Collecting $value")
                delay(300)
                println("Done $value")
            }
    }
    println("Collected in $time ms")
}
输出:
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 767 ms

十、组合多个流

1.zip

zip 操作符用于组合两个流中的相关值

fun main() = runBlocking {
    val nums = (1..3).asFlow()
    val strs = flowOf("one", "two", "three")
    nums.zip(strs) { a, b -> "$a -> $b" }
        .collect { println(it) }
}

输出:
1 -> one
2 -> two
3 -> three

2.combine

当流表示⼀个变量或操作的最新值时,可能需要执行计算,参考合并操作符conflate。这依赖于相应流的最新值,并且每当上游流产生值的时候都需要重新计算。这种相应的操作符家族称为combine

fun main() = runBlocking {
    val nums = (1..3).asFlow().onEach { delay(300) }
    val strs = flowOf("one", "two", "three").onEach { delay(400) }
    val startTime = currentTimeMillis()
    nums.combine(strs) { a, b -> "$a -> $b" }
        .collect { value ->
            println("$value at ${currentTimeMillis() - startTime} ms from start")
        }
}

输出:
1 -> one at 452 ms from start
2 -> one at 656 ms from start
2 -> two at 858 ms from start
3 -> two at 967 ms from start
3 -> three at 1265 ms from start

十一、展平流

1.flatMapConcat

连接模式由 flatMapConcat 与 flattenConcat 操作符实现。它们是相应序列操作符最相近的类似物。它们在等待内部流完成之后开始收集下⼀个值。

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500)
    emit("$i: Second")
}

fun main() = runBlocking {
    val startTime = currentTimeMillis()
    (1..3).asFlow().onEach { delay(100) }
        .flatMapConcat { requestFlow(it) }
        .collect { value ->
            println("$value at ${currentTimeMillis() - startTime} ms from start")
        }
}

输出:
1: First at 134 ms from start
1: Second at 649 ms from start
2: First at 758 ms from start
2: Second at 1270 ms from start
3: First at 1379 ms from start
3: Second at 1889 ms from start

2.flatMapMerge

另⼀种展平模式是并发收集所有传入的流,并将它们的值合并到⼀个单独的流,以便尽快的发射值。它由 flatMapMerge 与 flattenMerge 操作符实现。他们都接收可选的用于限制并发收集的流的个数的 concurrency 参数。

fun main() = runBlocking {
    val startTime = currentTimeMillis()
    (1..3).asFlow().onEach { delay(100) }
        .flatMapMerge { requestFlow(it) }
        .collect { value ->
            println("$value at ${currentTimeMillis() - startTime} ms from start")
        }
}

输出:
1: First at 175 ms from start
2: First at 268 ms from start
3: First at 377 ms from start
1: Second at 685 ms from start
2: Second at 777 ms from start
3: Second at 888 ms from start

3.flatMapLatest

flatMapLatest与 collectLatest 操作符类似,在发出新流后立即取消先前流的收集。

fun main() = runBlocking {
    val startTime = currentTimeMillis()
    (1..3).asFlow().onEach { delay(100) }
        .flatMapLatest { requestFlow(it) }
        .collect { value ->
            println("$value at ${currentTimeMillis() - startTime} ms from start")
        }
}

输出:
1: First at 160 ms from start
2: First at 334 ms from start
3: First at 445 ms from start
3: Second at 955 ms from start

十二、流异常

当运算符中的发射器或代码抛出异常时,流收集可以带有异常的完成。

1.收集

try 与 catch

2.透明性

流必须对异常透明,即在 flow { ... } 构建器内部的 try/catch 块中发射值是违反异常透明性的。

透明捕获

catch 过渡操作符遵循异常透明性,仅捕获上游异常( catch 操作符上游的异常,但是它下面的不是)。如果 collect { ... } 块(位于 catch 之下)抛出⼀个异常,那么异常会逃逸。

声明式捕获

可以将 catch 操作符的声明性与处理所有异常的期望相结合,将 collect 操作符的代码块移动到 onEach 中,并将其放到 catch 操作符之前。收集该流必须调用无参的 collect() 。

十三、流完成

当流收集完成时(普通情况或异常情况),它可能需要执行⼀个动作。它可以通过两种方式完成:命令式或声明式。

与 catch 操作符的另⼀个不同点是 onCompletion 能观察到所有异常并且仅在上游流成功完成(没有取消或失败)的情况下接收⼀个 null 异常。

fun main() = runBlocking {
    simple()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}

输出:

1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2//程序异常终止

十四、启动流

launchIn末端操作符, 必要参数是 CoroutineScope ,指定哪个协程来启动流的收集。

十五、流取消检测

流构建器对每个发射值执行附加的 ensureActive 检测以进行取消。这意味着从 flow { ... } 发出的繁忙循环是可以取消的:

fun foo(): Flow<Int> = flow {
    for (i in 1..5) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking {
    foo().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}

输出:

Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@2a742aa2//异常终止

出于性能原因,大多数其他流操作不会自动执行其他取消检测。例如,如果使⽤ IntRange.asFlow 扩展来编写相同的繁忙循环。
必须明确检测是否取消。可以添加 .onEach { currentCoroutineContext().ensureActive() } ,也可使用 cancellable 操作符来执行此操作。

fun main() = runBlocking {
    (1..5).asFlow().cancellable().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}

十六、流(flow)与响应式流(Reactive Streams)

Flow 依然是响应式流,设计上和RxJava相似。

上一篇 下一篇

猜你喜欢

热点阅读