【译】协程与响应式流
译者前言
这是协程官方文档中早期的一篇关于响应式流与协程关系的一篇指南,2019 年下半年后,由于协程推出了自己的异步流 API,
所以这篇指南已经被官方删除,但当年本人翻译这篇指南的时候还是花了不少功夫,所以特此在这里保留下来,以备在以后需要的时候参考。
响应式流与协程指南
-->协程指南中的基础协程概念不是必须的,
在 kotlinx.coroutines
项目中有一系列和响应式流相关的模块:
- kotlinx-coroutines-reactive ——为 Reactive Streams 提供的适配
- kotlinx-coroutines-reactor ——为 Reactor 提供的适配
- kotlinx-coroutines-rx2 ——为 RxJava 2.x 提供的适配
本指南主要基于 Reactive Streams 的规范并使用
Publisher
接口和一些基于 RxJava 2.x 的示例,
该示例实现了响应式流的规范。
欢迎你在 Github 上 clone
kotlinx.coroutines
项目
reactive/kotlinx-coroutines-rx2/test/guide
路径中。
目录
响应式流与通道的区别
本节主要包含响应式流与以协程为基础的通道的不同点。
迭代的基础
[Channel] 与如下所示的响应式流类有类似的概念:
- Reactive stream Publisher;
- Rx Java 1.x Observable;
- Rx Java 2.x Flowable,
Publisher
的实现者。
它们都描述了一个异步的有限或无限的元素流(在 Rx 中又名 items),
并且都支持背压。
在通道中每调用一次 [receive][ReceiveChannel.receive] 就消费一个元素。
让我们用以下的例子来说明:
fun main() = runBlocking<Unit> {
// 创建一个通道,该通道每200毫秒生产一个数字,从 1 到 3
val source = produce<Int> {
println("Begin") // 在输出中标记协程开始运行
for (x in 1..3) {
delay(200) // 等待 200 毫秒
send(x) // 将数字 x 发送到通道中
}
}
// 从 source 中打印元素
println("Elements:")
source.consumeEach { // 在 source 中消费元素
println(it)
}
// 再次从 source 中打印元素
println("Again:")
source.consumeEach { // 从 source 中消费元素
println(it)
}
}
可以在这里获取完整代码。
这段代码产生了如下输出:
Elements:
Begin
1
2
3
Again:
注意,“Begin”只被打印了一次,因为 [produce] 协程构建器 被执行的时候,
只创建了一个协程来生产元素流。所有被生产的元素都被
[ReceiveChannel.consumeEach][consumeEach]
再次尝试接收元素将不会接收到任何东西。
让我们使用 kotlinx-coroutines-reactive
模块中的 [publish] 协程构建器 代替 kotlinx-coroutines-core
模块中的 [produce]
来重写这段代码。代码保持相似,
但是在 source
接收 [ReceiveChannel] 类型的地方,现在它接收响应式流的
Publisher
类型,在 [consumeEach] 被用来 消费 来源于通道中的元素的地方,
现在 [collect][org.reactivestreams.Publisher.collect] 被用来从 publisher 中 收集 元素。
fun main() = runBlocking<Unit> {
// 创建一个 publisher,每 200 毫秒生产一个数字,从 1 到 3
val source = publish<Int> {
// ^^^^^^^ <--- 这里与先前的示例不同
println("Begin") // 在输出中标记协程开始运行
for (x in 1..3) {
delay(200) // 等待 200 毫秒
send(x) // 将数字 x 发送到通道中
}
}
// 从 source 中打印元素
println("Elements:")
source.collect { // 收集元素 it
println(it)
}
// 再次从 source 中打印元素
println("Again:")
source.collect { // 收集元素 it
println(it)
}
}
可以在这里获取完整代码。
现在这段代码的输出变为:
Elements:
Begin
1
2
3
Again:
Begin
1
2
3
-->在流中生产元素的方案。当 收集 发生时,它成为元素的实际流。每个收集器都将接收到一个相同或不同的流,这取决于取决于 Publisher
的相应实现如何工作。
[publish] 协程构建器在上面的示例中没有启动协程,
但是每个 [collect][org.reactivestreams.Publisher.collect] 调用都会启动一个协程。
在这里有两个 [publish],这也就是为什么我们看到了“Begin”被打印了两次。
订阅与取消
在先前小节的第二个示例中,source.collect { ... }
用于收集所有的元素。
相反,我们可以使用 [openSubscription][org.reactivestreams.Publisher.openSubscription]
fun main() = runBlocking<Unit> {
val source = Flowable.range(1, 5) // 五个数字的区间
.doOnSubscribe { println("OnSubscribe") } // 提供了一些可被观察的点
.doOnComplete { println("OnComplete") } // ...
.doFinally { println("Finally") } // ... 在正在执行的代码中
var cnt = 0
source.openSubscription().consume { // 在源中打开通道
for (x in this) { // 迭代通道以从中接收元素
println(x)
if (++cnt >= 3) break // 当三个元素被打印出来的时候,执行 break
}
// 注意:当这段代码执行完成并阻塞的时候 `consume` 取消了该通道
}
}
可以在这里获取完整代码。
它将产生如下输出:
OnSubscribe
1
2
3
Finally
使用一个显式的 openSubscription
我们应该从相应的订阅源 [cancel][ReceiveChannel.cancel]
订阅,但是这里不需要显式调用 cancel
——
[consume] 会在内部为我们做这些事。
配置
doFinally
监听器并打印“Finally”来确认订阅确实被取消了。注意“OnComplete”
永远不会被打印,因为我们没有消费所有的元素。
如果我们 collect
所有的元素,那我们不需要使用显式的 cancel
:
fun main() = runBlocking<Unit> {
val source = Flowable.range(1, 5) // 五个数字的区间
.doOnSubscribe { println("OnSubscribe") } // 提供了一些可被观察的点
.doOnComplete { println("OnComplete") } // ……
.doFinally { println("Finally") } // …… 在正在执行的代码中
// collect the source fully
source.collect { println(it) }
}
可以在这里获取完整代码。
我们得到如下输出:
OnSubscribe
1
2
3
OnComplete
Finally
4
5
注意,如何使“OnComplete”与“Finally”在最后的元素“4”与“5”之前输出。
在这个示例中它将发生在我们的 main
函数在协程中执行时,使用 [runBlocking] 协程构建器来启动它。
我们的主协程在 flowable 中使用 source.collect { …… }
扩展函数来接收通道。
当它等待源发射元素的时候该主协程是 挂起的 ,
当最后一个元素被 Flowable.range(1, 5)
发射时它
恢复 了主协程,它被分派到主线程上打印出来
最后一个元素在稍后的时间点打印,而 source 执行完成并打印“Finally”。
背压
在 Rx Java 2.x 中一个支持背压的类被称为
Flowable。
在下面的示例中,我们可以使用 kotlinx-coroutines-rx2
模块中的协程构建器 [rxFlowable] 来定义一个
发送从 1 到 3 三个整数的 flowable。
在调用挂起的 [send][SendChannel.send] 函数之前,
它在输出中打印了一条消息,所以我们可以来研究它是如何操作的。
这些整数在主线程的上下文中被产生,
但是在使用 Rx 的
observeOn
操作符后缓冲区大小为 1 的订阅被转移到了另一个线程。
为了模拟订阅者很慢,它使用了 Thread.sleep
来模拟消耗 500 毫秒来处理每个元素。
fun main() = runBlocking<Unit> {
// 协程 —— 在主线程的上下文中快速生成元素
val source = rxFlowable {
for (x in 1..3) {
send(x) // 这是一个挂起函数
println("Sent $x") // 在成功发送元素后打印
}
}
// 使用 Rx 让一个处理速度很慢的订阅者在另一个线程订阅
source
.observeOn(Schedulers.io(), false, 1) // 指定缓冲区大小为 1 个元素
.doOnComplete { println("Complete") }
.subscribe { x ->
Thread.sleep(500) // 处理每个元素消耗 500 毫秒
println("Processed $x")
}
delay(2000) // 挂起主线程几秒钟
}
可以在这里获取完整代码。
这段代码的输出更好地说明了背压是如何在协程中工作的:
Sent 1
Processed 1
Sent 2
Processed 2
Sent 3
Processed 3
Complete
当尝试发送另一个元素的时候,我们看到这里的处理者协程是如何将第一个元素放入缓冲区并挂起的。
只有当消费者处理了第一个元素,处理者才会发送第二个元素并恢复,等等。
Rx 主题 vs 广播通道
[BroadcastChannel]。在 Rx 中有一种主题——
BehaviorSubject
被用来管理状态:
fun main() {
val subject = BehaviorSubject.create<String>()
subject.onNext("one")
subject.onNext("two") // 更新 BehaviorSubject 的状态,“one”变量被丢弃
// 现在订阅这个主题并打印所有信息
subject.subscribe(System.out::println)
subject.onNext("three")
subject.onNext("four")
}
可以在这里获取完整代码。
这段代码打印订阅时主题的当前状态及其所有后续更新:
two
three
four
您可以像使用任何其他响应式流一样从协程订阅主题:
fun main() = runBlocking<Unit> {
val subject = BehaviorSubject.create<String>()
subject.onNext("one")
subject.onNext("two")
// 现在启动一个协程来打印所有东西
GlobalScope.launch(Dispatchers.Unconfined) { // 在不受限的上下文中启动协程
subject.collect { println(it) }
}
subject.onNext("three")
subject.onNext("four")
}
可以在这里获取完整代码。
结果是相同的:
two
three
four
这里我们使用 [Dispatchers.Unconfined] 协程上下文以与 Rx 中的订阅相同的行为启动消费协程。
协程的优点是很容易获得单线程 UI 更新的混合行为。
一个典型的 UI 应用程序不需要响应每一个状态改变。只有最近的状态需要被响应。
应用程序状态的一系列背靠背更新只需在UI中反映一次,
-->并释放主线程:
fun main() = runBlocking<Unit> {
val subject = BehaviorSubject.create<String>()
subject.onNext("one")
subject.onNext("two")
// 现在启动一个协程来打印最近的更新
launch { // 为协程使用主线程的上下文
subject.collect { println(it) }
}
subject.onNext("three")
subject.onNext("four")
yield() // 使主线程让步来启动协程 <--- 这里
subject.onComplete() // 现在也结束主题的序列来取消消费者
}
可以在这里获取完整代码。
现在协程只处理(打印)最近的更新:
four
没有桥接到响应式流:
fun main() = runBlocking<Unit> {
val broadcast = ConflatedBroadcastChannel<String>()
broadcast.offer("one")
broadcast.offer("two")
// 现在启动一个协程来打印最近的更新
launch { // 为协程使用主线程的上下文
broadcast.consumeEach { println(it) }
}
broadcast.offer("three")
broadcast.offer("four")
yield() // 使主线程让步来启动协程
broadcast.close() // 现在也结束主题的序列来取消消费者
}
可以在这里获取完整代码。
它与基于 BehaviorSubject
的先前的示例产生了相同的输出:
four
操作符
-->以及反转来处理相关的流。创建你自己的并且支持背压的操作符是非常臭名昭著以及困难的。
协程与通道则被设计为提供完全相反的体验。这里没有内建的操作符,
但是处理元素流是非常简单并且自动支持背压的,
即使是在你没有明确思考这一点的情况下。
本节将展示以协程为基础而实现的一系列响应式流操作符。
Range
让我们推出自己的为响应式流 Publisher
接口实现的
range
-->这篇博客中。
它需要很多代码。
以下是与协同程序相对应的代码:
fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
for (x in start until start + count) send(x)
}
-->的小型响应式流库。
直接在协程中使用:
fun main() = runBlocking<Unit> {
// Range 从 runBlocking 中承袭了父 job,但是使用 Dispatchers.Default 来覆盖调度器
range(Dispatchers.Default, 1, 5).collect { println(it) }
}
可以在这里获取完整代码。
这段代码的结果非常值得我们期待:
1
2
3
4
5
熔合 filter 与 map 操作符
fun <T, R> Publisher<T>.fusedFilterMap(
context: CoroutineContext, // 协程执行的上下文
predicate: (T) -> Boolean, // 过滤器 predicate
mapper: (T) -> R // mapper 函数
) = publish<R>(context) {
collect { // 收集源流
if (predicate(it)) // 过滤的部分
send(mapper(it)) // 转换的部分
}
}
使用先前 range
中的示例我们可以测试我们的 fusedFilterMap
来过滤偶数以及将它们映射到字符串:
fun main() = runBlocking<Unit> {
range(1, 5)
.fusedFilterMap(Dispatchers.Unconfined, { it % 2 == 0}, { "$it is even" })
.collect { println(it) } // 打印所有的字符串结果
}
可以在这里获取完整代码。
不难看出其结果会是:
2 is even
4 is even
Take until
我们来实现自己的
takeUntil
操作符。这非常棘手,
因为需要跟踪和管理对两个流的订阅。
fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
this@takeUntil.openSubscription().consume { // 显式地打开 Publisher<T> 的通道
val current = this
other.openSubscription().consume { // 显式地打开 Publisher<U> 的通道
val other = this
whileSelect {
other.onReceive { false } // 释放任何从 `other` 接收到的元素
current.onReceive { send(it); true } // 在这个通道上重新发送元素并继续
}
}
}
}
这段代码使用 [whileSelect] 作为比 while(select{...}) {}
循环更好的快捷方式,并且 Kotlin 的
[consume] 表达式会在退出时关闭通道,并取消订阅相应的发布者。
fun CoroutineScope.rangeWithInterval(time: Long, start: Int, count: Int) = publish<Int> {
for (x in start until start + count) {
delay(time) // 在每次发送数字之前等待
send(x)
}
}
下面的代码展示了 takeUntil
是如何工作的:
fun main() = runBlocking<Unit> {
val slowNums = rangeWithInterval(200, 1, 10) // 数字之间有 200 毫秒的间隔
val stop = rangeWithInterval(500, 1, 10) // 第一个在 500 毫秒之后
slowNums.takeUntil(Dispatchers.Unconfined, stop).collect { println(it) } // 让我们测试它
}
可以在这里获取完整代码。
执行
1
2
Merge
使用协程处理多个数据流总是至少有两种方法。一种方法是调用
merge
操作符来使用第二种的方法:
fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = publish<T>(context) {
collect { pub -> // for each publisher collected
launch { // launch a child coroutine
pub.collect { send(it) } // resend all element from this publisher
}
}
}
-->并且当 publish
协程被取消或以其它的方式执行完毕时将会被取消。
fun CoroutineScope.testPub() = publish<Publisher<Int>> {
send(rangeWithInterval(250, 1, 4)) // 数字 1 在 250 毫秒发射,2 在 500 毫秒,3 在 750 毫秒,4 在 1000 毫秒
delay(100) // 等待 100 毫秒
send(rangeWithInterval(500, 11, 3)) // 数字 11 在 600 毫秒,12 在 1100 毫秒,13 在 1600 毫秒
delay(1100) // 在启动完成后的 1.2 秒之后等待 1.1 秒
}
这段测试代码在 testPub
上使用了 merge
并且展示结果:
fun main() = runBlocking<Unit> {
testPub().merge(Dispatchers.Unconfined).collect { println(it) } // 打印整个流
}
可以在这里获取完整代码。
并且结果应该是:
1
2
11
3
4
12
13
协程上下文
所有的示例操作符都在先前的示例中显式地设置了
CoroutineContext
线程与 Rx
下面的示例中展示了基本的在 Rx 中管理线程上下文。
这里的 rangeWithIntervalRx
是rangeWithInterval
函数使用 Rx 的
zip
、range
以及 interval
操作符的一个实现。
fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
Flowable.zip(
Flowable.range(start, count),
Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
BiFunction { x, _ -> x })
fun main() {
rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
.subscribe { println("$it on thread ${Thread.currentThread().name}") }
Thread.sleep(1000)
}
可以在这里获取完整代码。
我们显式地通过
Schedulers.computation()
1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1
线程与协程
在协程的世界中 Schedulers.computation()
大致对应于 [Dispatchers.Default],
所以先前的示例将变成下面这样:
fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
for (x in start until start + count) {
delay(time) // 在每次数字发射前等待
send(x)
}
}
fun main() {
Flowable.fromPublisher(rangeWithInterval(Dispatchers.Default, 100, 1, 3))
.subscribe { println("$it on thread ${Thread.currentThread().name}") }
Thread.sleep(1000)
}
可以在这里获取完整代码。
产生的输出将类似于:
1 on thread ForkJoinPool.commonPool-worker-1
2 on thread ForkJoinPool.commonPool-worker-1
3 on thread ForkJoinPool.commonPool-worker-1
这里我们使用了 Rx 的
subscribe
Rx observeOn
在 Rx 中你操作使用了特别的操作符来为调用链修改线程上下文。
如果你不熟悉它的话,
你可以从这篇很棒的教程中获得指导。
举例来说,这里使用了
observeOn
操作符。让我们修改先前的示例并观察使用 Schedulers.computation()
的效果:
fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
for (x in start until start + count) {
delay(time) // 在每次数字发射前等待
send(x)
}
}
fun main() {
Flowable.fromPublisher(rangeWithInterval(Dispatchers.Default, 100, 1, 3))
.observeOn(Schedulers.computation()) // <-- 添加了这一行
.subscribe { println("$it on thread ${Thread.currentThread().name}") }
Thread.sleep(1000)
}
可以在这里获取完整代码。
这里的输出有所不同了,提示了“RxComputationThreadPool”:
1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1
使用协程上下文来管理它们
替代使用 Rx 的 subscribe
操作符遍历结果:
fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
Flowable.zip(
Flowable.range(start, count),
Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
BiFunction { x, _ -> x })
fun main() = runBlocking<Unit> {
rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
.collect { println("$it on thread ${Thread.currentThread().name}") }
}
可以在这里获取完整代码。
结果信息将会被打印在主线程中:
1 on thread main
2 on thread main
3 on thread main
不受限的上下文
-->看到了 subscribe
运算符的示例。
在协程的世界中,[Dispatchers.Unconfined] 则承担了类似的任务。让我们修改先前的示例,
-->只是等待的时候使用 [Job.join]:
fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
Flowable.zip(
Flowable.range(start, count),
Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
BiFunction { x, _ -> x })
fun main() = runBlocking<Unit> {
val job = launch(Dispatchers.Unconfined) { // 在不受限的山下文中启动一个新协程(没有它自己的线程池)
rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
.collect { println("$it on thread ${Thread.currentThread().name}") }
}
job.join() // 等待我们的协程结束
}
可以在这里获取完整代码。
1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1
注意,该 [Dispatchers.Unconfined] 上下文应该被谨慎使用。由于降低了局部的堆栈操作以及开销调度的减少,
如果一个协程将一个元素发送到一个通道,那么调用的线程
[send][SendChannel.send] 可能会开始使用 [Dispatchers.Unconfined] 调度程序执行协程的代码。
-->线程切换操作符是非常类似的。这在 Rx 中是默认正常的,因为操作符经常做一些非常小块的工作并且你必须做一些复杂处理来合并大量的操作符。然而,这对于协程来说是不常见的,
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
[Dispatchers.Unconfined]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-unconfined.html
[yield]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/yield.html
[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html
[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html
[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html
[ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/index.html
[ReceiveChannel.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html
[consume]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume.html
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html
[BroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-broadcast-channel/index.html
[ConflatedBroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-conflated-broadcast-channel/index.html
[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html
[whileSelect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/while-select.html
[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/publish.html
[org.reactivestreams.Publisher.collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/collect.html
[org.reactivestreams.Publisher.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/open-subscription.html
[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html