开始使用Kotlin协程
本文主要介绍协程的用法, 以及使用协程能带来什么好处. 另外, 也会粗略提一下协程的大致原理.
本文的意义可能仅仅是让你了解一下协程, 并愿意开始使用它.
如果想彻底理解协程, 请查看官方文档, 官方文档链接将在文章的结尾给出.
如果你以前在别的语言里学习过协程, 如Python的yield, 那请你先忘记它们, 毕竟还是有些区别, 等你弄懂了Kotlin的协程, 再去作对比, 否则, 可能会有一些先入为主的思路来阻碍你理解, 我就吃过这个亏.
初识协程:
首先我们来瞄一眼协程是长啥样的, 以下引用(copy)了官网的一个例子:
fun main(args: Array<String>) {
launch(CommonPool) {
delay(1000L)
println("World!")
}
println("Hello,")
Thread.sleep(2000L)
}
/*
运行结果: ("Hello,"会立即被打印, 1000毫秒之后, "World!"会被打印)
Hello,
World!
*/
姑且不管里面具体的细节, 上面代码大体的运行流程是这样的:
A. 主流程:
- 调用系统的launch方法启动了一个协程, 跟随的大括号可以看做是协程体.
(其中的CommonPool暂且理解成线程池, 指定了协程在哪里运行) - 打印出"Hello,"
- 主线程sleep两秒
(这里的sleep只是保持进程存活, 目的是为了等待协程执行完)
B. 协程流程:
- 协程延时1秒
- 打印出"World!"
解释一下delay方法:
在协程里delay方法作用等同于线程里的sleep, 都是休息一段时间, 但不同的是delay不会阻塞当前线程, 而像是设置了一个闹钟, 在闹钟未响之前, 运行该协程的线程可以被安排做了别的事情, 当闹钟响起时, 协程就会恢复运行.
协程启动后还可以取消
launch方法有一个返回值, 类型是Job, Job有一个cancel
方法, 调用cancel方法可以取消协程, 看一个数羊的例子:
fun main(args: Array<String>) {
val job = launch(CommonPool) {
var i = 1
while(true) {
println("$i little sheep")
++i
delay(500L) // 每半秒数一只, 一秒可以输两只
}
}
Thread.sleep(1000L) // 在主线程睡眠期间, 协程里已经数了两只羊
job.cancel() // 协程才数了两只羊, 就被取消了
Thread.sleep(1000L)
println("main process finished.")
}
运行结果是:
1 little sheep
2 little sheep
main process finished.
如果不调用cancel, 可以数到4只羊.
协程的核心是suspend方法, 下面先讲解一下suspend方法, 之后再继续别的话题.
理解suspend方法:
suspend方法是协程的核心, 理解suspend方法是使用和理解协程的关键.
(suspend lambda和suspend方法差不多, 只是没有名字, 不再单独介绍了)
suspend方法的语法很简单, 只是比普通方法只是多了个suspend
关键字:
suspend fun foo(): ReturnType {
// ...
}
suspend方法只能在协程里面调用, 不能在协程外面调用.
suspend方法本质上, 与普通方法有较大的区别, suspend方法的本质是异步返回(注意: 不是异步回调). 后面我们会解释这句话的含义.
现在, 我们先来看一个异步回调的例子:
fun main(...) {
requestDataAsync {
println("data is $it")
}
Thead.sleep(10000L) // 这个sleep只是为了保活进程
}
fun requestDataAsync(callback: (String)->Unit) {
Thread() {
// do something need lots of times.
// ...
callback(data)
}.start()
}
逻辑很简单, 就是通过异步的方法拉一个数据, 然后使用这个数据, 按照以往的编程方式, 若要接受异步回来的数据, 唯有使用callback.
但是假如使用协程, 可以不使用callback, 而是直接把这个数据"return"回来, 调用者不使用callback接受数据, 而是像调用同步方法一样接受返回值. 如果上述功能改用协程, 将会是:
fun main(...) {
launch(Unconfined) { // 请重点关注协程里是如何获取异步数据的
val data = requestDataAsync() // 异步回来的数据, 像同步一样return了
println("data is $it")
}
Thead.sleep(10000L) // 请不要关注这个sleep
}
suspend fun requestDataAsync() { // 请注意方法前多了一个suspend关键字
return async(CommonPool) { // 先不要管这个async方法, 后面解释
// do something need lots of times.
// ...
data // return data, lambda里的return要省略
}.await()
}
这里, 我们首先将requestDataAsync转成了一个suspend方法, 其原型的变化是:
- 在前加了个
suspend
关键字. - 去除了原来的callback参数.
这里先不去深究这个方法的新实现, 后面会专门解释.
这里需要关注的点是: 在协程里面, 调用suspend方法, 异步的数据像同步一样般return了.
这是怎么做到的呢?
当程序执行到requestDataAsync内部时, 通过async启动了另外一个新的子协程去拉取数据, 启动这个新的子协程后, 当前的父协程就挂起了, 此时requestDataAsync还没有返回.
子协程一直在后台跑, 过了一段时间, 子协程把数据拉回来之后, 会恢复它的父协程, 父协程继续执行, requestDataAsync就把数据返回了.
为了加深理解, 我们来对比一下另一个例子: 不使用协程, 将异步方法也可以转成同步的方法(在单元测试里, 我们经常这么做):
fun main(...) {
val data = async2Sync() // 数据是同步返回了, 但是线程也阻塞了
println("data is $it")
// Thead.sleep(10000L) // 这一句在这里毫无意义了, 注释掉
}
private var data = ""
private fun async2Sync(): String {
val obj = Object() // 随便创建一个对象当成锁使用
requestDataAsync { data ->
this.data = data // 暂存data
synchronized(locker) {
obj.notifyAll() // 通知所有的等待者
}
}
obj.wait() // 阻塞等待
return this.data
}
fun requestDataAsync(callback: (String)->Unit) {
// ...普通的异步方法
}
注意对比上一个协程的例子, 这样做表面上跟它是一样的, 但是这里main方法会阻塞的等待async2Sync()方法完成. 同样是等待, 协程就不会阻塞当前线程, 而是自己主动放弃执行权, 相当于遣散当前线程, 让它去干别的事情去.
为了更好的理解这个"遣散"的含义, 我们再来看一个例子:
fun main(args: Array<String>) {
// 1. 程序开始
println("${Thread.currentThread().name}: 1");
// 2. 启动一个协程, 并立即启动
launch(Unconfined) { // Unconfined意思是在当前线程(主线程)运行协程
// 3. 本协程在主线程上直接开始执行了第一步
println("${Thread.currentThread().name}: 2");
/* 4. 本协程的第二步调用了一个suspend方法, 调用之后,
* 本协程就放弃执行权, 遣散运行我的线程(主线程)请干别的去.
*
* delay被调用的时候, 在内部创建了一个计时器, 并设了个callback.
* 1秒后计时器到期, 就会调用刚设置的callback.
* 在callback里面, 会调用系统的接口来恢复协程.
* 协程在计时器线程上恢复执行了. (不是主线程, 跟Unconfined有关)
*/
delay(1000L) // 过1秒后, 计时器线程会resume协程
// 7. 计时器线程恢复了协程,
println("${Thread.currentThread().name}: 4")
}
// 5. 刚那个的协程不要我(主线程)干活了, 所以我继续之前的执行
println("${Thread.currentThread().name}: 3");
// 6. 我(主线程)睡2秒钟
Thread.sleep(2000L)
// 8. 我(主线程)睡完后继续执行
println("${Thread.currentThread().name}: 5");
}
运行结果:
main: 1
main: 2
main: 3
kotlinx.coroutines.ScheduledExecutor: 4
main: 5
上述代码的注释详细的列出了程序运行流程, 看完之后, 应该就能明白 "遣散" 和 "放弃执行权" 的含义了.
Unconfined
的含义是不给协程指定运行的线程, 逮到谁就使用谁, 启动它的线程直接执行它, 但被挂起后, 会由恢复它的线程继续执行, 如果一个协程会被挂起多次, 那么每次被恢复后, 都可能被不同线程继续执行.
现在再来回顾刚刚那句: suspend方法的本质就是异步返回.
含义就是将其拆成 "异步" + "返回":
- 首先, 数据不是同步回来的(同步指的是立即返回), 而是异步回来的.
- 其次, 接受数据不需要通过callback, 而是直接接收返回值.
调用suspend方法的详细流程是:
在协程里, 如果调用了一个suspend方法, 协程就会挂起, 释放自己的执行权, 但在协程挂起之前, suspend方法内部一般会启动了另一个线程或协程, 我们暂且称之为"分支执行流"吧, 它的目的是运算得到一个数据.
当suspend方法里的*分支执行流"完成后, 就会调用系统API重新恢复协程的执行, 同时会数据返回给协程(如果有的话).
__为什么不能再协程外面调用suspend方法? __
suspend方法只能在协程里面调用, 原因是只有在协程里, 才能遣散当前线程, 在协程外面, 不允许遣散, 反过来思考, 假如在协程外面也能遣散线程, 会怎么样, 写一个反例:
fun main(args: Array<String>) {
requestDataSuspend();
doSomethingNormal();
}
suspend fun requestDataSuspend() {
// ...
}
fun doSomethingNormal() {
// ...
}
requestDataSuspend是suspend方法, doSomethingNormal是正常方法, doSomethingNormal必须等到requestDataSuspend执行完才会开始, 后果main方法失去了并行的能力, 所有地方都失去了并行的能力, 这肯定不是我们要的, 所以需要约定只能在协程里才可以遣散线程, 放弃执行权, 于是suspend方法只能在协程里面调用.
概念解释: Continuation 与 suspension point
----个人建议专有名词别翻译成中文, 否则很容易因为断句错误而产生误解
协程的执行其实是断断续续的: 执行一段, 挂起来, 再执行一段, 再挂起来, ...
每个挂起的地方是一个suspension point
, 每一小段执行是一个Continuation
.
协程的执行流被它的 "suspension point" 分割成了很多个 "Continuation" .
我们可以用一条画了很多点的线段来表示:
其中的<u>Continuation 0</u>比较特殊, 是从起点开始, 到第一个suspension point结束, 由于它的特殊性, 又被称为Initial Continuation
.
协程创建后, 并不总是立即执行, 要分是怎么创建的协程, 通过launch方法的第二个参数是一个枚举类型
CoroutineStart
, 如果不填, 默认值是DEFAULT
, 那么久协程创建后立即启动, 如果传入LAZY
, 创建后就不会立即启动, 直到调用Job的start
方法才会启动.
suspension point只是一个概念, 而Continuation在Kotlin里有一个对应interface, 关于这个interface后面再介绍.
封装异步回调方法
在没有协程的世界里, 通常异步的方法都需要接受一个callback用于发布运算结果.
在协程里, 所有接受callback的方法, 都可以转成不需要callback的suspend方法.
上面的requestDataSuspend方法就是一个这样的例子, 我们回过头来再看一眼:
suspend fun requestDataSuspend() {
return async(CommonPool) {
// do something need lots of times.
// ...
data // return data
}.await()
}
其内部通过调用了async和await方法来实现(关于async和await我们后面再介绍), 这样虽然实现功能没问题, 但并不最合适的方式, 上面那样做只是为了追求最简短的实现, 合理的实现应该是调用suspendCoroutine
方法, 大概是这样:
suspend fun requestDataSuspend() {
suspendCoroutine { cont ->
// ... 细节暂时省略
}
}
// 可简写成:
suspend fun requestDataSuspend() = suspendCoroutine { cont ->
// ...
}
在完整实现之前, 需要先理解suspendCoroutine
方法, 它是Kotlin标准库里的一个方法, 原型如下:
suspend fun <T> suspendCoroutine(block: (Continuation<T>) -> Unit): T
后面我们讨论Kotlin协程官方API的时候就会知道, 这是Kotlin标准库里用于支持协程的底层API非常少(大多数API不在标准库, 而是在应用层的扩展库, 如上面的launch方法), 这是其中一个.
suspendCoroutine的作用就是将当前执行流挂起, 在适合的时机再将协程恢复执行, 我们可以看到他的参数是一个lambda, lambda的参数是一个Continuation
, 我们刚刚其实已经提到过Continuation了, 它表示一段执行流, 这里就不做过多解释了, 这个方法里的<u>Continuation实例</u>代表的执行流是从当前的suspension point开始, 到下一个suspension point结束, 当前的suspension point就是调用suspendCoroutine这一刻.
调用suspendCoroutine之后, 当前的执行流会挂起(调用suspendCoroutine的线程会遣散, 但不是整个进程都挂起, 不然谁做事呢), 然后开另一个执行流去做异步的事情, 等到异步的事情做完, 当前的执行流又会恢复, 下面看一下是如何恢复的.
suspendCoroutine的会自动捕获当前的执行环境(如临时变量, 参数等), 然后存放到一个Continuation中, 并且作为参数传给它的lambda.
之前已经提到Continuation是标准库里的一个interface, 它的原型是:
interface Continuation<in T> {
val context: CoroutineContext // 暂时不管这个
fun resume(value: T)
fun resumeWithException(exception: Throwable)
}
它有两个方法resume
和resumeWithException
:
- 若调用resume就是正常恢复
- 调用resumeWithException就是异常恢复
现在来完善一下刚刚的例子:
suspend fun requestDataSuspend() = suspendCoroutine { cont ->
requestDataFromServer { data -> // 普通方法还是通过callback接受数据
if (data != null) {
cont.resume(data)
} else {
cont.resumeWithException(MyException())
}
}
}
/** 普通的异步回调方法 */
fun requestDataFromServer(callback: (String)->Unit) {
// ... get data from server, it will call back when finished.
}
逻辑很简单, 如果data有效就正常恢复, 否则异常恢复.
但这里需要注意的是: 传给resume的参数会变成suspendCoroutine的返回值, 进而成为了requestDataSuspend方法的返回值.
这个地方太神奇了, Kotlin是如何做到的呢?, 估计短时间也难以理解, 先记住吧.
suspendCoroutine有个特点:
suspendCoroutine { cont ->
// 如果本lambda里返回前, cont的resume和resumeWithException都没有调用
// 那么当前执行流就会挂起, 并且挂起的时机是在suspendCoroutine之前
// 就是在suspendCoroutine内部return之前就挂起了
// 如果本lambda里返回前, 调用了cont的resume或resumeWithException
// 那么当前执行流不会挂起, suspendCoroutine直接返回了,
// 若调用的是resume, suspendCoroutine就会像普通方法一样返回一个值
// 若调用的是resumeWithException, suspendCoroutine会抛出一个异常
// 外面可以通过try-catch来捕获这个异常
}
回过头来看一下, 刚刚的实现有调用resume方法吗, 我们把它折叠一下:
suspend fun requestDataSuspend() = suspendCoroutine { cont ->
requestDataFromServer { ... }
}
清晰了吧, 没有调用, 所以suspendCoroutine还没有返回之前就挂起了, 但是挂起之前lambda执行完了, lambda里调用了requestDataFromServer, requestDataFromServer里启动了真正做事的流程(异步执行的), 而suspendCoroutine则在挂起等待.
等到requestDataFromServer完成工作, 就会调用传入的callback, 而这个callback里调用了cont.resume(data), 外层的协程就恢复了, 随后suspendCoroutine就会返回, 返回值就是data.
大家一定很好奇Kotlin内部是如何实现的, 要像彻底了解其中的奥妙, 还是要看官方文档和代码, 这里只简单介绍一下大致原理, 太细的我也不懂, 大家凑合着看一下, 看不懂也没关系.
在Kotlin内部, 协程被实现成了一个状态机, 状态的个数就是suspension point的个数+1(初始状态), 当前的状态就是当前的suspension point, 当调用resume时, 就会执行下一个Continuation.
估计大家这个时候应该是似懂非懂, 其实作为使用者, 这已经够了, 但是要深入研究, 还是靠自己研究代码.
async/await模式:
我们前面多次使用了launch方法, 它的作用是创建协程并立即启动, 但是有一个问题, 就是通过launch方法创建的协程都没办法携带返回值. async之前也出现过, 但一直没有详细介绍.
async方法作用和launch方法基本一样, 创建一个协程并立即启动, 但是async创建的协程可以携带返回值.
launch方法的返回值类型是Job, async方法的返回值类型是Deferred, 是Job的子类, Deferred里有个await
方法, 调用它可得到协程的返回值.
async/await
是一种常用的模式, async的含义是启动一个异步操作, await的含义是等待这个异步操作结果.
是谁要等它啊, 在传统的不使用协程的代码里, 是线程在等(线程不干别的事, 就在那里傻等). 在协程里不是线程在等, 而且是执行流在等, 当前的流程挂起(底下的线程会被遣散去干别的事), 等到有了运算结果, 流程才继续运行.
所以我们又可以顺便得出一个结论: 在协程里执行流是线性的, 其中的步骤无论是同步的还是异步的, 后面的步骤都会等前面的步骤完成.
我们可以通过async起多个任务, 他们会同时运行, 我们之前使用的async姿势不是很正常, 下面看一下使用async正常的姿势:
fun main(...) {
launch(Unconfined) {
// 任务1会立即启动, 并且会在别的线程上并行执行
val deferred1 = async { requestDataAsync1() }
// 上一个步骤只是启动了任务1, 并不会挂起当前协程
// 所以任务2也会立即启动, 也会在别的线程上并行执行
val deferred2 = async { requestDataAsync2() }
// 先等待任务1结束(等了约1000ms),
// 然后等待任务2, 由于它和任务1几乎同时启动的, 所以也很快完成了
println("data1=$deferred2.await(), data2=$deferred2.await()")
}
Thead.sleep(10000L) // 继续无视这个sleep
}
suspend fun requestDataAsync1(): String {
delay(1000L)
return "data1"
}
suspend fun requestDataAsync2(): String {
delay(1000L)
return "data2"
}
运行结果很简单, 不用说了, 但是协程总耗时是多少呢, 约1000ms, 不是2000ms, 因为两个任务是并行运行的.
有一个问题: 假如任务2先于任务1完成, 结果是怎样的呢?
答案是: 任务2的结果会先保存在deferred2里, 当调用deferred2.await()时, 会立即返回, 不会引起协程挂起, 因为deferred2已经准备好了.
所以, suspend方法并不总是引起协程挂起, 只有其内部的数据未准备好时才会.
需要注意的是: await是suspend方法, 但async不是, 所以它才可以在协程外面调用, async只是启动了协程, async本身不会引起协程挂起, 传给async的lambda(也就是协程体)才可能引起协程挂起.
async/await模式在别的语言里, 被实现成了两个关键字, 但在Kotlin里只是两个很平常的方法.
Generators介绍:
学习Python的协程的时候, 最先学习的就是Generators, 它的作用就是通过计算产生序列, 而不用通过列表之类存储机制. 以下通过Generators产生斐波那契序列:
// inferred type is Sequence<Int>
val fibonacci = buildSequence {
yield(1) // first Fibonacci number
var cur = 1
var next = 1
while (true) {
yield(next) // next Fibonacci number
val tmp = cur + next
cur = next
next = tmp
}
}
fun main(...) {
launch(Unconfined) { // 请重点关注协程里是如何获取异步数据的
fibonacci.take(10).forEach { print("$it, ") }
}
Thead.sleep(10000L) // 请不要关注这个sleep
}
// will print 1, 1, 2, 3, 5, 8, 13, 21, 34, 55,
我觉得这个没什么好解释的, yield
是一个suspend方法, 放弃执行权, 并将数据返回.
根据前面的知识, 我们可以推断出, yield内部肯定最终会调用到Continuation的resume方法.
yield在别的语言, 一般是一个关键字, Kotlin中也是一个方法.
yield是标准库里的API, 大多数情况我们不需要直接调用这个方法, 使用kotlinx.coroutines里面的Channel
和produce
方法更加方法. 具体可以参考这里:
https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md#building-channel-producers
序列的产生跟RX其实有点像, 但也是区别的, 具体可以参考这里:
https://github.com/Kotlin/kotlinx.coroutines/blob/master/reactive/coroutines-guide-reactive.md
目前没有发现特别需要使用Generators的场景, 所以这里不做太多讨论.
协程API说明:
Kotlin的开发者对协程的实现比较独特, 语言机制本身只增加了极少的关键字, 标准库也只有极少的API, 但这并不代表功能少, 根据Kotlin的设计, 很多功能型API都放到了更上层的应用库里去实现.
只将少量核心的机制才放到语言本身和标准库上. 这样做不仅使得语言更简单, 而且灵活性更强.
Kotlin官方对协程提供的三种级别的能力支持, 分别是: 最底层的语言层, 中间层标准库(kotlin-stdlib), 以及最上层应用层(kotlinx.coroutines).
应用层:
这一层是我们的程序直接调用的层, 提供一些常用的实现方法, 如launch方法, async方法等, 它的实现在kotlinx.coroutines里面.
标准库:
标准库仅仅提供了少量创建协程的方法, 位于:
kotlin.coroutines.experimental:
-- createCoroutine()
-- startCoroutine()
-- suspendCoroutine()
到目前为止, 我们直接使用到的只有suspendCoroutine
方法.
launch和async方法的实现里最终调用了startCoroutine
方法.
Generators里的buildSequence方法, 最终会调用buildSequence
来实现.
语言层:
语言本身主要提供了对suspend
关键字的支持, Kotlin编译器会对suspend修饰的方法或lambda特殊处理, 生成一些中间类和逻辑代码.
我们平常用到的基本都是应用层的接口, 应用层提供了很多非常核心功能, 这些功能在其他语言里大多是通过关键字来实现的, 而在Kotlin里, 这些都是实现成了方法.
总结
协程是什么:
看了这么多例子, 我们现在可以总结一下协程是什么, 协程到底是什么, 很难给出具体的定义, 就算能给出具体定义, 也会非常抽象难以理解的.
另一方面, 协程可以说是编译器的能力, 因为协程并不需要操作系统和硬件的支持(线程需要), 是编译器为了让开发者写代码更简单方便, 提供了一些关键字, 并在内部自动生成了一些支持型代码(可能是字节码).
以下我个人的总结:
- 首先, 协程是一片包含特定逻辑的代码块, 这个代码块可以调度到不同的线程上执行;
- 其次, 协程一种环境, 在这种环境里, 方法可以被等待执行, 有了运算结果之后才返回, 在等待期间, 承载协程的线程资源可以被别的地方使用.
- 第三, 协程是一个独立于运行流程的逻辑流程, 协程里面的步骤, 无论是同步的还是异步的, 都是线性(从前到后依次完成的).
协程和线程区别与关系:
线程和协程的目的本质上存在差异:
- 线程的目的是提高CPU资源使用率, 使多个任务得以并行的运行, 所以线程是为了服务于机器的.
- 协程的目的是为了让多个任务之间更好的协作, 主要体现在代码逻辑上, 所以协程是为了服务于人的, 写代码的人. (也有可能结果会能提升资源的利用率, 但并不是原始目的)
在调度上, 协程跟线程也不同:
- 线程的调度是系统完成的, 一般是抢占式的, 根据优先级来分配, 是空分复用.
- 协程的调度是开发者根据程序逻辑指定好的, 在不同的时期把资源合理的分配给不同的任务, 是时分复用的.
作用上的不同:
- 协程确保了代码逻辑是顺序的, 不管同步操作要是异步操作, 前一个完成, 后一个才会开始.
- 线程可以被调度到CPU上执行, 这样代码才能真正运行起来.
协程与线程的关系:
协程并不是取代线程, 而且抽象于线程之上, 线程是被分割的CPU资源, 协程是组织好的代码流程, 协程需要线程来承载运行, 线程是协程的资源, 但协程不会直接使用线程, 协程直接利用的是执行器(Interceptor), 执行器可以关联任意线程或线程池, 可以使当前线程, UI线程, 或新建新程. 可总结如下:
- 线程是协程的资源.
- 协程通过Interceptor来间接使用线程这个资源.
结语:
如果需要经常使用协程, 建议抽时间看一下官方文档.
最后, 感谢大家的阅读, 希望本文对你有所帮助 !
官方英文文档连接:
_
第1个页面是是官方指南的子页面, 第2个和第3个分别是两个GitHub项目里面的markdown文档, 他们所在的工程还包含其他文档, 有需要可以自行浏览.
另外, 提个建议: 如果看着看着卡壳了, 可以跳过或查阅另外几个文档, 以后再回过来看, 别的文档有可能会用别的方式或别的例子来描述了同一个东西)
_