Kotlin---使用协程的异步
协程间的通信
协程与协程间不能直接通过变量来访问数据,会导致数据原子性的问题,所以协程提供了一套Channel
机制来在协程间传递数据。
Channel 是一个和 BlockingQueue
非常相似的概念。其中一个不同是它代替了阻塞的 put
操作并提供了挂起的 send
,还替代了阻塞的 take
操作并提供了挂起的 receive
。
Channel
发送和接收操作是 公平的 并且尊重调用它们的多个协程。它们遵守先进先出原则,可以看到第一个协程调用 receive 并得到了元素
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val channel = Channel<Int>()
launch {
// 这里可能是消耗大量CPU运算的异步逻辑,我们将仅仅做5次整数的平方并发送
for (x in 1..5) channel.send(x * x)
channel.close()
}
// 这里我们打印了5次被接收的整数:
// channel.receive()是阻塞的,等待发送数据发送完毕
repeat(5) { println(channel.receive()) }
println("Done!")
}
当发送完毕后,记得调用channel.close()
,close()
操作就像向通道发送了一个特殊的关闭指令。 这个迭代停止就说明关闭指令已经被接收了。所以这里保证所有先前发送出去的元素都在通道关闭前被接收到。
基于协程的生产者\消费者
在协程中,可以通过produce
来模拟生产者生产数据。并且通过consume
来模拟消费者情况。
目前,在1.3.11版本的Kotlin中,produce
与consume
都还只是实验性的功能,没有正式release,使用时记得使用@ExperimentalCoroutinesApi
标记使用的函数
runBlocking {
val receiveChannel: ReceiveChannel<Int> = produce {
for (x in 1..5) send(x * x)
}
receiveChannel.consumeEach {
println(it)
}
}
扇入
扇入的概念与文件系统中的IO多路复用差不多。
扇入允许多个协程可以发送到同一个通道。
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) { // 接收前六个
println("${channel.receive()}...${Thread.currentThread().name}")
}
coroutineContext.cancelChildren() // 取消所有子协程来让主协程结束
}
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
运行结果:
foo...main @coroutine#1
foo...main @coroutine#1
BAR!...main @coroutine#1
foo...main @coroutine#1
foo...main @coroutine#1
foo...main @coroutine#1
使用 async 并发
async
就类似于 launch
。它启动了一个单独的协程,这是一个轻量级的线程并与其它所有的协程一起并发的工作。不同之处在于 launch
返回一个 Job
并且不附带任何结果值,而 async
返回一个 Deferred
—— 一个轻量级的非阻塞 future, 这代表了一个将会在稍后提供结果的 promise。你可以使用 .await()
在一个延期的值上得到它的最终结果, 但是 Deferred
也是一个 Job
,所以如果需要的话,你可以取消它。
其实区别就是,async
可以获取执行结果,而launch
只是简单的执行任务。
import kotlinx.coroutines.*
import kotlin.system.*
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
suspend fun doSomethingUsefulOne(): Int {
delay(1000L) // 假设我们在这里做了些有用的事
return 13
}
suspend fun doSomethingUsefulTwo(): Int {
delay(1000L) // 假设我们在这里也做了些有用的事
return 29
}
执行结果:
The answer is 42
Completed in 1017 ms
而async{}
会直接启动协程,如果需要等待某个事件启动的话,则需要使用CoroutineStart.LAZY
:
val time = measureTimeMillis {
val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
// 执行一些计算
one.start() // 启动第一个
two.start() // 启动第二个
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
协程安全
协程与线程一样,对于数据的操作无法保持原子性,所以在协程中,需要使用原子性的数据结构,例如AotimicInteger
等,或者使用mutex.withLock
,来处理数据的原子性
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
import kotlin.system.*
suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
val jobs = List(n) {
launch {
repeat(k) { action() }
}
}
jobs.forEach { it.join() }
}
println("Completed ${n * k} actions in $time ms")
}
val mutex = Mutex()
var counter = 0
fun main() = runBlocking<Unit> {
GlobalScope.massiveRun {
mutex.withLock {
counter++
}
}
println("Counter = $counter")
}
输出结果:
Completed 100000 actions in 104 ms
Counter = 100000
Actor
Actor 是由协程、被限制并封装到该协程中的状态以及一个与其它协程通信的 通道 组合而成的一个实体。一个简单的 actor 可以简单的写成一个函数, 但是一个拥有复杂状态的 actor 更适合由类来表示。
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.system.*
suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同个动作的次数
val time = measureTimeMillis {
val jobs = List(n) {
launch {
repeat(k) { action() }
}
}
jobs.forEach { it.join() }
}
println("Completed ${n * k} actions in $time ms")
}
// 计数器 Actor 的各种类型
sealed class CounterMsg
object IncCounter : CounterMsg() // 递增计数器的单向消息
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 携带回复的请求
// 这个函数启动一个新的计数器 actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // actor 状态
for (msg in channel) { // 即将到来消息的迭代器
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
fun main() = runBlocking<Unit> {
val counter = counterActor() // 创建该 actor
GlobalScope.massiveRun {
counter.send(IncCounter)
}
// 发送一条消息以用来从一个 actor 中获取计数值
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // 关闭该actor
}
actor 本身执行时所处上下文(就正确性而言)无关紧要。一个 actor 是一个协程,而一个协程是按顺序执行的,因此将状态限制到特定协程可以解决共享可变状态的问题。实际上,actor 可以修改自己的私有状态,但只能通过消息互相影响(避免任何锁定)。
actor 在高负载下比锁更有效,因为在这种情况下它总是有工作要做,而且根本不需要切换到不同的上下文。